diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
commit | c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src-cpp | |
parent | Adding upstream version 1.43.2. (diff) | |
download | netdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.tar.xz netdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.zip |
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp')
17 files changed, 7365 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/CMakeLists.txt b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/CMakeLists.txt new file mode 100644 index 000000000..b0a6d51e4 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/CMakeLists.txt @@ -0,0 +1,90 @@ +set(LIBVER 1) + +set( + sources + ConfImpl.cpp + ConsumerImpl.cpp + HandleImpl.cpp + HeadersImpl.cpp + KafkaConsumerImpl.cpp + MessageImpl.cpp + MetadataImpl.cpp + ProducerImpl.cpp + QueueImpl.cpp + RdKafka.cpp + TopicImpl.cpp + TopicPartitionImpl.cpp +) + +if(RDKAFKA_BUILD_STATIC) + set(CMAKE_POSITION_INDEPENDENT_CODE ON) + set(RDKAFKA_BUILD_MODE STATIC) +else() + set(RDKAFKA_BUILD_MODE SHARED) +endif() + +add_library(rdkafka++ ${RDKAFKA_BUILD_MODE} ${sources}) +if(NOT RDKAFKA_BUILD_STATIC) + set_property(TARGET rdkafka++ PROPERTY SOVERSION ${LIBVER}) +endif() + +target_link_libraries(rdkafka++ PUBLIC rdkafka) + +# Support '#include <rdkafcpp.h>' +target_include_directories(rdkafka++ PUBLIC "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>") +if(NOT RDKAFKA_BUILD_STATIC) + target_compile_definitions(rdkafka++ PRIVATE LIBRDKAFKACPP_EXPORTS) +endif() + +# Generate pkg-config file +set(PKG_CONFIG_VERSION "${PROJECT_VERSION}") +if(NOT RDKAFKA_BUILD_STATIC) + set(PKG_CONFIG_NAME "librdkafka++") + set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library") + set(PKG_CONFIG_REQUIRES "rdkafka") + set(PKG_CONFIG_CFLAGS "-I\${includedir}") + set(PKG_CONFIG_LIBS "-L\${libdir} -lrdkafka++") + set(PKG_CONFIG_LIBS_PRIVATE "-lrdkafka") + configure_file( + "../packaging/cmake/rdkafka.pc.in" + "${GENERATED_DIR}/rdkafka++.pc" + @ONLY + ) + install( + FILES ${GENERATED_DIR}/rdkafka++.pc + DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig" + ) +else() + set(PKG_CONFIG_NAME "librdkafka++-static") + set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library (static)") + set(PKG_CONFIG_REQUIRES "") + set(PKG_CONFIG_CFLAGS "-I\${includedir} -DLIBRDKAFKA_STATICLIB") + set(PKG_CONFIG_LIBS "-L\${libdir} \${libdir}/librdkafka++.a") + if(WIN32) + string(APPEND PKG_CONFIG_LIBS " -lws2_32 -lsecur32 -lcrypt32") + endif() + + configure_file( + "../packaging/cmake/rdkafka.pc.in" + "${GENERATED_DIR}/rdkafka++-static.pc" + @ONLY + ) + install( + FILES ${GENERATED_DIR}/rdkafka++-static.pc + DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig" + ) +endif() + +install( + TARGETS rdkafka++ + EXPORT "${targets_export_name}" + LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" + ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" + RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}" + INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}" +) + +install( + FILES "rdkafkacpp.h" + DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/librdkafka" +) diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConfImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConfImpl.cpp new file mode 100644 index 000000000..53d7b30c5 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConfImpl.cpp @@ -0,0 +1,84 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> + +#include "rdkafkacpp_int.h" + + + +RdKafka::ConfImpl::ConfResult RdKafka::ConfImpl::set(const std::string &name, + const std::string &value, + std::string &errstr) { + rd_kafka_conf_res_t res; + char errbuf[512]; + + if (this->conf_type_ == CONF_GLOBAL) + res = rd_kafka_conf_set(this->rk_conf_, name.c_str(), value.c_str(), errbuf, + sizeof(errbuf)); + else + res = rd_kafka_topic_conf_set(this->rkt_conf_, name.c_str(), value.c_str(), + errbuf, sizeof(errbuf)); + + if (res != RD_KAFKA_CONF_OK) + errstr = errbuf; + + return static_cast<Conf::ConfResult>(res); +} + + +std::list<std::string> *RdKafka::ConfImpl::dump() { + const char **arrc; + size_t cnt; + std::list<std::string> *arr; + + if (rk_conf_) + arrc = rd_kafka_conf_dump(rk_conf_, &cnt); + else + arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt); + + arr = new std::list<std::string>(); + for (int i = 0; i < static_cast<int>(cnt); i++) + arr->push_back(std::string(arrc[i])); + + rd_kafka_conf_dump_free(arrc, cnt); + return arr; +} + +RdKafka::Conf *RdKafka::Conf::create(ConfType type) { + ConfImpl *conf = new ConfImpl(type); + + if (type == CONF_GLOBAL) + conf->rk_conf_ = rd_kafka_conf_new(); + else + conf->rkt_conf_ = rd_kafka_topic_conf_new(); + + return conf; +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConsumerImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConsumerImpl.cpp new file mode 100644 index 000000000..b7f5e3b22 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ConsumerImpl.cpp @@ -0,0 +1,244 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> +#include <cerrno> + +#include "rdkafkacpp_int.h" + +RdKafka::Consumer::~Consumer() { +} + +RdKafka::Consumer *RdKafka::Consumer::create(const RdKafka::Conf *conf, + std::string &errstr) { + char errbuf[512]; + const RdKafka::ConfImpl *confimpl = + dynamic_cast<const RdKafka::ConfImpl *>(conf); + RdKafka::ConsumerImpl *rkc = new RdKafka::ConsumerImpl(); + rd_kafka_conf_t *rk_conf = NULL; + + if (confimpl) { + if (!confimpl->rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + delete rkc; + return NULL; + } + + rkc->set_common_config(confimpl); + + rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_); + } + + rd_kafka_t *rk; + if (!(rk = + rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf, errbuf, sizeof(errbuf)))) { + errstr = errbuf; + // rd_kafka_new() takes ownership only if succeeds + if (rk_conf) + rd_kafka_conf_destroy(rk_conf); + delete rkc; + return NULL; + } + + rkc->rk_ = rk; + + + return rkc; +} + +int64_t RdKafka::Consumer::OffsetTail(int64_t offset) { + return RD_KAFKA_OFFSET_TAIL(offset); +} + +RdKafka::ErrorCode RdKafka::ConsumerImpl::start(Topic *topic, + int32_t partition, + int64_t offset) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_consume_start(topicimpl->rkt_, partition, offset) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + + +RdKafka::ErrorCode RdKafka::ConsumerImpl::start(Topic *topic, + int32_t partition, + int64_t offset, + Queue *queue) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue); + + if (rd_kafka_consume_start_queue(topicimpl->rkt_, partition, offset, + queueimpl->queue_) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + + +RdKafka::ErrorCode RdKafka::ConsumerImpl::stop(Topic *topic, + int32_t partition) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_consume_stop(topicimpl->rkt_, partition) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + +RdKafka::ErrorCode RdKafka::ConsumerImpl::seek(Topic *topic, + int32_t partition, + int64_t offset, + int timeout_ms) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_seek(topicimpl->rkt_, partition, offset, timeout_ms) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + +RdKafka::Message *RdKafka::ConsumerImpl::consume(Topic *topic, + int32_t partition, + int timeout_ms) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + rd_kafka_message_t *rkmessage; + + rkmessage = rd_kafka_consume(topicimpl->rkt_, partition, timeout_ms); + if (!rkmessage) + return new RdKafka::MessageImpl( + RD_KAFKA_CONSUMER, topic, + static_cast<RdKafka::ErrorCode>(rd_kafka_last_error())); + + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, topic, rkmessage); +} + +namespace { +/* Helper struct for `consume_callback'. + * Encapsulates the values we need in order to call `rd_kafka_consume_callback' + * and keep track of the C++ callback function and `opaque' value. + */ +struct ConsumerImplCallback { + ConsumerImplCallback(RdKafka::Topic *topic, + RdKafka::ConsumeCb *cb, + void *data) : + topic(topic), cb_cls(cb), cb_data(data) { + } + /* This function is the one we give to `rd_kafka_consume_callback', with + * the `opaque' pointer pointing to an instance of this struct, in which + * we can find the C++ callback and `cb_data'. + */ + static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { + ConsumerImplCallback *instance = + static_cast<ConsumerImplCallback *>(opaque); + RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, instance->topic, msg, + false /*don't free*/); + instance->cb_cls->consume_cb(message, instance->cb_data); + } + RdKafka::Topic *topic; + RdKafka::ConsumeCb *cb_cls; + void *cb_data; +}; +} // namespace + +int RdKafka::ConsumerImpl::consume_callback(RdKafka::Topic *topic, + int32_t partition, + int timeout_ms, + RdKafka::ConsumeCb *consume_cb, + void *opaque) { + RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(topic); + ConsumerImplCallback context(topic, consume_cb, opaque); + return rd_kafka_consume_callback(topicimpl->rkt_, partition, timeout_ms, + &ConsumerImplCallback::consume_cb_trampoline, + &context); +} + + +RdKafka::Message *RdKafka::ConsumerImpl::consume(Queue *queue, int timeout_ms) { + RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue); + rd_kafka_message_t *rkmessage; + + rkmessage = rd_kafka_consume_queue(queueimpl->queue_, timeout_ms); + if (!rkmessage) + return new RdKafka::MessageImpl( + RD_KAFKA_CONSUMER, NULL, + static_cast<RdKafka::ErrorCode>(rd_kafka_last_error())); + /* + * Recover our Topic * from the topic conf's opaque field, which we + * set in RdKafka::Topic::create() for just this kind of situation. + */ + void *opaque = rd_kafka_topic_opaque(rkmessage->rkt); + Topic *topic = static_cast<Topic *>(opaque); + + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, topic, rkmessage); +} + +namespace { +/* Helper struct for `consume_callback' with a Queue. + * Encapsulates the values we need in order to call `rd_kafka_consume_callback' + * and keep track of the C++ callback function and `opaque' value. + */ +struct ConsumerImplQueueCallback { + ConsumerImplQueueCallback(RdKafka::ConsumeCb *cb, void *data) : + cb_cls(cb), cb_data(data) { + } + /* This function is the one we give to `rd_kafka_consume_callback', with + * the `opaque' pointer pointing to an instance of this struct, in which + * we can find the C++ callback and `cb_data'. + */ + static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { + ConsumerImplQueueCallback *instance = + static_cast<ConsumerImplQueueCallback *>(opaque); + /* + * Recover our Topic * from the topic conf's opaque field, which we + * set in RdKafka::Topic::create() for just this kind of situation. + */ + void *topic_opaque = rd_kafka_topic_opaque(msg->rkt); + RdKafka::Topic *topic = static_cast<RdKafka::Topic *>(topic_opaque); + RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, topic, msg, + false /*don't free*/); + instance->cb_cls->consume_cb(message, instance->cb_data); + } + RdKafka::ConsumeCb *cb_cls; + void *cb_data; +}; +} // namespace + +int RdKafka::ConsumerImpl::consume_callback(Queue *queue, + int timeout_ms, + RdKafka::ConsumeCb *consume_cb, + void *opaque) { + RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue); + ConsumerImplQueueCallback context(consume_cb, opaque); + return rd_kafka_consume_callback_queue( + queueimpl->queue_, timeout_ms, + &ConsumerImplQueueCallback::consume_cb_trampoline, &context); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp new file mode 100644 index 000000000..7aa2f2939 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp @@ -0,0 +1,425 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> + +#include "rdkafkacpp_int.h" + +void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + RdKafka::Topic *topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt)); + + RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, topic, msg, + false /*don't free*/); + + handle->consume_cb_->consume_cb(message, opaque); +} + +void RdKafka::log_cb_trampoline(const rd_kafka_t *rk, + int level, + const char *fac, + const char *buf) { + if (!rk) { + rd_kafka_log_print(rk, level, fac, buf); + return; + } + + void *opaque = rd_kafka_opaque(rk); + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + if (!handle->event_cb_) { + rd_kafka_log_print(rk, level, fac, buf); + return; + } + + RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, RdKafka::ERR_NO_ERROR, + static_cast<RdKafka::Event::Severity>(level), fac, + buf); + + handle->event_cb_->event_cb(event); +} + + +void RdKafka::error_cb_trampoline(rd_kafka_t *rk, + int err, + const char *reason, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + char errstr[512]; + bool is_fatal = false; + + if (err == RD_KAFKA_RESP_ERR__FATAL) { + /* Translate to underlying fatal error code and string */ + err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); + if (err) + reason = errstr; + is_fatal = true; + } + RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR, + static_cast<RdKafka::ErrorCode>(err), + RdKafka::Event::EVENT_SEVERITY_ERROR, NULL, reason); + event.fatal_ = is_fatal; + handle->event_cb_->event_cb(event); +} + + +void RdKafka::throttle_cb_trampoline(rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int throttle_time_ms, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE); + event.str_ = broker_name; + event.id_ = broker_id; + event.throttle_time_ = throttle_time_ms; + + handle->event_cb_->event_cb(event); +} + + +int RdKafka::stats_cb_trampoline(rd_kafka_t *rk, + char *json, + size_t json_len, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, RdKafka::ERR_NO_ERROR, + RdKafka::Event::EVENT_SEVERITY_INFO, NULL, json); + + handle->event_cb_->event_cb(event); + + return 0; +} + + +int RdKafka::socket_cb_trampoline(int domain, + int type, + int protocol, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + return handle->socket_cb_->socket_cb(domain, type, protocol); +} + +int RdKafka::open_cb_trampoline(const char *pathname, + int flags, + mode_t mode, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode)); +} + +void RdKafka::oauthbearer_token_refresh_cb_trampoline( + rd_kafka_t *rk, + const char *oauthbearer_config, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + + handle->oauthbearer_token_refresh_cb_->oauthbearer_token_refresh_cb( + handle, std::string(oauthbearer_config ? oauthbearer_config : "")); +} + + +int RdKafka::ssl_cert_verify_cb_trampoline(rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int *x509_error, + int depth, + const char *buf, + size_t size, + char *errstr, + size_t errstr_size, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + std::string errbuf; + + bool res = 0 != handle->ssl_cert_verify_cb_->ssl_cert_verify_cb( + std::string(broker_name), broker_id, x509_error, depth, + buf, size, errbuf); + + if (res) + return (int)res; + + size_t errlen = + errbuf.size() > errstr_size - 1 ? errstr_size - 1 : errbuf.size(); + + memcpy(errstr, errbuf.c_str(), errlen); + if (errstr_size > 0) + errstr[errlen] = '\0'; + + return (int)res; +} + + +RdKafka::ErrorCode RdKafka::HandleImpl::metadata(bool all_topics, + const Topic *only_rkt, + Metadata **metadatap, + int timeout_ms) { + const rd_kafka_metadata_t *cmetadatap = NULL; + + rd_kafka_topic_t *topic = + only_rkt ? static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL; + + const rd_kafka_resp_err_t rc = + rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms); + + *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) + ? new RdKafka::MetadataImpl(cmetadatap) + : NULL; + + return static_cast<RdKafka::ErrorCode>(rc); +} + +/** + * Convert a list of C partitions to C++ partitions + */ +static void c_parts_to_partitions( + const rd_kafka_topic_partition_list_t *c_parts, + std::vector<RdKafka::TopicPartition *> &partitions) { + partitions.resize(c_parts->cnt); + for (int i = 0; i < c_parts->cnt; i++) + partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); +} + +static void free_partition_vector(std::vector<RdKafka::TopicPartition *> &v) { + for (unsigned int i = 0; i < v.size(); i++) + delete v[i]; + v.clear(); +} + +void RdKafka::rebalance_cb_trampoline( + rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_partitions, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + std::vector<RdKafka::TopicPartition *> partitions; + + c_parts_to_partitions(c_partitions, partitions); + + handle->rebalance_cb_->rebalance_cb( + dynamic_cast<RdKafka::KafkaConsumer *>(handle), + static_cast<RdKafka::ErrorCode>(err), partitions); + + free_partition_vector(partitions); +} + + +void RdKafka::offset_commit_cb_trampoline0( + rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_offsets, + void *opaque) { + OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque); + std::vector<RdKafka::TopicPartition *> offsets; + + if (c_offsets) + c_parts_to_partitions(c_offsets, offsets); + + cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets); + + free_partition_vector(offsets); +} + +static void offset_commit_cb_trampoline( + rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_offsets, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets, + handle->offset_commit_cb_); +} + + +void RdKafka::HandleImpl::set_common_config(const RdKafka::ConfImpl *confimpl) { + rd_kafka_conf_set_opaque(confimpl->rk_conf_, this); + + if (confimpl->event_cb_) { + rd_kafka_conf_set_log_cb(confimpl->rk_conf_, RdKafka::log_cb_trampoline); + rd_kafka_conf_set_error_cb(confimpl->rk_conf_, + RdKafka::error_cb_trampoline); + rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_, + RdKafka::throttle_cb_trampoline); + rd_kafka_conf_set_stats_cb(confimpl->rk_conf_, + RdKafka::stats_cb_trampoline); + event_cb_ = confimpl->event_cb_; + } + + if (confimpl->oauthbearer_token_refresh_cb_) { + rd_kafka_conf_set_oauthbearer_token_refresh_cb( + confimpl->rk_conf_, RdKafka::oauthbearer_token_refresh_cb_trampoline); + oauthbearer_token_refresh_cb_ = confimpl->oauthbearer_token_refresh_cb_; + } + + if (confimpl->socket_cb_) { + rd_kafka_conf_set_socket_cb(confimpl->rk_conf_, + RdKafka::socket_cb_trampoline); + socket_cb_ = confimpl->socket_cb_; + } + + if (confimpl->ssl_cert_verify_cb_) { + rd_kafka_conf_set_ssl_cert_verify_cb( + confimpl->rk_conf_, RdKafka::ssl_cert_verify_cb_trampoline); + ssl_cert_verify_cb_ = confimpl->ssl_cert_verify_cb_; + } + + if (confimpl->open_cb_) { +#ifndef _WIN32 + rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline); + open_cb_ = confimpl->open_cb_; +#endif + } + + if (confimpl->rebalance_cb_) { + rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_, + RdKafka::rebalance_cb_trampoline); + rebalance_cb_ = confimpl->rebalance_cb_; + } + + if (confimpl->offset_commit_cb_) { + rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_, + offset_commit_cb_trampoline); + offset_commit_cb_ = confimpl->offset_commit_cb_; + } + + if (confimpl->consume_cb_) { + rd_kafka_conf_set_consume_cb(confimpl->rk_conf_, + RdKafka::consume_cb_trampoline); + consume_cb_ = confimpl->consume_cb_; + } +} + + +RdKafka::ErrorCode RdKafka::HandleImpl::pause( + std::vector<RdKafka::TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + c_parts = partitions_to_c_parts(partitions); + + err = rd_kafka_pause_partitions(rk_, c_parts); + + if (!err) + update_partitions_from_c_parts(partitions, c_parts); + + rd_kafka_topic_partition_list_destroy(c_parts); + + return static_cast<RdKafka::ErrorCode>(err); +} + + +RdKafka::ErrorCode RdKafka::HandleImpl::resume( + std::vector<RdKafka::TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + c_parts = partitions_to_c_parts(partitions); + + err = rd_kafka_resume_partitions(rk_, c_parts); + + if (!err) + update_partitions_from_c_parts(partitions, c_parts); + + rd_kafka_topic_partition_list_destroy(c_parts); + + return static_cast<RdKafka::ErrorCode>(err); +} + +RdKafka::Queue *RdKafka::HandleImpl::get_partition_queue( + const TopicPartition *part) { + rd_kafka_queue_t *rkqu; + rkqu = rd_kafka_queue_get_partition(rk_, part->topic().c_str(), + part->partition()); + + if (rkqu == NULL) + return NULL; + + return new QueueImpl(rkqu); +} + +RdKafka::ErrorCode RdKafka::HandleImpl::set_log_queue(RdKafka::Queue *queue) { + rd_kafka_queue_t *rkqu = NULL; + if (queue) { + QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue); + rkqu = queueimpl->queue_; + } + return static_cast<RdKafka::ErrorCode>(rd_kafka_set_log_queue(rk_, rkqu)); +} + +namespace RdKafka { + +rd_kafka_topic_partition_list_t *partitions_to_c_parts( + const std::vector<RdKafka::TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + + c_parts = rd_kafka_topic_partition_list_new((int)partitions.size()); + + for (unsigned int i = 0; i < partitions.size(); i++) { + const RdKafka::TopicPartitionImpl *tpi = + dynamic_cast<const RdKafka::TopicPartitionImpl *>(partitions[i]); + rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add( + c_parts, tpi->topic_.c_str(), tpi->partition_); + rktpar->offset = tpi->offset_; + if (tpi->leader_epoch_ != -1) + rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_); + } + + return c_parts; +} + + +/** + * @brief Update the application provided 'partitions' with info from 'c_parts' + */ +void update_partitions_from_c_parts( + std::vector<RdKafka::TopicPartition *> &partitions, + const rd_kafka_topic_partition_list_t *c_parts) { + for (int i = 0; i < c_parts->cnt; i++) { + rd_kafka_topic_partition_t *p = &c_parts->elems[i]; + + /* Find corresponding C++ entry */ + for (unsigned int j = 0; j < partitions.size(); j++) { + RdKafka::TopicPartitionImpl *pp = + dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]); + if (!strcmp(p->topic, pp->topic_.c_str()) && + p->partition == pp->partition_) { + pp->offset_ = p->offset; + pp->err_ = static_cast<RdKafka::ErrorCode>(p->err); + pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p); + } + } + } +} + +} // namespace RdKafka diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HeadersImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HeadersImpl.cpp new file mode 100644 index 000000000..b567ef36c --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HeadersImpl.cpp @@ -0,0 +1,48 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> +#include <cerrno> + +#include "rdkafkacpp_int.h" + +RdKafka::Headers *RdKafka::Headers::create() { + return new RdKafka::HeadersImpl(); +} + +RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers) { + if (headers.size() > 0) + return new RdKafka::HeadersImpl(headers); + else + return new RdKafka::HeadersImpl(); +} + +RdKafka::Headers::~Headers() { +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/KafkaConsumerImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/KafkaConsumerImpl.cpp new file mode 100644 index 000000000..6f3b81c72 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/KafkaConsumerImpl.cpp @@ -0,0 +1,296 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2015 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <string> +#include <vector> + +#include "rdkafkacpp_int.h" + +RdKafka::KafkaConsumer::~KafkaConsumer() { +} + +RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create( + const RdKafka::Conf *conf, + std::string &errstr) { + char errbuf[512]; + const RdKafka::ConfImpl *confimpl = + dynamic_cast<const RdKafka::ConfImpl *>(conf); + RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl(); + rd_kafka_conf_t *rk_conf = NULL; + size_t grlen; + + if (!confimpl || !confimpl->rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + delete rkc; + return NULL; + } + + if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id", NULL, &grlen) != + RD_KAFKA_CONF_OK || + grlen <= 1 /* terminating null only */) { + errstr = "\"group.id\" must be configured"; + delete rkc; + return NULL; + } + + rkc->set_common_config(confimpl); + + rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_); + + rd_kafka_t *rk; + if (!(rk = + rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf, errbuf, sizeof(errbuf)))) { + errstr = errbuf; + // rd_kafka_new() takes ownership only if succeeds + rd_kafka_conf_destroy(rk_conf); + delete rkc; + return NULL; + } + + rkc->rk_ = rk; + + /* Redirect handle queue to cgrp's queue to provide a single queue point */ + rd_kafka_poll_set_consumer(rk); + + return rkc; +} + + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::subscribe( + const std::vector<std::string> &topics) { + rd_kafka_topic_partition_list_t *c_topics; + rd_kafka_resp_err_t err; + + c_topics = rd_kafka_topic_partition_list_new((int)topics.size()); + + for (unsigned int i = 0; i < topics.size(); i++) + rd_kafka_topic_partition_list_add(c_topics, topics[i].c_str(), + RD_KAFKA_PARTITION_UA); + + err = rd_kafka_subscribe(rk_, c_topics); + + rd_kafka_topic_partition_list_destroy(c_topics); + + return static_cast<RdKafka::ErrorCode>(err); +} + + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::unsubscribe() { + return static_cast<RdKafka::ErrorCode>(rd_kafka_unsubscribe(this->rk_)); +} + +RdKafka::Message *RdKafka::KafkaConsumerImpl::consume(int timeout_ms) { + rd_kafka_message_t *rkmessage; + + rkmessage = rd_kafka_consumer_poll(this->rk_, timeout_ms); + + if (!rkmessage) + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, NULL, + RdKafka::ERR__TIMED_OUT); + + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, rkmessage); +} + + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::assignment( + std::vector<RdKafka::TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + if ((err = rd_kafka_assignment(rk_, &c_parts))) + return static_cast<RdKafka::ErrorCode>(err); + + partitions.resize(c_parts->cnt); + + for (int i = 0; i < c_parts->cnt; i++) + partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); + + rd_kafka_topic_partition_list_destroy(c_parts); + + return RdKafka::ERR_NO_ERROR; +} + + + +bool RdKafka::KafkaConsumerImpl::assignment_lost() { + return rd_kafka_assignment_lost(rk_) ? true : false; +} + + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::subscription( + std::vector<std::string> &topics) { + rd_kafka_topic_partition_list_t *c_topics; + rd_kafka_resp_err_t err; + + if ((err = rd_kafka_subscription(rk_, &c_topics))) + return static_cast<RdKafka::ErrorCode>(err); + + topics.resize(c_topics->cnt); + for (int i = 0; i < c_topics->cnt; i++) + topics[i] = std::string(c_topics->elems[i].topic); + + rd_kafka_topic_partition_list_destroy(c_topics); + + return RdKafka::ERR_NO_ERROR; +} + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::assign( + const std::vector<TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + c_parts = partitions_to_c_parts(partitions); + + err = rd_kafka_assign(rk_, c_parts); + + rd_kafka_topic_partition_list_destroy(c_parts); + return static_cast<RdKafka::ErrorCode>(err); +} + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::unassign() { + return static_cast<RdKafka::ErrorCode>(rd_kafka_assign(rk_, NULL)); +} + + +RdKafka::Error *RdKafka::KafkaConsumerImpl::incremental_assign( + const std::vector<TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_error_t *c_error; + + c_parts = partitions_to_c_parts(partitions); + c_error = rd_kafka_incremental_assign(rk_, c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + + if (c_error) + return new ErrorImpl(c_error); + + return NULL; +} + + +RdKafka::Error *RdKafka::KafkaConsumerImpl::incremental_unassign( + const std::vector<TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_error_t *c_error; + + c_parts = partitions_to_c_parts(partitions); + c_error = rd_kafka_incremental_unassign(rk_, c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + + if (c_error) + return new ErrorImpl(c_error); + + return NULL; +} + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::committed( + std::vector<RdKafka::TopicPartition *> &partitions, + int timeout_ms) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + c_parts = partitions_to_c_parts(partitions); + + err = rd_kafka_committed(rk_, c_parts, timeout_ms); + + if (!err) { + update_partitions_from_c_parts(partitions, c_parts); + } + + rd_kafka_topic_partition_list_destroy(c_parts); + + return static_cast<RdKafka::ErrorCode>(err); +} + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::position( + std::vector<RdKafka::TopicPartition *> &partitions) { + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + c_parts = partitions_to_c_parts(partitions); + + err = rd_kafka_position(rk_, c_parts); + + if (!err) { + update_partitions_from_c_parts(partitions, c_parts); + } + + rd_kafka_topic_partition_list_destroy(c_parts); + + return static_cast<RdKafka::ErrorCode>(err); +} + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::seek( + const RdKafka::TopicPartition &partition, + int timeout_ms) { + const RdKafka::TopicPartitionImpl *p = + dynamic_cast<const RdKafka::TopicPartitionImpl *>(&partition); + rd_kafka_topic_t *rkt; + + if (!(rkt = rd_kafka_topic_new(rk_, p->topic_.c_str(), NULL))) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + /* FIXME: Use a C API that takes a topic_partition_list_t instead */ + RdKafka::ErrorCode err = static_cast<RdKafka::ErrorCode>( + rd_kafka_seek(rkt, p->partition_, p->offset_, timeout_ms)); + + rd_kafka_topic_destroy(rkt); + + return err; +} + + + +RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::close() { + return static_cast<RdKafka::ErrorCode>(rd_kafka_consumer_close(rk_)); +} + + +RdKafka::Error *RdKafka::KafkaConsumerImpl::close(Queue *queue) { + QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue); + rd_kafka_error_t *c_error; + + c_error = rd_kafka_consumer_close_queue(rk_, queueimpl->queue_); + if (c_error) + return new ErrorImpl(c_error); + + return NULL; +} + + +RdKafka::ConsumerGroupMetadata::~ConsumerGroupMetadata() { +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/Makefile b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/Makefile new file mode 100644 index 000000000..78ecb31f2 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/Makefile @@ -0,0 +1,55 @@ +PKGNAME= librdkafka +LIBNAME= librdkafka++ +LIBVER= 1 + +CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \ + ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \ + TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \ + HeadersImpl.cpp QueueImpl.cpp MetadataImpl.cpp + +HDRS= rdkafkacpp.h + +OBJS= $(CXXSRCS:%.cpp=%.o) + + + +all: lib check + +# No linker script/symbol hiding for C++ library +DISABLE_LDS=y + +MKL_NO_SELFCONTAINED_STATIC_LIB=y +include ../mklove/Makefile.base + +# Use C++ compiler as linker rather than the default C compiler +CC_LD=$(CXX) + +# OSX and Cygwin requires linking required libraries +ifeq ($(_UNAME_S),Darwin) + FWD_LINKING_REQ=y +endif +ifeq ($(_UNAME_S),AIX) + FWD_LINKING_REQ=y +endif +ifeq ($(shell uname -o 2>/dev/null),Cygwin) + FWD_LINKING_REQ=y +endif + +# Ignore previously defined library dependencies for the C library, +# we'll get those dependencies through the C library linkage. +LIBS := -L../src -lrdkafka +MKL_PKGCONFIG_REQUIRES_PRIVATE := rdkafka +MKL_PKGCONFIG_REQUIRES := rdkafka + +CHECK_FILES+= $(LIBFILENAME) $(LIBNAME).a + + +file-check: lib +check: file-check + +install: lib-install +uninstall: lib-uninstall + +clean: lib-clean + +-include $(DEPS) diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MessageImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MessageImpl.cpp new file mode 100644 index 000000000..c6d83150f --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MessageImpl.cpp @@ -0,0 +1,38 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> +#include <cerrno> + +#include "rdkafkacpp_int.h" + + +RdKafka::Message::~Message() { +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MetadataImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MetadataImpl.cpp new file mode 100644 index 000000000..62cbf9042 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/MetadataImpl.cpp @@ -0,0 +1,170 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafkacpp_int.h" + +using namespace RdKafka; + +BrokerMetadata::~BrokerMetadata() { +} +PartitionMetadata::~PartitionMetadata() { +} +TopicMetadata::~TopicMetadata() { +} +Metadata::~Metadata() { +} + + +/** + * Metadata: Broker information handler implementation + */ +class BrokerMetadataImpl : public BrokerMetadata { + public: + BrokerMetadataImpl(const rd_kafka_metadata_broker_t *broker_metadata) : + broker_metadata_(broker_metadata), host_(broker_metadata->host) { + } + + int32_t id() const { + return broker_metadata_->id; + } + + std::string host() const { + return host_; + } + int port() const { + return broker_metadata_->port; + } + + virtual ~BrokerMetadataImpl() { + } + + private: + const rd_kafka_metadata_broker_t *broker_metadata_; + const std::string host_; +}; + +/** + * Metadata: Partition information handler + */ +class PartitionMetadataImpl : public PartitionMetadata { + public: + // @TODO too much memory copy? maybe we should create a new vector class that + // read directly from C arrays? + // @TODO use auto_ptr? + PartitionMetadataImpl( + const rd_kafka_metadata_partition_t *partition_metadata) : + partition_metadata_(partition_metadata) { + replicas_.reserve(partition_metadata->replica_cnt); + for (int i = 0; i < partition_metadata->replica_cnt; ++i) + replicas_.push_back(partition_metadata->replicas[i]); + + isrs_.reserve(partition_metadata->isr_cnt); + for (int i = 0; i < partition_metadata->isr_cnt; ++i) + isrs_.push_back(partition_metadata->isrs[i]); + } + + int32_t id() const { + return partition_metadata_->id; + } + int32_t leader() const { + return partition_metadata_->leader; + } + ErrorCode err() const { + return static_cast<ErrorCode>(partition_metadata_->err); + } + + const std::vector<int32_t> *replicas() const { + return &replicas_; + } + const std::vector<int32_t> *isrs() const { + return &isrs_; + } + + ~PartitionMetadataImpl() { + } + + private: + const rd_kafka_metadata_partition_t *partition_metadata_; + std::vector<int32_t> replicas_, isrs_; +}; + +/** + * Metadata: Topic information handler + */ +class TopicMetadataImpl : public TopicMetadata { + public: + TopicMetadataImpl(const rd_kafka_metadata_topic_t *topic_metadata) : + topic_metadata_(topic_metadata), topic_(topic_metadata->topic) { + partitions_.reserve(topic_metadata->partition_cnt); + for (int i = 0; i < topic_metadata->partition_cnt; ++i) + partitions_.push_back( + new PartitionMetadataImpl(&topic_metadata->partitions[i])); + } + + ~TopicMetadataImpl() { + for (size_t i = 0; i < partitions_.size(); ++i) + delete partitions_[i]; + } + + std::string topic() const { + return topic_; + } + const std::vector<const PartitionMetadata *> *partitions() const { + return &partitions_; + } + ErrorCode err() const { + return static_cast<ErrorCode>(topic_metadata_->err); + } + + private: + const rd_kafka_metadata_topic_t *topic_metadata_; + const std::string topic_; + std::vector<const PartitionMetadata *> partitions_; +}; + +MetadataImpl::MetadataImpl(const rd_kafka_metadata_t *metadata) : + metadata_(metadata) { + brokers_.reserve(metadata->broker_cnt); + for (int i = 0; i < metadata->broker_cnt; ++i) + brokers_.push_back(new BrokerMetadataImpl(&metadata->brokers[i])); + + topics_.reserve(metadata->topic_cnt); + for (int i = 0; i < metadata->topic_cnt; ++i) + topics_.push_back(new TopicMetadataImpl(&metadata->topics[i])); +} + +MetadataImpl::~MetadataImpl() { + for (size_t i = 0; i < brokers_.size(); ++i) + delete brokers_[i]; + for (size_t i = 0; i < topics_.size(); ++i) + delete topics_[i]; + + + if (metadata_) + rd_kafka_metadata_destroy(metadata_); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp new file mode 100644 index 000000000..8300dfb3b --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp @@ -0,0 +1,197 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> +#include <cerrno> + +#include "rdkafkacpp_int.h" + + +RdKafka::Producer::~Producer() { +} + +static void dr_msg_cb_trampoline(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) { + RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); + RdKafka::MessageImpl message(RD_KAFKA_PRODUCER, NULL, + (rd_kafka_message_t *)rkmessage, false); + handle->dr_cb_->dr_cb(message); +} + + + +RdKafka::Producer *RdKafka::Producer::create(const RdKafka::Conf *conf, + std::string &errstr) { + char errbuf[512]; + const RdKafka::ConfImpl *confimpl = + dynamic_cast<const RdKafka::ConfImpl *>(conf); + RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl(); + rd_kafka_conf_t *rk_conf = NULL; + + if (confimpl) { + if (!confimpl->rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + delete rkp; + return NULL; + } + + rkp->set_common_config(confimpl); + + rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_); + + if (confimpl->dr_cb_) { + rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline); + rkp->dr_cb_ = confimpl->dr_cb_; + } + } + + + rd_kafka_t *rk; + if (!(rk = + rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errbuf, sizeof(errbuf)))) { + errstr = errbuf; + // rd_kafka_new() takes ownership only if succeeds + if (rk_conf) + rd_kafka_conf_destroy(rk_conf); + delete rkp; + return NULL; + } + + rkp->rk_ = rk; + + return rkp; +} + + +RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const std::string *key, + void *msg_opaque) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, + key ? key->c_str() : NULL, key ? key->size() : 0, + msg_opaque) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + + +RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + void *msg_opaque) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key, + key_len, msg_opaque) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + + +RdKafka::ErrorCode RdKafka::ProducerImpl::produce( + RdKafka::Topic *topic, + int32_t partition, + const std::vector<char> *payload, + const std::vector<char> *key, + void *msg_opaque) { + RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); + + if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY, + payload ? (void *)&(*payload)[0] : NULL, + payload ? payload->size() : 0, key ? &(*key)[0] : NULL, + key ? key->size() : 0, msg_opaque) == -1) + return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); + + return RdKafka::ERR_NO_ERROR; +} + +RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + void *msg_opaque) { + return static_cast<RdKafka::ErrorCode>(rd_kafka_producev( + rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_END)); +} + +RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) { + rd_kafka_headers_t *hdrs = NULL; + RdKafka::HeadersImpl *headersimpl = NULL; + rd_kafka_resp_err_t err; + + if (headers) { + headersimpl = static_cast<RdKafka::HeadersImpl *>(headers); + hdrs = headersimpl->c_ptr(); + } + + err = rd_kafka_producev( + rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END); + + if (!err && headersimpl) { + /* A successful producev() call will destroy the C headers. */ + headersimpl->c_headers_destroyed(); + delete headers; + } + + return static_cast<RdKafka::ErrorCode>(err); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/QueueImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/QueueImpl.cpp new file mode 100644 index 000000000..19ebce9d6 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/QueueImpl.cpp @@ -0,0 +1,70 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <cerrno> + +#include "rdkafkacpp_int.h" + +RdKafka::Queue::~Queue() { +} + +RdKafka::Queue *RdKafka::Queue::create(Handle *base) { + return new RdKafka::QueueImpl( + rd_kafka_queue_new(dynamic_cast<HandleImpl *>(base)->rk_)); +} + +RdKafka::ErrorCode RdKafka::QueueImpl::forward(Queue *queue) { + if (!queue) { + rd_kafka_queue_forward(queue_, NULL); + } else { + QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue); + rd_kafka_queue_forward(queue_, queueimpl->queue_); + } + return RdKafka::ERR_NO_ERROR; +} + +RdKafka::Message *RdKafka::QueueImpl::consume(int timeout_ms) { + rd_kafka_message_t *rkmessage; + rkmessage = rd_kafka_consume_queue(queue_, timeout_ms); + + if (!rkmessage) + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, NULL, + RdKafka::ERR__TIMED_OUT); + + return new RdKafka::MessageImpl(RD_KAFKA_CONSUMER, rkmessage); +} + +int RdKafka::QueueImpl::poll(int timeout_ms) { + return rd_kafka_queue_poll_callback(queue_, timeout_ms); +} + +void RdKafka::QueueImpl::io_event_enable(int fd, + const void *payload, + size_t size) { + rd_kafka_queue_io_event_enable(queue_, fd, payload, size); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/README.md b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/README.md new file mode 100644 index 000000000..a4845894f --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/README.md @@ -0,0 +1,16 @@ +librdkafka C++ interface +======================== + +**See rdkafkacpp.h for the public C++ API** + + + +Maintainer notes for the C++ interface: + + * The public C++ interface (rdkafkacpp.h) does not include the + public C interface (rdkafka.h) in any way, this means that all + constants, flags, etc, must be kept in sync manually between the two + header files. + A regression test should be implemented that checks this is true. + + * The public C++ interface is provided using pure virtual abstract classes. diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/RdKafka.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/RdKafka.cpp new file mode 100644 index 000000000..b6cb33c28 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/RdKafka.cpp @@ -0,0 +1,59 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <string> + +#include "rdkafkacpp_int.h" + +int RdKafka::version() { + return rd_kafka_version(); +} + +std::string RdKafka::version_str() { + return std::string(rd_kafka_version_str()); +} + +std::string RdKafka::get_debug_contexts() { + return std::string(RD_KAFKA_DEBUG_CONTEXTS); +} + +std::string RdKafka::err2str(RdKafka::ErrorCode err) { + return std::string(rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(err))); +} + +int RdKafka::wait_destroyed(int timeout_ms) { + return rd_kafka_wait_destroyed(timeout_ms); +} + +void *RdKafka::mem_malloc(size_t size) { + return rd_kafka_mem_malloc(NULL, size); +} + +void RdKafka::mem_free(void *ptr) { + rd_kafka_mem_free(NULL, ptr); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicImpl.cpp new file mode 100644 index 000000000..bf9734df9 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicImpl.cpp @@ -0,0 +1,124 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <list> +#include <cerrno> + +#include "rdkafkacpp_int.h" + +const int32_t RdKafka::Topic::PARTITION_UA = RD_KAFKA_PARTITION_UA; + +const int64_t RdKafka::Topic::OFFSET_BEGINNING = RD_KAFKA_OFFSET_BEGINNING; + +const int64_t RdKafka::Topic::OFFSET_END = RD_KAFKA_OFFSET_END; + +const int64_t RdKafka::Topic::OFFSET_STORED = RD_KAFKA_OFFSET_STORED; + +const int64_t RdKafka::Topic::OFFSET_INVALID = RD_KAFKA_OFFSET_INVALID; + +RdKafka::Topic::~Topic() { +} + +static int32_t partitioner_cb_trampoline(const rd_kafka_topic_t *rkt, + const void *keydata, + size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque) { + RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(rkt_opaque); + std::string key(static_cast<const char *>(keydata), keylen); + return topicimpl->partitioner_cb_->partitioner_cb(topicimpl, &key, + partition_cnt, msg_opaque); +} + +static int32_t partitioner_kp_cb_trampoline(const rd_kafka_topic_t *rkt, + const void *keydata, + size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque) { + RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(rkt_opaque); + return topicimpl->partitioner_kp_cb_->partitioner_cb( + topicimpl, keydata, keylen, partition_cnt, msg_opaque); +} + + + +RdKafka::Topic *RdKafka::Topic::create(Handle *base, + const std::string &topic_str, + const Conf *conf, + std::string &errstr) { + const RdKafka::ConfImpl *confimpl = + static_cast<const RdKafka::ConfImpl *>(conf); + rd_kafka_topic_t *rkt; + rd_kafka_topic_conf_t *rkt_conf; + rd_kafka_t *rk = dynamic_cast<HandleImpl *>(base)->rk_; + + RdKafka::TopicImpl *topic = new RdKafka::TopicImpl(); + + if (!confimpl) { + /* Reuse default topic config, but we need our own copy to + * set the topic opaque. */ + rkt_conf = rd_kafka_default_topic_conf_dup(rk); + } else { + /* Make a copy of conf struct to allow Conf reuse. */ + rkt_conf = rd_kafka_topic_conf_dup(confimpl->rkt_conf_); + } + + /* Set topic opaque to the topic so that we can reach our topic object + * from whatever callbacks get registered. + * The application itself will not need these opaques since their + * callbacks are class based. */ + rd_kafka_topic_conf_set_opaque(rkt_conf, static_cast<void *>(topic)); + + if (confimpl) { + if (confimpl->partitioner_cb_) { + rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, + partitioner_cb_trampoline); + topic->partitioner_cb_ = confimpl->partitioner_cb_; + } else if (confimpl->partitioner_kp_cb_) { + rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, + partitioner_kp_cb_trampoline); + topic->partitioner_kp_cb_ = confimpl->partitioner_kp_cb_; + } + } + + + if (!(rkt = rd_kafka_topic_new(rk, topic_str.c_str(), rkt_conf))) { + errstr = rd_kafka_err2str(rd_kafka_last_error()); + delete topic; + rd_kafka_topic_conf_destroy(rkt_conf); + return NULL; + } + + topic->rkt_ = rkt; + + return topic; +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicPartitionImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicPartitionImpl.cpp new file mode 100644 index 000000000..90ef820bf --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/TopicPartitionImpl.cpp @@ -0,0 +1,57 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2015 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include <string> +#include <vector> + +#include "rdkafkacpp_int.h" + +RdKafka::TopicPartition::~TopicPartition() { +} + +RdKafka::TopicPartition *RdKafka::TopicPartition::create( + const std::string &topic, + int partition) { + return new TopicPartitionImpl(topic, partition); +} + +RdKafka::TopicPartition *RdKafka::TopicPartition::create( + const std::string &topic, + int partition, + int64_t offset) { + return new TopicPartitionImpl(topic, partition, offset); +} + +void RdKafka::TopicPartition::destroy( + std::vector<TopicPartition *> &partitions) { + for (std::vector<TopicPartition *>::iterator it = partitions.begin(); + it != partitions.end(); ++it) + delete (*it); + partitions.clear(); +} diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp.h b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp.h new file mode 100644 index 000000000..1df1043c0 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp.h @@ -0,0 +1,3764 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014-2022 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKACPP_H_ +#define _RDKAFKACPP_H_ + +/** + * @file rdkafkacpp.h + * @brief Apache Kafka C/C++ consumer and producer client library. + * + * rdkafkacpp.h contains the public C++ API for librdkafka. + * The API is documented in this file as comments prefixing the class, + * function, type, enum, define, etc. + * For more information, see the C interface in rdkafka.h and read the + * manual in INTRODUCTION.md. + * The C++ interface is STD C++ '03 compliant and adheres to the + * Google C++ Style Guide. + + * @sa For the C interface see rdkafka.h + * + * @tableofcontents + */ + +/**@cond NO_DOC*/ +#include <string> +#include <list> +#include <vector> +#include <cstdlib> +#include <cstring> +#include <stdint.h> +#include <sys/types.h> + +#ifdef _WIN32 +#ifndef ssize_t +#ifndef _BASETSD_H_ +#include <basetsd.h> +#endif +#ifndef _SSIZE_T_DEFINED +#define _SSIZE_T_DEFINED +typedef SSIZE_T ssize_t; +#endif +#endif +#undef RD_EXPORT +#ifdef LIBRDKAFKA_STATICLIB +#define RD_EXPORT +#else +#ifdef LIBRDKAFKACPP_EXPORTS +#define RD_EXPORT __declspec(dllexport) +#else +#define RD_EXPORT __declspec(dllimport) +#endif +#endif +#else +#define RD_EXPORT +#endif + +/**@endcond*/ + +extern "C" { +/* Forward declarations */ +struct rd_kafka_s; +struct rd_kafka_topic_s; +struct rd_kafka_message_s; +struct rd_kafka_conf_s; +struct rd_kafka_topic_conf_s; +} + +namespace RdKafka { + +/** + * @name Miscellaneous APIs + * @{ + */ + +/** + * @brief librdkafka version + * + * Interpreted as hex \c MM.mm.rr.xx: + * - MM = Major + * - mm = minor + * - rr = revision + * - xx = pre-release id (0xff is the final release) + * + * E.g.: \c 0x000801ff = 0.8.1 + * + * @remark This value should only be used during compile time, + * for runtime checks of version use RdKafka::version() + */ +#define RD_KAFKA_VERSION 0x020100ff + +/** + * @brief Returns the librdkafka version as integer. + * + * @sa See RD_KAFKA_VERSION for how to parse the integer format. + */ +RD_EXPORT +int version(); + +/** + * @brief Returns the librdkafka version as string. + */ +RD_EXPORT +std::string version_str(); + +/** + * @brief Returns a CSV list of the supported debug contexts + * for use with Conf::Set("debug", ..). + */ +RD_EXPORT +std::string get_debug_contexts(); + +/** + * @brief Wait for all rd_kafka_t objects to be destroyed. + * + * @returns 0 if all kafka objects are now destroyed, or -1 if the + * timeout was reached. + * Since RdKafka handle deletion is an asynch operation the + * \p wait_destroyed() function can be used for applications where + * a clean shutdown is required. + */ +RD_EXPORT +int wait_destroyed(int timeout_ms); + +/** + * @brief Allocate memory using the same allocator librdkafka uses. + * + * This is typically an abstraction for the malloc(3) call and makes sure + * the application can use the same memory allocator as librdkafka for + * allocating pointers that are used by librdkafka. + * + * @remark Memory allocated by mem_malloc() must be freed using + * mem_free(). + */ +RD_EXPORT +void *mem_malloc(size_t size); + +/** + * @brief Free pointer returned by librdkafka + * + * This is typically an abstraction for the free(3) call and makes sure + * the application can use the same memory allocator as librdkafka for + * freeing pointers returned by librdkafka. + * + * In standard setups it is usually not necessary to use this interface + * rather than the free(3) function. + * + * @remark mem_free() must only be used for pointers returned by APIs + * that explicitly mention using this function for freeing. + */ +RD_EXPORT +void mem_free(void *ptr); + +/**@}*/ + + + +/** + * @name Constants, errors, types + * @{ + * + * + */ + +/** + * @brief Error codes. + * + * The negative error codes delimited by two underscores + * (\c _ERR__..) denotes errors internal to librdkafka and are + * displayed as \c \"Local: \<error string..\>\", while the error codes + * delimited by a single underscore (\c ERR_..) denote broker + * errors and are displayed as \c \"Broker: \<error string..\>\". + * + * @sa Use RdKafka::err2str() to translate an error code a human readable string + */ +enum ErrorCode { + /* Internal errors to rdkafka: */ + /** Begin internal error codes */ + ERR__BEGIN = -200, + /** Received message is incorrect */ + ERR__BAD_MSG = -199, + /** Bad/unknown compression */ + ERR__BAD_COMPRESSION = -198, + /** Broker is going away */ + ERR__DESTROY = -197, + /** Generic failure */ + ERR__FAIL = -196, + /** Broker transport failure */ + ERR__TRANSPORT = -195, + /** Critical system resource */ + ERR__CRIT_SYS_RESOURCE = -194, + /** Failed to resolve broker */ + ERR__RESOLVE = -193, + /** Produced message timed out*/ + ERR__MSG_TIMED_OUT = -192, + /** Reached the end of the topic+partition queue on + * the broker. Not really an error. + * This event is disabled by default, + * see the `enable.partition.eof` configuration property. */ + ERR__PARTITION_EOF = -191, + /** Permanent: Partition does not exist in cluster. */ + ERR__UNKNOWN_PARTITION = -190, + /** File or filesystem error */ + ERR__FS = -189, + /** Permanent: Topic does not exist in cluster. */ + ERR__UNKNOWN_TOPIC = -188, + /** All broker connections are down. */ + ERR__ALL_BROKERS_DOWN = -187, + /** Invalid argument, or invalid configuration */ + ERR__INVALID_ARG = -186, + /** Operation timed out */ + ERR__TIMED_OUT = -185, + /** Queue is full */ + ERR__QUEUE_FULL = -184, + /** ISR count < required.acks */ + ERR__ISR_INSUFF = -183, + /** Broker node update */ + ERR__NODE_UPDATE = -182, + /** SSL error */ + ERR__SSL = -181, + /** Waiting for coordinator to become available. */ + ERR__WAIT_COORD = -180, + /** Unknown client group */ + ERR__UNKNOWN_GROUP = -179, + /** Operation in progress */ + ERR__IN_PROGRESS = -178, + /** Previous operation in progress, wait for it to finish. */ + ERR__PREV_IN_PROGRESS = -177, + /** This operation would interfere with an existing subscription */ + ERR__EXISTING_SUBSCRIPTION = -176, + /** Assigned partitions (rebalance_cb) */ + ERR__ASSIGN_PARTITIONS = -175, + /** Revoked partitions (rebalance_cb) */ + ERR__REVOKE_PARTITIONS = -174, + /** Conflicting use */ + ERR__CONFLICT = -173, + /** Wrong state */ + ERR__STATE = -172, + /** Unknown protocol */ + ERR__UNKNOWN_PROTOCOL = -171, + /** Not implemented */ + ERR__NOT_IMPLEMENTED = -170, + /** Authentication failure*/ + ERR__AUTHENTICATION = -169, + /** No stored offset */ + ERR__NO_OFFSET = -168, + /** Outdated */ + ERR__OUTDATED = -167, + /** Timed out in queue */ + ERR__TIMED_OUT_QUEUE = -166, + /** Feature not supported by broker */ + ERR__UNSUPPORTED_FEATURE = -165, + /** Awaiting cache update */ + ERR__WAIT_CACHE = -164, + /** Operation interrupted */ + ERR__INTR = -163, + /** Key serialization error */ + ERR__KEY_SERIALIZATION = -162, + /** Value serialization error */ + ERR__VALUE_SERIALIZATION = -161, + /** Key deserialization error */ + ERR__KEY_DESERIALIZATION = -160, + /** Value deserialization error */ + ERR__VALUE_DESERIALIZATION = -159, + /** Partial response */ + ERR__PARTIAL = -158, + /** Modification attempted on read-only object */ + ERR__READ_ONLY = -157, + /** No such entry / item not found */ + ERR__NOENT = -156, + /** Read underflow */ + ERR__UNDERFLOW = -155, + /** Invalid type */ + ERR__INVALID_TYPE = -154, + /** Retry operation */ + ERR__RETRY = -153, + /** Purged in queue */ + ERR__PURGE_QUEUE = -152, + /** Purged in flight */ + ERR__PURGE_INFLIGHT = -151, + /** Fatal error: see RdKafka::Handle::fatal_error() */ + ERR__FATAL = -150, + /** Inconsistent state */ + ERR__INCONSISTENT = -149, + /** Gap-less ordering would not be guaranteed if proceeding */ + ERR__GAPLESS_GUARANTEE = -148, + /** Maximum poll interval exceeded */ + ERR__MAX_POLL_EXCEEDED = -147, + /** Unknown broker */ + ERR__UNKNOWN_BROKER = -146, + /** Functionality not configured */ + ERR__NOT_CONFIGURED = -145, + /** Instance has been fenced */ + ERR__FENCED = -144, + /** Application generated error */ + ERR__APPLICATION = -143, + /** Assignment lost */ + ERR__ASSIGNMENT_LOST = -142, + /** No operation performed */ + ERR__NOOP = -141, + /** No offset to automatically reset to */ + ERR__AUTO_OFFSET_RESET = -140, + /** Partition log truncation detected */ + ERR__LOG_TRUNCATION = -139, + + /** End internal error codes */ + ERR__END = -100, + + /* Kafka broker errors: */ + /** Unknown broker error */ + ERR_UNKNOWN = -1, + /** Success */ + ERR_NO_ERROR = 0, + /** Offset out of range */ + ERR_OFFSET_OUT_OF_RANGE = 1, + /** Invalid message */ + ERR_INVALID_MSG = 2, + /** Unknown topic or partition */ + ERR_UNKNOWN_TOPIC_OR_PART = 3, + /** Invalid message size */ + ERR_INVALID_MSG_SIZE = 4, + /** Leader not available */ + ERR_LEADER_NOT_AVAILABLE = 5, + /** Not leader for partition */ + ERR_NOT_LEADER_FOR_PARTITION = 6, + /** Request timed out */ + ERR_REQUEST_TIMED_OUT = 7, + /** Broker not available */ + ERR_BROKER_NOT_AVAILABLE = 8, + /** Replica not available */ + ERR_REPLICA_NOT_AVAILABLE = 9, + /** Message size too large */ + ERR_MSG_SIZE_TOO_LARGE = 10, + /** StaleControllerEpochCode */ + ERR_STALE_CTRL_EPOCH = 11, + /** Offset metadata string too large */ + ERR_OFFSET_METADATA_TOO_LARGE = 12, + /** Broker disconnected before response received */ + ERR_NETWORK_EXCEPTION = 13, + /** Coordinator load in progress */ + ERR_COORDINATOR_LOAD_IN_PROGRESS = 14, +/** Group coordinator load in progress */ +#define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS + /** Coordinator not available */ + ERR_COORDINATOR_NOT_AVAILABLE = 15, +/** Group coordinator not available */ +#define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE + /** Not coordinator */ + ERR_NOT_COORDINATOR = 16, +/** Not coordinator for group */ +#define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR + /** Invalid topic */ + ERR_TOPIC_EXCEPTION = 17, + /** Message batch larger than configured server segment size */ + ERR_RECORD_LIST_TOO_LARGE = 18, + /** Not enough in-sync replicas */ + ERR_NOT_ENOUGH_REPLICAS = 19, + /** Message(s) written to insufficient number of in-sync replicas */ + ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, + /** Invalid required acks value */ + ERR_INVALID_REQUIRED_ACKS = 21, + /** Specified group generation id is not valid */ + ERR_ILLEGAL_GENERATION = 22, + /** Inconsistent group protocol */ + ERR_INCONSISTENT_GROUP_PROTOCOL = 23, + /** Invalid group.id */ + ERR_INVALID_GROUP_ID = 24, + /** Unknown member */ + ERR_UNKNOWN_MEMBER_ID = 25, + /** Invalid session timeout */ + ERR_INVALID_SESSION_TIMEOUT = 26, + /** Group rebalance in progress */ + ERR_REBALANCE_IN_PROGRESS = 27, + /** Commit offset data size is not valid */ + ERR_INVALID_COMMIT_OFFSET_SIZE = 28, + /** Topic authorization failed */ + ERR_TOPIC_AUTHORIZATION_FAILED = 29, + /** Group authorization failed */ + ERR_GROUP_AUTHORIZATION_FAILED = 30, + /** Cluster authorization failed */ + ERR_CLUSTER_AUTHORIZATION_FAILED = 31, + /** Invalid timestamp */ + ERR_INVALID_TIMESTAMP = 32, + /** Unsupported SASL mechanism */ + ERR_UNSUPPORTED_SASL_MECHANISM = 33, + /** Illegal SASL state */ + ERR_ILLEGAL_SASL_STATE = 34, + /** Unuspported version */ + ERR_UNSUPPORTED_VERSION = 35, + /** Topic already exists */ + ERR_TOPIC_ALREADY_EXISTS = 36, + /** Invalid number of partitions */ + ERR_INVALID_PARTITIONS = 37, + /** Invalid replication factor */ + ERR_INVALID_REPLICATION_FACTOR = 38, + /** Invalid replica assignment */ + ERR_INVALID_REPLICA_ASSIGNMENT = 39, + /** Invalid config */ + ERR_INVALID_CONFIG = 40, + /** Not controller for cluster */ + ERR_NOT_CONTROLLER = 41, + /** Invalid request */ + ERR_INVALID_REQUEST = 42, + /** Message format on broker does not support request */ + ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, + /** Policy violation */ + ERR_POLICY_VIOLATION = 44, + /** Broker received an out of order sequence number */ + ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45, + /** Broker received a duplicate sequence number */ + ERR_DUPLICATE_SEQUENCE_NUMBER = 46, + /** Producer attempted an operation with an old epoch */ + ERR_INVALID_PRODUCER_EPOCH = 47, + /** Producer attempted a transactional operation in an invalid state */ + ERR_INVALID_TXN_STATE = 48, + /** Producer attempted to use a producer id which is not + * currently assigned to its transactional id */ + ERR_INVALID_PRODUCER_ID_MAPPING = 49, + /** Transaction timeout is larger than the maximum + * value allowed by the broker's max.transaction.timeout.ms */ + ERR_INVALID_TRANSACTION_TIMEOUT = 50, + /** Producer attempted to update a transaction while another + * concurrent operation on the same transaction was ongoing */ + ERR_CONCURRENT_TRANSACTIONS = 51, + /** Indicates that the transaction coordinator sending a + * WriteTxnMarker is no longer the current coordinator for a + * given producer */ + ERR_TRANSACTION_COORDINATOR_FENCED = 52, + /** Transactional Id authorization failed */ + ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, + /** Security features are disabled */ + ERR_SECURITY_DISABLED = 54, + /** Operation not attempted */ + ERR_OPERATION_NOT_ATTEMPTED = 55, + /** Disk error when trying to access log file on the disk */ + ERR_KAFKA_STORAGE_ERROR = 56, + /** The user-specified log directory is not found in the broker config */ + ERR_LOG_DIR_NOT_FOUND = 57, + /** SASL Authentication failed */ + ERR_SASL_AUTHENTICATION_FAILED = 58, + /** Unknown Producer Id */ + ERR_UNKNOWN_PRODUCER_ID = 59, + /** Partition reassignment is in progress */ + ERR_REASSIGNMENT_IN_PROGRESS = 60, + /** Delegation Token feature is not enabled */ + ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61, + /** Delegation Token is not found on server */ + ERR_DELEGATION_TOKEN_NOT_FOUND = 62, + /** Specified Principal is not valid Owner/Renewer */ + ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63, + /** Delegation Token requests are not allowed on this connection */ + ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, + /** Delegation Token authorization failed */ + ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, + /** Delegation Token is expired */ + ERR_DELEGATION_TOKEN_EXPIRED = 66, + /** Supplied principalType is not supported */ + ERR_INVALID_PRINCIPAL_TYPE = 67, + /** The group is not empty */ + ERR_NON_EMPTY_GROUP = 68, + /** The group id does not exist */ + ERR_GROUP_ID_NOT_FOUND = 69, + /** The fetch session ID was not found */ + ERR_FETCH_SESSION_ID_NOT_FOUND = 70, + /** The fetch session epoch is invalid */ + ERR_INVALID_FETCH_SESSION_EPOCH = 71, + /** No matching listener */ + ERR_LISTENER_NOT_FOUND = 72, + /** Topic deletion is disabled */ + ERR_TOPIC_DELETION_DISABLED = 73, + /** Leader epoch is older than broker epoch */ + ERR_FENCED_LEADER_EPOCH = 74, + /** Leader epoch is newer than broker epoch */ + ERR_UNKNOWN_LEADER_EPOCH = 75, + /** Unsupported compression type */ + ERR_UNSUPPORTED_COMPRESSION_TYPE = 76, + /** Broker epoch has changed */ + ERR_STALE_BROKER_EPOCH = 77, + /** Leader high watermark is not caught up */ + ERR_OFFSET_NOT_AVAILABLE = 78, + /** Group member needs a valid member ID */ + ERR_MEMBER_ID_REQUIRED = 79, + /** Preferred leader was not available */ + ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80, + /** Consumer group has reached maximum size */ + ERR_GROUP_MAX_SIZE_REACHED = 81, + /** Static consumer fenced by other consumer with same + * group.instance.id. */ + ERR_FENCED_INSTANCE_ID = 82, + /** Eligible partition leaders are not available */ + ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83, + /** Leader election not needed for topic partition */ + ERR_ELECTION_NOT_NEEDED = 84, + /** No partition reassignment is in progress */ + ERR_NO_REASSIGNMENT_IN_PROGRESS = 85, + /** Deleting offsets of a topic while the consumer group is + * subscribed to it */ + ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86, + /** Broker failed to validate record */ + ERR_INVALID_RECORD = 87, + /** There are unstable offsets that need to be cleared */ + ERR_UNSTABLE_OFFSET_COMMIT = 88, + /** Throttling quota has been exceeded */ + ERR_THROTTLING_QUOTA_EXCEEDED = 89, + /** There is a newer producer with the same transactionalId + * which fences the current one */ + ERR_PRODUCER_FENCED = 90, + /** Request illegally referred to resource that does not exist */ + ERR_RESOURCE_NOT_FOUND = 91, + /** Request illegally referred to the same resource twice */ + ERR_DUPLICATE_RESOURCE = 92, + /** Requested credential would not meet criteria for acceptability */ + ERR_UNACCEPTABLE_CREDENTIAL = 93, + /** Indicates that the either the sender or recipient of a + * voter-only request is not one of the expected voters */ + ERR_INCONSISTENT_VOTER_SET = 94, + /** Invalid update version */ + ERR_INVALID_UPDATE_VERSION = 95, + /** Unable to update finalized features due to server error */ + ERR_FEATURE_UPDATE_FAILED = 96, + /** Request principal deserialization failed during forwarding */ + ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97 +}; + + +/** + * @brief Returns a human readable representation of a kafka error. + */ +RD_EXPORT +std::string err2str(RdKafka::ErrorCode err); + + + +/** + * @enum CertificateType + * @brief SSL certificate types + */ +enum CertificateType { + CERT_PUBLIC_KEY, /**< Client's public key */ + CERT_PRIVATE_KEY, /**< Client's private key */ + CERT_CA, /**< CA certificate */ + CERT__CNT +}; + +/** + * @enum CertificateEncoding + * @brief SSL certificate encoding + */ +enum CertificateEncoding { + CERT_ENC_PKCS12, /**< PKCS#12 */ + CERT_ENC_DER, /**< DER / binary X.509 ASN1 */ + CERT_ENC_PEM, /**< PEM */ + CERT_ENC__CNT +}; + +/**@} */ + + + +/**@cond NO_DOC*/ +/* Forward declarations */ +class Handle; +class Producer; +class Message; +class Headers; +class Queue; +class Event; +class Topic; +class TopicPartition; +class Metadata; +class KafkaConsumer; +/**@endcond*/ + + +/** + * @name Error class + * @{ + * + */ + +/** + * @brief The Error class is used as a return value from APIs to propagate + * an error. The error consists of an error code which is to be used + * programatically, an error string for showing to the user, + * and various error flags that can be used programmatically to decide + * how to handle the error; e.g., should the operation be retried, + * was it a fatal error, etc. + * + * Error objects must be deleted explicitly to free its resources. + */ +class RD_EXPORT Error { + public: + /** + * @brief Create error object. + */ + static Error *create(ErrorCode code, const std::string *errstr); + + virtual ~Error() { + } + + /* + * Error accessor methods + */ + + /** + * @returns the error code, e.g., RdKafka::ERR_UNKNOWN_MEMBER_ID. + */ + virtual ErrorCode code() const = 0; + + /** + * @returns the error code name, e.g, "ERR_UNKNOWN_MEMBER_ID". + */ + virtual std::string name() const = 0; + + /** + * @returns a human readable error string. + */ + virtual std::string str() const = 0; + + /** + * @returns true if the error is a fatal error, indicating that the client + * instance is no longer usable, else false. + */ + virtual bool is_fatal() const = 0; + + /** + * @returns true if the operation may be retried, else false. + */ + virtual bool is_retriable() const = 0; + + /** + * @returns true if the error is an abortable transaction error in which case + * the application must call RdKafka::Producer::abort_transaction() + * and start a new transaction with + * RdKafka::Producer::begin_transaction() if it wishes to proceed + * with transactions. + * Else returns false. + * + * @remark The return value of this method is only valid for errors returned + * by the transactional API. + */ + virtual bool txn_requires_abort() const = 0; +}; + +/**@}*/ + + +/** + * @name Callback classes + * @{ + * + * + * librdkafka uses (optional) callbacks to propagate information and + * delegate decisions to the application logic. + * + * An application must call RdKafka::poll() at regular intervals to + * serve queued callbacks. + */ + + +/** + * @brief Delivery Report callback class + * + * The delivery report callback will be called once for each message + * accepted by RdKafka::Producer::produce() (et.al) with + * RdKafka::Message::err() set to indicate the result of the produce request. + * + * The callback is called when a message is succesfully produced or + * if librdkafka encountered a permanent failure, or the retry counter for + * temporary errors has been exhausted. + * + * An application must call RdKafka::poll() at regular intervals to + * serve queued delivery report callbacks. + + */ +class RD_EXPORT DeliveryReportCb { + public: + /** + * @brief Delivery report callback. + */ + virtual void dr_cb(Message &message) = 0; + + virtual ~DeliveryReportCb() { + } +}; + + +/** + * @brief SASL/OAUTHBEARER token refresh callback class + * + * The SASL/OAUTHBEARER token refresh callback is triggered via RdKafka::poll() + * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, + * typically based on the configuration defined in \c sasl.oauthbearer.config. + * + * The \c oauthbearer_config argument is the value of the + * \c sasl.oauthbearer.config configuration property. + * + * The callback should invoke RdKafka::Handle::oauthbearer_set_token() or + * RdKafka::Handle::oauthbearer_set_token_failure() to indicate success or + * failure, respectively. + * + * The refresh operation is eventable and may be received when an event + * callback handler is set with an event type of + * \c RdKafka::Event::EVENT_OAUTHBEARER_TOKEN_REFRESH. + * + * Note that before any SASL/OAUTHBEARER broker connection can succeed the + * application must call RdKafka::Handle::oauthbearer_set_token() once -- either + * directly or, more typically, by invoking RdKafka::poll() -- in order to + * cause retrieval of an initial token to occur. + * + * An application must call RdKafka::poll() at regular intervals to + * serve queued SASL/OAUTHBEARER token refresh callbacks (when + * OAUTHBEARER is the SASL mechanism). + */ +class RD_EXPORT OAuthBearerTokenRefreshCb { + public: + /** + * @brief SASL/OAUTHBEARER token refresh callback class. + * + * @param handle The RdKafka::Handle which requires a refreshed token. + * @param oauthbearer_config The value of the + * \p sasl.oauthbearer.config configuration property for \p handle. + */ + virtual void oauthbearer_token_refresh_cb( + RdKafka::Handle *handle, + const std::string &oauthbearer_config) = 0; + + virtual ~OAuthBearerTokenRefreshCb() { + } +}; + + +/** + * @brief Partitioner callback class + * + * Generic partitioner callback class for implementing custom partitioners. + * + * @sa RdKafka::Conf::set() \c "partitioner_cb" + */ +class RD_EXPORT PartitionerCb { + public: + /** + * @brief Partitioner callback + * + * Return the partition to use for \p key in \p topic. + * + * The \p msg_opaque is the same \p msg_opaque provided in the + * RdKafka::Producer::produce() call. + * + * @remark \p key may be NULL or the empty. + * + * @returns Must return a value between 0 and \p partition_cnt + * (non-inclusive). May return RD_KAFKA_PARTITION_UA (-1) if partitioning + * failed. + * + * @sa The callback may use RdKafka::Topic::partition_available() to check + * if a partition has an active leader broker. + */ + virtual int32_t partitioner_cb(const Topic *topic, + const std::string *key, + int32_t partition_cnt, + void *msg_opaque) = 0; + + virtual ~PartitionerCb() { + } +}; + +/** + * @brief Variant partitioner with key pointer + * + */ +class PartitionerKeyPointerCb { + public: + /** + * @brief Variant partitioner callback that gets \p key as pointer and length + * instead of as a const std::string *. + * + * @remark \p key may be NULL or have \p key_len 0. + * + * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics + */ + virtual int32_t partitioner_cb(const Topic *topic, + const void *key, + size_t key_len, + int32_t partition_cnt, + void *msg_opaque) = 0; + + virtual ~PartitionerKeyPointerCb() { + } +}; + + + +/** + * @brief Event callback class + * + * Events are a generic interface for propagating errors, statistics, logs, etc + * from librdkafka to the application. + * + * @sa RdKafka::Event + */ +class RD_EXPORT EventCb { + public: + /** + * @brief Event callback + * + * @sa RdKafka::Event + */ + virtual void event_cb(Event &event) = 0; + + virtual ~EventCb() { + } +}; + + +/** + * @brief Event object class as passed to the EventCb callback. + */ +class RD_EXPORT Event { + public: + /** @brief Event type */ + enum Type { + EVENT_ERROR, /**< Event is an error condition */ + EVENT_STATS, /**< Event is a statistics JSON document */ + EVENT_LOG, /**< Event is a log message */ + EVENT_THROTTLE /**< Event is a throttle level signaling from the broker */ + }; + + /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */ + enum Severity { + EVENT_SEVERITY_EMERG = 0, + EVENT_SEVERITY_ALERT = 1, + EVENT_SEVERITY_CRITICAL = 2, + EVENT_SEVERITY_ERROR = 3, + EVENT_SEVERITY_WARNING = 4, + EVENT_SEVERITY_NOTICE = 5, + EVENT_SEVERITY_INFO = 6, + EVENT_SEVERITY_DEBUG = 7 + }; + + virtual ~Event() { + } + + /* + * Event Accessor methods + */ + + /** + * @returns The event type + * @remark Applies to all event types + */ + virtual Type type() const = 0; + + /** + * @returns Event error, if any. + * @remark Applies to all event types except THROTTLE + */ + virtual ErrorCode err() const = 0; + + /** + * @returns Log severity level. + * @remark Applies to LOG event type. + */ + virtual Severity severity() const = 0; + + /** + * @returns Log facility string. + * @remark Applies to LOG event type. + */ + virtual std::string fac() const = 0; + + /** + * @returns Log message string. + * + * \c EVENT_LOG: Log message string. + * \c EVENT_STATS: JSON object (as string). + * + * @remark Applies to LOG event type. + */ + virtual std::string str() const = 0; + + /** + * @returns Throttle time in milliseconds. + * @remark Applies to THROTTLE event type. + */ + virtual int throttle_time() const = 0; + + /** + * @returns Throttling broker's name. + * @remark Applies to THROTTLE event type. + */ + virtual std::string broker_name() const = 0; + + /** + * @returns Throttling broker's id. + * @remark Applies to THROTTLE event type. + */ + virtual int broker_id() const = 0; + + + /** + * @returns true if this is a fatal error. + * @remark Applies to ERROR event type. + * @sa RdKafka::Handle::fatal_error() + */ + virtual bool fatal() const = 0; +}; + + + +/** + * @brief Consume callback class + */ +class RD_EXPORT ConsumeCb { + public: + /** + * @brief The consume callback is used with + * RdKafka::Consumer::consume_callback() + * methods and will be called for each consumed \p message. + * + * The callback interface is optional but provides increased performance. + */ + virtual void consume_cb(Message &message, void *opaque) = 0; + + virtual ~ConsumeCb() { + } +}; + + +/** + * @brief \b KafkaConsumer: Rebalance callback class + */ +class RD_EXPORT RebalanceCb { + public: + /** + * @brief Group rebalance callback for use with RdKafka::KafkaConsumer + * + * Registering a \p rebalance_cb turns off librdkafka's automatic + * partition assignment/revocation and instead delegates that responsibility + * to the application's \p rebalance_cb. + * + * The rebalance callback is responsible for updating librdkafka's + * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS + * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle + * arbitrary rebalancing failures where \p err is neither of those. + * @remark In this latter case (arbitrary error), the application must + * call unassign() to synchronize state. + * + * For eager/non-cooperative `partition.assignment.strategy` assignors, + * such as `range` and `roundrobin`, the application must use + * assign assign() to set and unassign() to clear the entire assignment. + * For the cooperative assignors, such as `cooperative-sticky`, the + * application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and + * incremental_unassign() for ERR__REVOKE_PARTITIONS. + * + * Without a rebalance callback this is done automatically by librdkafka + * but registering a rebalance callback gives the application flexibility + * in performing other operations along with the assinging/revocation, + * such as fetching offsets from an alternate location (on assign) + * or manually committing offsets (on revoke). + * + * @sa RdKafka::KafkaConsumer::assign() + * @sa RdKafka::KafkaConsumer::incremental_assign() + * @sa RdKafka::KafkaConsumer::incremental_unassign() + * @sa RdKafka::KafkaConsumer::assignment_lost() + * @sa RdKafka::KafkaConsumer::rebalance_protocol() + * + * The following example show's the application's responsibilities: + * @code + * class MyRebalanceCb : public RdKafka::RebalanceCb { + * public: + * void rebalance_cb (RdKafka::KafkaConsumer *consumer, + * RdKafka::ErrorCode err, + * std::vector<RdKafka::TopicPartition*> &partitions) { + * if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { + * // application may load offets from arbitrary external + * // storage here and update \p partitions + * if (consumer->rebalance_protocol() == "COOPERATIVE") + * consumer->incremental_assign(partitions); + * else + * consumer->assign(partitions); + * + * } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) { + * // Application may commit offsets manually here + * // if auto.commit.enable=false + * if (consumer->rebalance_protocol() == "COOPERATIVE") + * consumer->incremental_unassign(partitions); + * else + * consumer->unassign(); + * + * } else { + * std::cerr << "Rebalancing error: " << + * RdKafka::err2str(err) << std::endl; + * consumer->unassign(); + * } + * } + * } + * @endcode + * + * @remark The above example lacks error handling for assign calls, see + * the examples/ directory. + */ + virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, + RdKafka::ErrorCode err, + std::vector<TopicPartition *> &partitions) = 0; + + virtual ~RebalanceCb() { + } +}; + + +/** + * @brief Offset Commit callback class + */ +class RD_EXPORT OffsetCommitCb { + public: + /** + * @brief Set offset commit callback for use with consumer groups + * + * The results of automatic or manual offset commits will be scheduled + * for this callback and is served by RdKafka::KafkaConsumer::consume(). + * + * If no partitions had valid offsets to commit this callback will be called + * with \p err == ERR__NO_OFFSET which is not to be considered an error. + * + * The \p offsets list contains per-partition information: + * - \c topic The topic committed + * - \c partition The partition committed + * - \c offset: Committed offset (attempted) + * - \c err: Commit error + */ + virtual void offset_commit_cb(RdKafka::ErrorCode err, + std::vector<TopicPartition *> &offsets) = 0; + + virtual ~OffsetCommitCb() { + } +}; + + + +/** + * @brief SSL broker certificate verification class. + * + * @remark Class instance must outlive the RdKafka client instance. + */ +class RD_EXPORT SslCertificateVerifyCb { + public: + /** + * @brief SSL broker certificate verification callback. + * + * The verification callback is triggered from internal librdkafka threads + * upon connecting to a broker. On each connection attempt the callback + * will be called for each certificate in the broker's certificate chain, + * starting at the root certification, as long as the application callback + * returns 1 (valid certificate). + * + * \p broker_name and \p broker_id correspond to the broker the connection + * is being made to. + * The \c x509_error argument indicates if OpenSSL's verification of + * the certificate succeed (0) or failed (an OpenSSL error code). + * The application may set the SSL context error code by returning 0 + * from the verify callback and providing a non-zero SSL context error code + * in \p x509_error. + * If the verify callback sets \p x509_error to 0, returns 1, and the + * original \p x509_error was non-zero, the error on the SSL context will + * be cleared. + * \p x509_error is always a valid pointer to an int. + * + * \p depth is the depth of the current certificate in the chain, starting + * at the root certificate. + * + * The certificate itself is passed in binary DER format in \p buf of + * size \p size. + * + * The callback must 1 if verification succeeds, or 0 if verification fails + * and write a human-readable error message + * to \p errstr. + * + * @warning This callback will be called from internal librdkafka threads. + * + * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution + * for a list of \p x509_error codes. + */ + virtual bool ssl_cert_verify_cb(const std::string &broker_name, + int32_t broker_id, + int *x509_error, + int depth, + const char *buf, + size_t size, + std::string &errstr) = 0; + + virtual ~SslCertificateVerifyCb() { + } +}; + + +/** + * @brief \b Portability: SocketCb callback class + * + */ +class RD_EXPORT SocketCb { + public: + /** + * @brief Socket callback + * + * The socket callback is responsible for opening a socket + * according to the supplied \p domain, \p type and \p protocol. + * The socket shall be created with \c CLOEXEC set in a racefree fashion, if + * possible. + * + * It is typically not required to register an alternative socket + * implementation + * + * @returns The socket file descriptor or -1 on error (\c errno must be set) + */ + virtual int socket_cb(int domain, int type, int protocol) = 0; + + virtual ~SocketCb() { + } +}; + + +/** + * @brief \b Portability: OpenCb callback class + * + */ +class RD_EXPORT OpenCb { + public: + /** + * @brief Open callback + * The open callback is responsible for opening the file specified by + * \p pathname, using \p flags and \p mode. + * The file shall be opened with \c CLOEXEC set in a racefree fashion, if + * possible. + * + * It is typically not required to register an alternative open implementation + * + * @remark Not currently available on native Win32 + */ + virtual int open_cb(const std::string &path, int flags, int mode) = 0; + + virtual ~OpenCb() { + } +}; + + +/**@}*/ + + + +/** + * @name Configuration interface + * @{ + * + */ + +/** + * @brief Configuration interface + * + * Holds either global or topic configuration that are passed to + * RdKafka::Consumer::create(), RdKafka::Producer::create(), + * RdKafka::KafkaConsumer::create(), etc. + * + * @sa CONFIGURATION.md for the full list of supported properties. + */ +class RD_EXPORT Conf { + public: + /** + * @brief Configuration object type + */ + enum ConfType { + CONF_GLOBAL, /**< Global configuration */ + CONF_TOPIC /**< Topic specific configuration */ + }; + + /** + * @brief RdKafka::Conf::Set() result code + */ + enum ConfResult { + CONF_UNKNOWN = -2, /**< Unknown configuration property */ + CONF_INVALID = -1, /**< Invalid configuration value */ + CONF_OK = 0 /**< Configuration property was succesfully set */ + }; + + + /** + * @brief Create configuration object + */ + static Conf *create(ConfType type); + + virtual ~Conf() { + } + + /** + * @brief Set configuration property \p name to value \p value. + * + * Fallthrough: + * Topic-level configuration properties may be set using this interface + * in which case they are applied on the \c default_topic_conf. + * If no \c default_topic_conf has been set one will be created. + * Any sub-sequent set("default_topic_conf", ..) calls will + * replace the current default topic configuration. + + * @returns CONF_OK on success, else writes a human readable error + * description to \p errstr on error. + */ + virtual Conf::ConfResult set(const std::string &name, + const std::string &value, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"dr_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + DeliveryReportCb *dr_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"oauthbearer_token_refresh_cb\" */ + virtual Conf::ConfResult set( + const std::string &name, + OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"event_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + EventCb *event_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"default_topic_conf\" + * + * Sets the default topic configuration to use for for automatically + * subscribed topics. + * + * @sa RdKafka::KafkaConsumer::subscribe() + */ + virtual Conf::ConfResult set(const std::string &name, + const Conf *topic_conf, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"partitioner_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + PartitionerCb *partitioner_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + PartitionerKeyPointerCb *partitioner_kp_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"socket_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + SocketCb *socket_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"open_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + OpenCb *open_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"rebalance_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + RebalanceCb *rebalance_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"offset_commit_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + OffsetCommitCb *offset_commit_cb, + std::string &errstr) = 0; + + /** @brief Use with \p name = \c \"ssl_cert_verify_cb\". + * @returns CONF_OK on success or CONF_INVALID if SSL is + * not supported in this build. + */ + virtual Conf::ConfResult set(const std::string &name, + SslCertificateVerifyCb *ssl_cert_verify_cb, + std::string &errstr) = 0; + + /** + * @brief Set certificate/key \p cert_type from the \p cert_enc encoded + * memory at \p buffer of \p size bytes. + * + * @param cert_type Certificate or key type to configure. + * @param cert_enc Buffer \p encoding type. + * @param buffer Memory pointer to encoded certificate or key. + * The memory is not referenced after this function returns. + * @param size Size of memory at \p buffer. + * @param errstr A human-readable error string will be written to this string + * on failure. + * + * @returns CONF_OK on success or CONF_INVALID if the memory in + * \p buffer is of incorrect encoding, or if librdkafka + * was not built with SSL support. + * + * @remark Calling this method multiple times with the same \p cert_type + * will replace the previous value. + * + * @remark Calling this method with \p buffer set to NULL will clear the + * configuration for \p cert_type. + * + * @remark The private key may require a password, which must be specified + * with the `ssl.key.password` configuration property prior to + * calling this function. + * + * @remark Private and public keys in PEM format may also be set with the + * `ssl.key.pem` and `ssl.certificate.pem` configuration properties. + * + * @remark CA certificate in PEM format may also be set with the + * `ssl.ca.pem` configuration property. + * + * @remark When librdkafka is linked to OpenSSL 3.0 and the certificate is + * encoded using an obsolete cipher, it might be necessary to set up + * an OpenSSL configuration file to load the "legacy" provider and + * set the OPENSSL_CONF environment variable. + * See + * https://github.com/openssl/openssl/blob/master/README-PROVIDERS.md for more + * information. + */ + virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, + RdKafka::CertificateEncoding cert_enc, + const void *buffer, + size_t size, + std::string &errstr) = 0; + + /** @brief Query single configuration value + * + * Do not use this method to get callbacks registered by the configuration + * file. Instead use the specific get() methods with the specific callback + * parameter in the signature. + * + * Fallthrough: + * Topic-level configuration properties from the \c default_topic_conf + * may be retrieved using this interface. + * + * @returns CONF_OK if the property was set previously set and + * returns the value in \p value. */ + virtual Conf::ConfResult get(const std::string &name, + std::string &value) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p dr_cb. */ + virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p oauthbearer_token_refresh_cb. */ + virtual Conf::ConfResult get( + OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p event_cb. */ + virtual Conf::ConfResult get(EventCb *&event_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p partitioner_cb. */ + virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p partitioner_kp_cb. */ + virtual Conf::ConfResult get( + PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p socket_cb. */ + virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p open_cb. */ + virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p rebalance_cb. */ + virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0; + + /** @brief Query single configuration value + * @returns CONF_OK if the property was set previously set and + * returns the value in \p offset_commit_cb. */ + virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0; + + /** @brief Use with \p name = \c \"ssl_cert_verify_cb\" */ + virtual Conf::ConfResult get( + SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0; + + /** @brief Dump configuration names and values to list containing + * name,value tuples */ + virtual std::list<std::string> *dump() = 0; + + /** @brief Use with \p name = \c \"consume_cb\" */ + virtual Conf::ConfResult set(const std::string &name, + ConsumeCb *consume_cb, + std::string &errstr) = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_conf_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ + * does not provide the proper functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer is the same as the Conf + * object this method is called on. + * + * @remark Include <rdkafka/rdkafka.h> prior to including + * <rdkafka/rdkafkacpp.h> + * + * @returns \c rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL. + */ + virtual struct rd_kafka_conf_s *c_ptr_global() = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_topic_conf_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ + * does not provide the proper functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer is the same as the Conf + * object this method is called on. + * + * @remark Include <rdkafka/rdkafka.h> prior to including + * <rdkafka/rdkafkacpp.h> + * + * @returns \c rd_kafka_topic_conf_t* if this is a CONF_TOPIC object, + * else NULL. + */ + virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0; + + /** + * @brief Set callback_data for ssl engine. + * + * @remark The \c ssl.engine.location configuration must be set for this + * to have affect. + * + * @remark The memory pointed to by \p value must remain valid for the + * lifetime of the configuration object and any Kafka clients that + * use it. + * + * @returns CONF_OK on success, else CONF_INVALID. + */ + virtual Conf::ConfResult set_engine_callback_data(void *value, + std::string &errstr) = 0; + + + /** @brief Enable/disable creation of a queue specific to SASL events + * and callbacks. + * + * For SASL mechanisms that trigger callbacks (currently OAUTHBEARER) this + * configuration API allows an application to get a dedicated + * queue for the SASL events/callbacks. After enabling the queue with this API + * the application can retrieve the queue by calling + * RdKafka::Handle::get_sasl_queue() on the client instance. + * This queue may then be served directly by the application + * (RdKafka::Queue::poll()) or forwarded to another queue, such as + * the background queue. + * + * A convenience function is available to automatically forward the SASL queue + * to librdkafka's background thread, see + * RdKafka::Handle::sasl_background_callbacks_enable(). + * + * By default (\p enable = false) the main queue (as served by + * RdKafka::Handle::poll(), et.al.) is used for SASL callbacks. + * + * @remark The SASL queue is currently only used by the SASL OAUTHBEARER " + * mechanism's token refresh callback. + */ + virtual Conf::ConfResult enable_sasl_queue(bool enable, + std::string &errstr) = 0; +}; + +/**@}*/ + + +/** + * @name Kafka base client handle + * @{ + * + */ + +/** + * @brief Base handle, super class for specific clients. + */ +class RD_EXPORT Handle { + public: + virtual ~Handle() { + } + + /** @returns the name of the handle */ + virtual std::string name() const = 0; + + /** + * @brief Returns the client's broker-assigned group member id + * + * @remark This currently requires the high-level KafkaConsumer + * + * @returns Last assigned member id, or empty string if not currently + * a group member. + */ + virtual std::string memberid() const = 0; + + + /** + * @brief Polls the provided kafka handle for events. + * + * Events will trigger application provided callbacks to be called. + * + * The \p timeout_ms argument specifies the maximum amount of time + * (in milliseconds) that the call will block waiting for events. + * For non-blocking calls, provide 0 as \p timeout_ms. + * To wait indefinately for events, provide -1. + * + * Events: + * - delivery report callbacks (if an RdKafka::DeliveryCb is configured) + * [producer] + * - event callbacks (if an RdKafka::EventCb is configured) [producer & + * consumer] + * + * @remark An application should make sure to call poll() at regular + * intervals to serve any queued callbacks waiting to be called. + * + * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer, + * use its RdKafka::KafkaConsumer::consume() instead. + * + * @returns the number of events served. + */ + virtual int poll(int timeout_ms) = 0; + + /** + * @brief Returns the current out queue length + * + * The out queue contains messages and requests waiting to be sent to, + * or acknowledged by, the broker. + */ + virtual int outq_len() = 0; + + /** + * @brief Request Metadata from broker. + * + * Parameters: + * \p all_topics - if non-zero: request info about all topics in cluster, + * if zero: only request info about locally known topics. + * \p only_rkt - only request info about this topic + * \p metadatap - pointer to hold metadata result. + * The \p *metadatap pointer must be released with \c + * delete. \p timeout_ms - maximum response time before failing. + * + * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap + * will be set), else RdKafka::ERR__TIMED_OUT on timeout or + * other error code on error. + */ + virtual ErrorCode metadata(bool all_topics, + const Topic *only_rkt, + Metadata **metadatap, + int timeout_ms) = 0; + + + /** + * @brief Pause producing or consumption for the provided list of partitions. + * + * Success or error is returned per-partition in the \p partitions list. + * + * @returns ErrorCode::NO_ERROR + * + * @sa resume() + */ + virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0; + + + /** + * @brief Resume producing or consumption for the provided list of partitions. + * + * Success or error is returned per-partition in the \p partitions list. + * + * @returns ErrorCode::NO_ERROR + * + * @sa pause() + */ + virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0; + + + /** + * @brief Query broker for low (oldest/beginning) + * and high (newest/end) offsets for partition. + * + * Offsets are returned in \p *low and \p *high respectively. + * + * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure. + */ + virtual ErrorCode query_watermark_offsets(const std::string &topic, + int32_t partition, + int64_t *low, + int64_t *high, + int timeout_ms) = 0; + + /** + * @brief Get last known low (oldest/beginning) + * and high (newest/end) offsets for partition. + * + * The low offset is updated periodically (if statistics.interval.ms is set) + * while the high offset is updated on each fetched message set from the + * broker. + * + * If there is no cached offset (either low or high, or both) then + * OFFSET_INVALID will be returned for the respective offset. + * + * Offsets are returned in \p *low and \p *high respectively. + * + * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure. + * + * @remark Shall only be used with an active consumer instance. + */ + virtual ErrorCode get_watermark_offsets(const std::string &topic, + int32_t partition, + int64_t *low, + int64_t *high) = 0; + + + /** + * @brief Look up the offsets for the given partitions by timestamp. + * + * The returned offset for each partition is the earliest offset whose + * timestamp is greater than or equal to the given timestamp in the + * corresponding partition. + * + * The timestamps to query are represented as \c offset in \p offsets + * on input, and \c offset() will return the closest earlier offset + * for the timestamp on output. + * + * Timestamps are expressed as milliseconds since epoch (UTC). + * + * The function will block for at most \p timeout_ms milliseconds. + * + * @remark Duplicate Topic+Partitions are not supported. + * @remark Errors are also returned per TopicPartition, see \c err() + * + * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR + * in which case per-partition errors might be set. + */ + virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets, + int timeout_ms) = 0; + + + /** + * @brief Retrieve queue for a given partition. + * + * @returns The fetch queue for the given partition if successful. Else, + * NULL is returned. + * + * @remark This function only works on consumers. + */ + virtual Queue *get_partition_queue(const TopicPartition *partition) = 0; + + /** + * @brief Forward librdkafka logs (and debug) to the specified queue + * for serving with one of the ..poll() calls. + * + * This allows an application to serve log callbacks (\c log_cb) + * in its thread of choice. + * + * @param queue Queue to forward logs to. If the value is NULL the logs + * are forwarded to the main queue. + * + * @remark The configuration property \c log.queue MUST also be set to true. + * + * @remark librdkafka maintains its own reference to the provided queue. + * + * @returns ERR_NO_ERROR on success or an error code on error. + */ + virtual ErrorCode set_log_queue(Queue *queue) = 0; + + /** + * @brief Cancels the current callback dispatcher (Handle::poll(), + * KafkaConsumer::consume(), etc). + * + * A callback may use this to force an immediate return to the calling + * code (caller of e.g. Handle::poll()) without processing any further + * events. + * + * @remark This function MUST ONLY be called from within a + * librdkafka callback. + */ + virtual void yield() = 0; + + /** + * @brief Returns the ClusterId as reported in broker metadata. + * + * @param timeout_ms If there is no cached value from metadata retrieval + * then this specifies the maximum amount of time + * (in milliseconds) the call will block waiting + * for metadata to be retrieved. + * Use 0 for non-blocking calls. + * + * @remark Requires broker version >=0.10.0 and api.version.request=true. + * + * @returns Last cached ClusterId, or empty string if no ClusterId could be + * retrieved in the allotted timespan. + */ + virtual std::string clusterid(int timeout_ms) = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ + * does not provide the proper functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer is the same as the Topic + * object this method is called on. + * + * @remark Include <rdkafka/rdkafka.h> prior to including + * <rdkafka/rdkafkacpp.h> + * + * @returns \c rd_kafka_t* + */ + virtual struct rd_kafka_s *c_ptr() = 0; + + /** + * @brief Returns the current ControllerId (controller broker id) + * as reported in broker metadata. + * + * @param timeout_ms If there is no cached value from metadata retrieval + * then this specifies the maximum amount of time + * (in milliseconds) the call will block waiting + * for metadata to be retrieved. + * Use 0 for non-blocking calls. + * + * @remark Requires broker version >=0.10.0 and api.version.request=true. + * + * @returns Last cached ControllerId, or -1 if no ControllerId could be + * retrieved in the allotted timespan. + */ + virtual int32_t controllerid(int timeout_ms) = 0; + + + /** + * @brief Returns the first fatal error set on this client instance, + * or ERR_NO_ERROR if no fatal error has occurred. + * + * This function is to be used with the Idempotent Producer and + * the Event class for \c EVENT_ERROR events to detect fatal errors. + * + * Generally all errors raised by the error event are to be considered + * informational and temporary, the client will try to recover from all + * errors in a graceful fashion (by retrying, etc). + * + * However, some errors should logically be considered fatal to retain + * consistency; in particular a set of errors that may occur when using the + * Idempotent Producer and the in-order or exactly-once producer guarantees + * can't be satisfied. + * + * @param errstr A human readable error string if a fatal error was set. + * + * @returns ERR_NO_ERROR if no fatal error has been raised, else + * any other error code. + */ + virtual ErrorCode fatal_error(std::string &errstr) const = 0; + + /** + * @brief Set SASL/OAUTHBEARER token and metadata + * + * @param token_value the mandatory token value to set, often (but not + * necessarily) a JWS compact serialization as per + * https://tools.ietf.org/html/rfc7515#section-3.1. + * @param md_lifetime_ms when the token expires, in terms of the number of + * milliseconds since the epoch. + * @param md_principal_name the Kafka principal name associated with the + * token. + * @param extensions potentially empty SASL extension keys and values where + * element [i] is the key and [i+1] is the key's value, to be communicated + * to the broker as additional key-value pairs during the initial client + * response as per https://tools.ietf.org/html/rfc7628#section-3.1. The + * number of SASL extension keys plus values must be a non-negative multiple + * of 2. Any provided keys and values are copied. + * @param errstr A human readable error string is written here, only if + * there is an error. + * + * The SASL/OAUTHBEARER token refresh callback should invoke + * this method upon success. The extension keys must not include the reserved + * key "`auth`", and all extension keys and values must conform to the + * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: + * + * key = 1*(ALPHA) + * value = *(VCHAR / SP / HTAB / CR / LF ) + * + * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise \p errstr set + * and:<br> + * \c RdKafka::ERR__INVALID_ARG if any of the arguments are + * invalid;<br> + * \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not + * supported by this build;<br> + * \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is + * not configured as the client's authentication mechanism.<br> + * + * @sa RdKafka::oauthbearer_set_token_failure + * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb" + */ + virtual ErrorCode oauthbearer_set_token( + const std::string &token_value, + int64_t md_lifetime_ms, + const std::string &md_principal_name, + const std::list<std::string> &extensions, + std::string &errstr) = 0; + + /** + * @brief SASL/OAUTHBEARER token refresh failure indicator. + * + * @param errstr human readable error reason for failing to acquire a token. + * + * The SASL/OAUTHBEARER token refresh callback should + * invoke this method upon failure to refresh the token. + * + * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise:<br> + * \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not + * supported by this build;<br> + * \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is + * not configured as the client's authentication mechanism. + * + * @sa RdKafka::oauthbearer_set_token + * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb" + */ + virtual ErrorCode oauthbearer_set_token_failure( + const std::string &errstr) = 0; + + /** + * @brief Enable SASL OAUTHBEARER refresh callbacks on the librdkafka + * background thread. + * + * This serves as an alternative for applications that do not + * call RdKafka::Handle::poll() (et.al.) at regular intervals. + */ + virtual Error *sasl_background_callbacks_enable() = 0; + + + /** + * @returns the SASL callback queue, if enabled, else NULL. + * + * @sa RdKafka::Conf::enable_sasl_queue() + */ + virtual Queue *get_sasl_queue() = 0; + + /** + * @returns the librdkafka background thread queue. + */ + virtual Queue *get_background_queue() = 0; + + + + /** + * @brief Allocate memory using the same allocator librdkafka uses. + * + * This is typically an abstraction for the malloc(3) call and makes sure + * the application can use the same memory allocator as librdkafka for + * allocating pointers that are used by librdkafka. + * + * @remark Memory allocated by mem_malloc() must be freed using + * mem_free(). + */ + virtual void *mem_malloc(size_t size) = 0; + + /** + * @brief Free pointer returned by librdkafka + * + * This is typically an abstraction for the free(3) call and makes sure + * the application can use the same memory allocator as librdkafka for + * freeing pointers returned by librdkafka. + * + * In standard setups it is usually not necessary to use this interface + * rather than the free(3) function. + * + * @remark mem_free() must only be used for pointers returned by APIs + * that explicitly mention using this function for freeing. + */ + virtual void mem_free(void *ptr) = 0; + + /** + * @brief Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by + * this Kafka client. + * + * This function sets or resets the SASL username and password credentials + * used by this Kafka client. The new credentials will be used the next time + * this client needs to authenticate to a broker. + * will not disconnect existing connections that might have been made using + * the old credentials. + * + * @remark This function only applies to the SASL PLAIN and SCRAM mechanisms. + * + * @returns NULL on success or an error object on error. + */ + virtual Error *sasl_set_credentials(const std::string &username, + const std::string &password) = 0; +}; + + +/**@}*/ + + +/** + * @name Topic and partition objects + * @{ + * + */ + +/** + * @brief Topic+Partition + * + * This is a generic type to hold a single partition and various + * information about it. + * + * Is typically used with std::vector<RdKafka::TopicPartition*> to provide + * a list of partitions for different operations. + */ +class RD_EXPORT TopicPartition { + public: + /** + * @brief Create topic+partition object for \p topic and \p partition. + * + * Use \c delete to deconstruct. + */ + static TopicPartition *create(const std::string &topic, int partition); + + /** + * @brief Create topic+partition object for \p topic and \p partition + * with offset \p offset. + * + * Use \c delete to deconstruct. + */ + static TopicPartition *create(const std::string &topic, + int partition, + int64_t offset); + + virtual ~TopicPartition() = 0; + + /** + * @brief Destroy/delete the TopicPartitions in \p partitions + * and clear the vector. + */ + static void destroy(std::vector<TopicPartition *> &partitions); + + /** @returns topic name */ + virtual const std::string &topic() const = 0; + + /** @returns partition id */ + virtual int partition() const = 0; + + /** @returns offset (if applicable) */ + virtual int64_t offset() const = 0; + + /** @brief Set offset */ + virtual void set_offset(int64_t offset) = 0; + + /** @returns error code (if applicable) */ + virtual ErrorCode err() const = 0; + + /** @brief Get partition leader epoch, or -1 if not known or relevant. */ + virtual int32_t get_leader_epoch() = 0; + + /** @brief Set partition leader epoch. */ + virtual void set_leader_epoch(int32_t leader_epoch) = 0; +}; + + + +/** + * @brief Topic handle + * + */ +class RD_EXPORT Topic { + public: + /** + * @brief Unassigned partition. + * + * The unassigned partition is used by the producer API for messages + * that should be partitioned using the configured or default partitioner. + */ + static const int32_t PARTITION_UA; + + /** @brief Special offsets */ + static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */ + static const int64_t OFFSET_END; /**< Consume from end */ + static const int64_t OFFSET_STORED; /**< Use offset storage */ + static const int64_t OFFSET_INVALID; /**< Invalid offset */ + + + /** + * @brief Creates a new topic handle for topic named \p topic_str + * + * \p conf is an optional configuration for the topic that will be used + * instead of the default topic configuration. + * The \p conf object is reusable after this call. + * + * @returns the new topic handle or NULL on error (see \p errstr). + */ + static Topic *create(Handle *base, + const std::string &topic_str, + const Conf *conf, + std::string &errstr); + + virtual ~Topic() = 0; + + + /** @returns the topic name */ + virtual std::string name() const = 0; + + /** + * @returns true if \p partition is available for the topic (has leader). + * @warning \b MUST \b ONLY be called from within a + * RdKafka::PartitionerCb callback. + */ + virtual bool partition_available(int32_t partition) const = 0; + + /** + * @brief Store offset \p offset + 1 for topic partition \p partition. + * The offset will be committed (written) to the broker (or file) according + * to \p auto.commit.interval.ms or next manual offset-less commit call. + * + * @deprecated This API lacks support for partition leader epochs, which makes + * it at risk for unclean leader election log truncation issues. + * Use KafkaConsumer::offsets_store() or + * Message::offset_store() instead. + * + * @remark \c enable.auto.offset.store must be set to \c false when using + * this API. + * + * @returns RdKafka::ERR_NO_ERROR on success or an error code if none of the + * offsets could be stored. + */ + virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ API + * does not provide the underlying functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer is the same as the Topic + * object this method is called on. + * + * @remark Include <rdkafka/rdkafka.h> prior to including + * <rdkafka/rdkafkacpp.h> + * + * @returns \c rd_kafka_topic_t* + */ + virtual struct rd_kafka_topic_s *c_ptr() = 0; +}; + + +/**@}*/ + + +/** + * @name Message object + * @{ + * + */ + + +/** + * @brief Message timestamp object + * + * Represents the number of milliseconds since the epoch (UTC). + * + * The MessageTimestampType dictates the timestamp type or origin. + * + * @remark Requires Apache Kafka broker version >= 0.10.0 + * + */ + +class RD_EXPORT MessageTimestamp { + public: + /*! Message timestamp type */ + enum MessageTimestampType { + MSG_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */ + MSG_TIMESTAMP_CREATE_TIME, /**< Message creation time (source) */ + MSG_TIMESTAMP_LOG_APPEND_TIME /**< Message log append time (broker) */ + }; + + MessageTimestampType type; /**< Timestamp type */ + int64_t timestamp; /**< Milliseconds since epoch (UTC). */ +}; + + +/** + * @brief Headers object + * + * Represents message headers. + * + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers + * + * @remark Requires Apache Kafka >= 0.11.0 brokers + */ +class RD_EXPORT Headers { + public: + virtual ~Headers() = 0; + + /** + * @brief Header object + * + * This object represents a single Header with a key value pair + * and an ErrorCode + * + * @remark dynamic allocation of this object is not supported. + */ + class Header { + public: + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * @param value the bytes of the header value, or NULL + * @param value_size the length in bytes of the header value + * + * @remark key and value are copied. + * + */ + Header(const std::string &key, const void *value, size_t value_size) : + key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { + value_ = copy_value(value, value_size); + } + + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * @param value the bytes of the header value + * @param value_size the length in bytes of the header value + * @param err the error code if one returned + * + * @remark The error code is used for when the Header is constructed + * internally by using RdKafka::Headers::get_last which constructs + * a Header encapsulating the ErrorCode in the process. + * If err is set, the value and value_size fields will be undefined. + */ + Header(const std::string &key, + const void *value, + size_t value_size, + const RdKafka::ErrorCode err) : + key_(key), err_(err), value_(NULL), value_size_(value_size) { + if (err == ERR_NO_ERROR) + value_ = copy_value(value, value_size); + } + + /** + * @brief Copy constructor + * + * @param other Header to make a copy of. + */ + Header(const Header &other) : + key_(other.key_), err_(other.err_), value_size_(other.value_size_) { + value_ = copy_value(other.value_, value_size_); + } + + /** + * @brief Assignment operator + * + * @param other Header to make a copy of. + */ + Header &operator=(const Header &other) { + if (&other == this) { + return *this; + } + + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; + + if (value_ != NULL) + mem_free(value_); + + value_ = copy_value(other.value_, value_size_); + + return *this; + } + + ~Header() { + if (value_ != NULL) + mem_free(value_); + } + + /** @returns the key/name associated with this Header */ + std::string key() const { + return key_; + } + + /** @returns returns the binary value, or NULL */ + const void *value() const { + return value_; + } + + /** @returns returns the value casted to a nul-terminated C string, + * or NULL. */ + const char *value_string() const { + return static_cast<const char *>(value_); + } + + /** @returns Value Size the length of the Value in bytes */ + size_t value_size() const { + return value_size_; + } + + /** @returns the error code of this Header (usually ERR_NO_ERROR) */ + RdKafka::ErrorCode err() const { + return err_; + } + + private: + char *copy_value(const void *value, size_t value_size) { + if (!value) + return NULL; + + char *dest = (char *)mem_malloc(value_size + 1); + memcpy(dest, (const char *)value, value_size); + dest[value_size] = '\0'; + + return dest; + } + + std::string key_; + RdKafka::ErrorCode err_; + char *value_; + size_t value_size_; + void *operator new(size_t); /* Prevent dynamic allocation */ + }; + + /** + * @brief Create a new instance of the Headers object + * + * @returns an empty Headers list + */ + static Headers *create(); + + /** + * @brief Create a new instance of the Headers object from a std::vector + * + * @param headers std::vector of RdKafka::Headers::Header objects. + * The headers are copied, not referenced. + * + * @returns a Headers list from std::vector set to the size of the std::vector + */ + static Headers *create(const std::vector<Header> &headers); + + /** + * @brief Adds a Header to the end of the list. + * + * @param key header key/name + * @param value binary value, or NULL + * @param value_size size of the value + * + * @returns an ErrorCode signalling success or failure to add the header. + */ + virtual ErrorCode add(const std::string &key, + const void *value, + size_t value_size) = 0; + + /** + * @brief Adds a Header to the end of the list. + * + * Convenience method for adding a std::string as a value for the header. + * + * @param key header key/name + * @param value value string + * + * @returns an ErrorCode signalling success or failure to add the header. + */ + virtual ErrorCode add(const std::string &key, const std::string &value) = 0; + + /** + * @brief Adds a Header to the end of the list. + * + * This method makes a copy of the passed header. + * + * @param header Existing header to copy + * + * @returns an ErrorCode signalling success or failure to add the header. + */ + virtual ErrorCode add(const Header &header) = 0; + + /** + * @brief Removes all the Headers of a given key + * + * @param key header key/name to remove + * + * @returns An ErrorCode signalling a success or failure to remove the Header. + */ + virtual ErrorCode remove(const std::string &key) = 0; + + /** + * @brief Gets all of the Headers of a given key + * + * @param key header key/name + * + * @remark If duplicate keys exist this will return them all as a std::vector + * + * @returns a std::vector containing all the Headers of the given key. + */ + virtual std::vector<Header> get(const std::string &key) const = 0; + + /** + * @brief Gets the last occurrence of a Header of a given key + * + * @param key header key/name + * + * @remark This will only return the most recently added header + * + * @returns the Header if found, otherwise a Header with an err set to + * ERR__NOENT. + */ + virtual Header get_last(const std::string &key) const = 0; + + /** + * @brief Returns all Headers + * + * @returns a std::vector containing all of the Headers + */ + virtual std::vector<Header> get_all() const = 0; + + /** + * @returns the number of headers. + */ + virtual size_t size() const = 0; +}; + + +/** + * @brief Message object + * + * This object represents either a single consumed or produced message, + * or an event (\p err() is set). + * + * An application must check RdKafka::Message::err() to see if the + * object is a proper message (error is RdKafka::ERR_NO_ERROR) or a + * an error event. + * + */ +class RD_EXPORT Message { + public: + /** @brief Message persistence status can be used by the application to + * find out if a produced message was persisted in the topic log. */ + enum Status { + /** Message was never transmitted to the broker, or failed with + * an error indicating it was not written to the log. + * Application retry risks ordering, but not duplication. */ + MSG_STATUS_NOT_PERSISTED = 0, + + /** Message was transmitted to broker, but no acknowledgement was + * received. + * Application retry risks ordering and duplication. */ + MSG_STATUS_POSSIBLY_PERSISTED = 1, + + /** Message was written to the log and fully acknowledged. + * No reason for application to retry. + * Note: this value should only be trusted with \c acks=all. */ + MSG_STATUS_PERSISTED = 2, + }; + + /** + * @brief Accessor functions* + * @remark Not all fields are present in all types of callbacks. + */ + + /** @returns The error string if object represent an error event, + * else an empty string. */ + virtual std::string errstr() const = 0; + + /** @returns The error code if object represents an error event, else 0. */ + virtual ErrorCode err() const = 0; + + /** @returns the RdKafka::Topic object for a message (if applicable), + * or NULL if a corresponding RdKafka::Topic object has not been + * explicitly created with RdKafka::Topic::create(). + * In this case use topic_name() instead. */ + virtual Topic *topic() const = 0; + + /** @returns Topic name (if applicable, else empty string) */ + virtual std::string topic_name() const = 0; + + /** @returns Partition (if applicable) */ + virtual int32_t partition() const = 0; + + /** @returns Message payload (if applicable) */ + virtual void *payload() const = 0; + + /** @returns Message payload length (if applicable) */ + virtual size_t len() const = 0; + + /** @returns Message key as string (if applicable) */ + virtual const std::string *key() const = 0; + + /** @returns Message key as void pointer (if applicable) */ + virtual const void *key_pointer() const = 0; + + /** @returns Message key's binary length (if applicable) */ + virtual size_t key_len() const = 0; + + /** @returns Message or error offset (if applicable) */ + virtual int64_t offset() const = 0; + + /** @returns Message timestamp (if applicable) */ + virtual MessageTimestamp timestamp() const = 0; + + /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ + virtual void *msg_opaque() const = 0; + + virtual ~Message() = 0; + + /** @returns the latency in microseconds for a produced message measured + * from the produce() call, or -1 if latency is not available. */ + virtual int64_t latency() const = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_message_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ API + * does not provide the underlying functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer is the same as the Message + * object this method is called on. + * + * @remark Include <rdkafka/rdkafka.h> prior to including + * <rdkafka/rdkafkacpp.h> + * + * @returns \c rd_kafka_message_t* + */ + virtual struct rd_kafka_message_s *c_ptr() = 0; + + /** + * @brief Returns the message's persistence status in the topic log. + */ + virtual Status status() const = 0; + + /** @returns the Headers instance for this Message, or NULL if there + * are no headers. + * + * @remark The lifetime of the Headers are the same as the Message. */ + virtual RdKafka::Headers *headers() = 0; + + /** @returns the Headers instance for this Message (if applicable). + * If NULL is returned the reason is given in \p err, which + * is either ERR__NOENT if there were no headers, or another + * error code if header parsing failed. + * + * @remark The lifetime of the Headers are the same as the Message. */ + virtual RdKafka::Headers *headers(RdKafka::ErrorCode *err) = 0; + + /** @returns the broker id of the broker the message was produced to or + * fetched from, or -1 if not known/applicable. */ + virtual int32_t broker_id() const = 0; + + /** @returns the message's partition leader epoch at the time the message was + * fetched and if known, else -1. */ + virtual int32_t leader_epoch() const = 0; + + /** + * @brief Store offset +1 for the consumed message. + * + * The message offset + 1 will be committed to broker according + * to \c `auto.commit.interval.ms` or manual offset-less commit() + * + * @warning This method may only be called for partitions that are currently + * assigned. + * Non-assigned partitions will fail with ERR__STATE. + * + * @warning Avoid storing offsets after calling seek() (et.al) as + * this may later interfere with resuming a paused partition, instead + * store offsets prior to calling seek. + * + * @remark \c `enable.auto.offset.store` must be set to "false" when using + * this API. + * + * @returns NULL on success or an error object on failure. + */ + virtual Error *offset_store() = 0; +}; + +/**@}*/ + + +/** + * @name Queue interface + * @{ + * + */ + + +/** + * @brief Queue interface + * + * Create a new message queue. Message queues allows the application + * to re-route consumed messages from multiple topic+partitions into + * one single queue point. This queue point, containing messages from + * a number of topic+partitions, may then be served by a single + * consume() method, rather than one per topic+partition combination. + * + * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and + * RdKafka::Consumer::consume_callback() methods that take a queue as the first + * parameter for more information. + */ +class RD_EXPORT Queue { + public: + /** + * @brief Create Queue object + */ + static Queue *create(Handle *handle); + + /** + * @brief Forward/re-route queue to \p dst. + * If \p dst is \c NULL, the forwarding is removed. + * + * The internal refcounts for both queues are increased. + * + * @remark Regardless of whether \p dst is NULL or not, after calling this + * function, \p src will not forward it's fetch queue to the consumer + * queue. + */ + virtual ErrorCode forward(Queue *dst) = 0; + + + /** + * @brief Consume message or get error event from the queue. + * + * @remark Use \c delete to free the message. + * + * @returns One of: + * - proper message (RdKafka::Message::err() is ERR_NO_ERROR) + * - error event (RdKafka::Message::err() is != ERR_NO_ERROR) + * - timeout due to no message or event in \p timeout_ms + * (RdKafka::Message::err() is ERR__TIMED_OUT) + */ + virtual Message *consume(int timeout_ms) = 0; + + /** + * @brief Poll queue, serving any enqueued callbacks. + * + * @remark Must NOT be used for queues containing messages. + * + * @returns the number of events served or 0 on timeout. + */ + virtual int poll(int timeout_ms) = 0; + + virtual ~Queue() = 0; + + /** + * @brief Enable IO event triggering for queue. + * + * To ease integration with IO based polling loops this API + * allows an application to create a separate file-descriptor + * that librdkafka will write \p payload (of size \p size) to + * whenever a new element is enqueued on a previously empty queue. + * + * To remove event triggering call with \p fd = -1. + * + * librdkafka will maintain a copy of the \p payload. + * + * @remark When using forwarded queues the IO event must only be enabled + * on the final forwarded-to (destination) queue. + */ + virtual void io_event_enable(int fd, const void *payload, size_t size) = 0; +}; + +/**@}*/ + +/** + * @name ConsumerGroupMetadata + * @{ + * + */ +/** + * @brief ConsumerGroupMetadata holds a consumer instance's group + * metadata state. + * + * This class currently does not have any public methods. + */ +class RD_EXPORT ConsumerGroupMetadata { + public: + virtual ~ConsumerGroupMetadata() = 0; +}; + +/**@}*/ + +/** + * @name KafkaConsumer + * @{ + * + */ + + +/** + * @brief High-level KafkaConsumer (for brokers 0.9 and later) + * + * @remark Requires Apache Kafka >= 0.9.0 brokers + * + * Currently supports the \c range and \c roundrobin partition assignment + * strategies (see \c partition.assignment.strategy) + */ +class RD_EXPORT KafkaConsumer : public virtual Handle { + public: + /** + * @brief Creates a KafkaConsumer. + * + * The \p conf object must have \c group.id set to the consumer group to join. + * + * Use RdKafka::KafkaConsumer::close() to shut down the consumer. + * + * @sa RdKafka::RebalanceCb + * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms, + * \c partition.assignment.strategy, etc. + */ + static KafkaConsumer *create(const Conf *conf, std::string &errstr); + + virtual ~KafkaConsumer() = 0; + + + /** @brief Returns the current partition assignment as set by + * RdKafka::KafkaConsumer::assign() */ + virtual ErrorCode assignment( + std::vector<RdKafka::TopicPartition *> &partitions) = 0; + + /** @brief Returns the current subscription as set by + * RdKafka::KafkaConsumer::subscribe() */ + virtual ErrorCode subscription(std::vector<std::string> &topics) = 0; + + /** + * @brief Update the subscription set to \p topics. + * + * Any previous subscription will be unassigned and unsubscribed first. + * + * The subscription set denotes the desired topics to consume and this + * set is provided to the partition assignor (one of the elected group + * members) for all clients which then uses the configured + * \c partition.assignment.strategy to assign the subscription sets's + * topics's partitions to the consumers, depending on their subscription. + * + * The result of such an assignment is a rebalancing which is either + * handled automatically in librdkafka or can be overridden by the application + * by providing a RdKafka::RebalanceCb. + * + * The rebalancing passes the assigned partition set to + * RdKafka::KafkaConsumer::assign() to update what partitions are actually + * being fetched by the KafkaConsumer. + * + * Regex pattern matching automatically performed for topics prefixed + * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\" + * + * @remark A consumer error will be raised for each unavailable topic in the + * \p topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART + * for non-existent topics, and + * ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics. + * The consumer error will be raised through consume() (et.al.) + * with the \c RdKafka::Message::err() returning one of the + * error codes mentioned above. + * The subscribe function itself is asynchronous and will not return + * an error on unavailable topics. + * + * @returns an error if the provided list of topics is invalid. + */ + virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0; + + /** @brief Unsubscribe from the current subscription set. */ + virtual ErrorCode unsubscribe() = 0; + + /** + * @brief Update the assignment set to \p partitions. + * + * The assignment set is the set of partitions actually being consumed + * by the KafkaConsumer. + */ + virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0; + + /** + * @brief Stop consumption and remove the current assignment. + */ + virtual ErrorCode unassign() = 0; + + /** + * @brief Consume message or get error event, triggers callbacks. + * + * Will automatically call registered callbacks for any such queued events, + * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, + * etc. + * + * @remark Use \c delete to free the message. + * + * @remark An application should make sure to call consume() at regular + * intervals, even if no messages are expected, to serve any + * queued callbacks waiting to be called. This is especially + * important when a RebalanceCb has been registered as it needs + * to be called and handled properly to synchronize internal + * consumer state. + * + * @remark Application MUST NOT call \p poll() on KafkaConsumer objects. + * + * @returns One of: + * - proper message (RdKafka::Message::err() is ERR_NO_ERROR) + * - error event (RdKafka::Message::err() is != ERR_NO_ERROR) + * - timeout due to no message or event in \p timeout_ms + * (RdKafka::Message::err() is ERR__TIMED_OUT) + */ + virtual Message *consume(int timeout_ms) = 0; + + /** + * @brief Commit offsets for the current assignment. + * + * @remark This is the synchronous variant that blocks until offsets + * are committed or the commit fails (see return value). + * + * @remark If a RdKafka::OffsetCommitCb callback is registered it will + * be called with commit details on a future call to + * RdKafka::KafkaConsumer::consume() + + * + * @returns ERR_NO_ERROR or error code. + */ + virtual ErrorCode commitSync() = 0; + + /** + * @brief Asynchronous version of RdKafka::KafkaConsumer::CommitSync() + * + * @sa RdKafka::KafkaConsumer::commitSync() + */ + virtual ErrorCode commitAsync() = 0; + + /** + * @brief Commit offset for a single topic+partition based on \p message + * + * @remark The offset committed will be the message's offset + 1. + * + * @remark This is the synchronous variant. + * + * @sa RdKafka::KafkaConsumer::commitSync() + */ + virtual ErrorCode commitSync(Message *message) = 0; + + /** + * @brief Commit offset for a single topic+partition based on \p message + * + * @remark The offset committed will be the message's offset + 1. + * + * @remark This is the asynchronous variant. + * + * @sa RdKafka::KafkaConsumer::commitSync() + */ + virtual ErrorCode commitAsync(Message *message) = 0; + + /** + * @brief Commit offsets for the provided list of partitions. + * + * @remark The \c .offset of the partitions in \p offsets should be the + * offset where consumption will resume, i.e., the last + * processed offset + 1. + * + * @remark This is the synchronous variant. + */ + virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0; + + /** + * @brief Commit offset for the provided list of partitions. + * + * @remark The \c .offset of the partitions in \p offsets should be the + * offset where consumption will resume, i.e., the last + * processed offset + 1. + * + * @remark This is the asynchronous variant. + */ + virtual ErrorCode commitAsync( + const std::vector<TopicPartition *> &offsets) = 0; + + /** + * @brief Commit offsets for the current assignment. + * + * @remark This is the synchronous variant that blocks until offsets + * are committed or the commit fails (see return value). + * + * @remark The provided callback will be called from this function. + * + * @returns ERR_NO_ERROR or error code. + */ + virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0; + + /** + * @brief Commit offsets for the provided list of partitions. + * + * @remark This is the synchronous variant that blocks until offsets + * are committed or the commit fails (see return value). + * + * @remark The provided callback will be called from this function. + * + * @returns ERR_NO_ERROR or error code. + */ + virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets, + OffsetCommitCb *offset_commit_cb) = 0; + + + + /** + * @brief Retrieve committed offsets for topics+partitions. + * + * @returns ERR_NO_ERROR on success in which case the + * \p offset or \p err field of each \p partitions' element is filled + * in with the stored offset, or a partition specific error. + * Else returns an error code. + */ + virtual ErrorCode committed(std::vector<TopicPartition *> &partitions, + int timeout_ms) = 0; + + /** + * @brief Retrieve current positions (offsets) for topics+partitions. + * + * @returns ERR_NO_ERROR on success in which case the + * \p offset or \p err field of each \p partitions' element is filled + * in with the stored offset, or a partition specific error. + * Else returns an error code. + */ + virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0; + + + /** + * For pausing and resuming consumption, see + * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume() + */ + + + /** + * @brief Close and shut down the consumer. + * + * This call will block until the following operations are finished: + * - Trigger a local rebalance to void the current assignment (if any). + * - Stop consumption for current assignment (if any). + * - Commit offsets (if any). + * - Leave group (if applicable). + * + * The maximum blocking time is roughly limited to session.timeout.ms. + * + * @remark Callbacks, such as RdKafka::RebalanceCb and + * RdKafka::OffsetCommitCb, etc, may be called. + * + * @remark The consumer object must later be freed with \c delete + */ + virtual ErrorCode close() = 0; + + + /** + * @brief Seek consumer for topic+partition to offset which is either an + * absolute or logical offset. + * + * If \p timeout_ms is not 0 the call will wait this long for the + * seek to be performed. If the timeout is reached the internal state + * will be unknown and this function returns `ERR__TIMED_OUT`. + * If \p timeout_ms is 0 it will initiate the seek but return + * immediately without any error reporting (e.g., async). + * + * This call triggers a fetch queue barrier flush. + * + * @remark Consumption for the given partition must have started for the + * seek to work. Use assign() to set the starting offset. + * + * @returns an ErrorCode to indicate success or failure. + */ + virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0; + + + /** + * @brief Store offset \p offset for topic partition \p partition. + * The offset will be committed (written) to the offset store according + * to \p auto.commit.interval.ms or the next manual offset-less commit*() + * + * Per-partition success/error status propagated through TopicPartition.err() + * + * @remark The \c .offset field is stored as is, it will NOT be + 1. + * + * @remark \c enable.auto.offset.store must be set to \c false when using + * this API. + * + * @remark The leader epoch, if set, will be used to fence outdated partition + * leaders. See TopicPartition::set_leader_epoch(). + * + * @returns RdKafka::ERR_NO_ERROR on success, or + * RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could + * be stored, or + * RdKafka::ERR___INVALID_ARG if \c enable.auto.offset.store is true. + */ + virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0; + + + /** + * @returns the current consumer group metadata associated with this consumer, + * or NULL if the consumer is configured with a \c group.id. + * This metadata object should be passed to the transactional + * producer's RdKafka::Producer::send_offsets_to_transaction() API. + * + * @remark The returned object must be deleted by the application. + * + * @sa RdKafka::Producer::send_offsets_to_transaction() + */ + virtual ConsumerGroupMetadata *groupMetadata() = 0; + + + /** @brief Check whether the consumer considers the current assignment to + * have been lost involuntarily. This method is only applicable for + * use with a subscribing consumer. Assignments are revoked + * immediately when determined to have been lost, so this method is + * only useful within a rebalance callback. Partitions that have + * been lost may already be owned by other members in the group and + * therefore commiting offsets, for example, may fail. + * + * @remark Calling assign(), incremental_assign() or incremental_unassign() + * resets this flag. + * + * @returns Returns true if the current partition assignment is considered + * lost, false otherwise. + */ + virtual bool assignment_lost() = 0; + + /** + * @brief The rebalance protocol currently in use. This will be + * "NONE" if the consumer has not (yet) joined a group, else it will + * match the rebalance protocol ("EAGER", "COOPERATIVE") of the + * configured and selected assignor(s). All configured + * assignors must have the same protocol type, meaning + * online migration of a consumer group from using one + * protocol to another (in particular upgading from EAGER + * to COOPERATIVE) without a restart is not currently + * supported. + * + * @returns an empty string on error, or one of + * "NONE", "EAGER", "COOPERATIVE" on success. + */ + + virtual std::string rebalance_protocol() = 0; + + + /** + * @brief Incrementally add \p partitions to the current assignment. + * + * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, + * this method should be used in a rebalance callback to adjust the current + * assignment appropriately in the case where the rebalance type is + * ERR__ASSIGN_PARTITIONS. The application must pass the partition list + * passed to the callback (or a copy of it), even if the list is empty. + * This method may also be used outside the context of a rebalance callback. + * + * @returns NULL on success, or an error object if the operation was + * unsuccessful. + * + * @remark The returned object must be deleted by the application. + */ + virtual Error *incremental_assign( + const std::vector<TopicPartition *> &partitions) = 0; + + + /** + * @brief Incrementally remove \p partitions from the current assignment. + * + * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, + * this method should be used in a rebalance callback to adjust the current + * assignment appropriately in the case where the rebalance type is + * ERR__REVOKE_PARTITIONS. The application must pass the partition list + * passed to the callback (or a copy of it), even if the list is empty. + * This method may also be used outside the context of a rebalance callback. + * + * @returns NULL on success, or an error object if the operation was + * unsuccessful. + * + * @remark The returned object must be deleted by the application. + */ + virtual Error *incremental_unassign( + const std::vector<TopicPartition *> &partitions) = 0; + + /** + * @brief Close and shut down the consumer. + * + * Performs the same actions as RdKafka::KafkaConsumer::close() but in a + * background thread. + * + * Rebalance events/callbacks (etc) will be forwarded to the + * application-provided \p queue. The application must poll this queue until + * RdKafka::KafkaConsumer::closed() returns true. + * + * @remark Depending on consumer group join state there may or may not be + * rebalance events emitted on \p rkqu. + * + * @returns an error object if the consumer close failed, else NULL. + * + * @sa RdKafka::KafkaConsumer::closed() + */ + virtual Error *close(Queue *queue) = 0; + + + /** @returns true if the consumer is closed, else 0. + * + * @sa RdKafka::KafkaConsumer::close() + */ + virtual bool closed() = 0; +}; + + +/**@}*/ + + +/** + * @name Simple Consumer (legacy) + * @{ + * + */ + +/** + * @brief Simple Consumer (legacy) + * + * A simple non-balanced, non-group-aware, consumer. + */ +class RD_EXPORT Consumer : public virtual Handle { + public: + /** + * @brief Creates a new Kafka consumer handle. + * + * \p conf is an optional object that will be used instead of the default + * configuration. + * The \p conf object is reusable after this call. + * + * @returns the new handle on success or NULL on error in which case + * \p errstr is set to a human readable error message. + */ + static Consumer *create(const Conf *conf, std::string &errstr); + + virtual ~Consumer() = 0; + + + /** + * @brief Start consuming messages for topic and \p partition + * at offset \p offset which may either be a proper offset (0..N) + * or one of the the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END. + * + * rdkafka will attempt to keep \p queued.min.messages (config property) + * messages in the local queue by repeatedly fetching batches of messages + * from the broker until the threshold is reached. + * + * The application shall use one of the \p ..->consume*() functions + * to consume messages from the local queue, each kafka message being + * represented as a `RdKafka::Message *` object. + * + * \p ..->start() must not be called multiple times for the same + * topic and partition without stopping consumption first with + * \p ..->stop(). + * + * @returns an ErrorCode to indicate success or failure. + */ + virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0; + + /** + * @brief Start consuming messages for topic and \p partition on + * queue \p queue. + * + * @sa RdKafka::Consumer::start() + */ + virtual ErrorCode start(Topic *topic, + int32_t partition, + int64_t offset, + Queue *queue) = 0; + + /** + * @brief Stop consuming messages for topic and \p partition, purging + * all messages currently in the local queue. + * + * The application needs to be stop all consumers before destroying + * the Consumer handle. + * + * @returns an ErrorCode to indicate success or failure. + */ + virtual ErrorCode stop(Topic *topic, int32_t partition) = 0; + + /** + * @brief Seek consumer for topic+partition to \p offset which is either an + * absolute or logical offset. + * + * If \p timeout_ms is not 0 the call will wait this long for the + * seek to be performed. If the timeout is reached the internal state + * will be unknown and this function returns `ERR__TIMED_OUT`. + * If \p timeout_ms is 0 it will initiate the seek but return + * immediately without any error reporting (e.g., async). + * + * This call triggers a fetch queue barrier flush. + * + * @returns an ErrorCode to indicate success or failure. + */ + virtual ErrorCode seek(Topic *topic, + int32_t partition, + int64_t offset, + int timeout_ms) = 0; + + /** + * @brief Consume a single message from \p topic and \p partition. + * + * \p timeout_ms is maximum amount of time to wait for a message to be + * received. + * Consumer must have been previously started with \p ..->start(). + * + * @returns a Message object, the application needs to check if message + * is an error or a proper message RdKafka::Message::err() and checking for + * \p ERR_NO_ERROR. + * + * The message object must be destroyed when the application is done with it. + * + * Errors (in RdKafka::Message::err()): + * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched. + * - ERR__PARTITION_EOF - End of partition reached, not an error. + */ + virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0; + + /** + * @brief Consume a single message from the specified queue. + * + * \p timeout_ms is maximum amount of time to wait for a message to be + * received. + * Consumer must have been previously started on the queue with + * \p ..->start(). + * + * @returns a Message object, the application needs to check if message + * is an error or a proper message \p Message->err() and checking for + * \p ERR_NO_ERROR. + * + * The message object must be destroyed when the application is done with it. + * + * Errors (in RdKafka::Message::err()): + * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched + * + * Note that Message->topic() may be nullptr after certain kinds of + * errors, so applications should check that it isn't null before + * dereferencing it. + */ + virtual Message *consume(Queue *queue, int timeout_ms) = 0; + + /** + * @brief Consumes messages from \p topic and \p partition, calling + * the provided callback for each consumed messsage. + * + * \p consume_callback() provides higher throughput performance + * than \p consume(). + * + * \p timeout_ms is the maximum amount of time to wait for one or + * more messages to arrive. + * + * The provided \p consume_cb instance has its \p consume_cb function + * called for every message received. + * + * The \p opaque argument is passed to the \p consume_cb as \p opaque. + * + * @returns the number of messages processed or -1 on error. + * + * @sa RdKafka::Consumer::consume() + */ + virtual int consume_callback(Topic *topic, + int32_t partition, + int timeout_ms, + ConsumeCb *consume_cb, + void *opaque) = 0; + + /** + * @brief Consumes messages from \p queue, calling the provided callback for + * each consumed messsage. + * + * @sa RdKafka::Consumer::consume_callback() + */ + virtual int consume_callback(Queue *queue, + int timeout_ms, + RdKafka::ConsumeCb *consume_cb, + void *opaque) = 0; + + /** + * @brief Converts an offset into the logical offset from the tail of a topic. + * + * \p offset is the (positive) number of items from the end. + * + * @returns the logical offset for message \p offset from the tail, this value + * may be passed to Consumer::start, et.al. + * @remark The returned logical offset is specific to librdkafka. + */ + static int64_t OffsetTail(int64_t offset); +}; + +/**@}*/ + + +/** + * @name Producer + * @{ + * + */ + + +/** + * @brief Producer + */ +class RD_EXPORT Producer : public virtual Handle { + public: + /** + * @brief Creates a new Kafka producer handle. + * + * \p conf is an optional object that will be used instead of the default + * configuration. + * The \p conf object is reusable after this call. + * + * @returns the new handle on success or NULL on error in which case + * \p errstr is set to a human readable error message. + */ + static Producer *create(const Conf *conf, std::string &errstr); + + + virtual ~Producer() = 0; + + /** + * @brief RdKafka::Producer::produce() \p msgflags + * + * These flags are optional. + */ + enum { + RK_MSG_FREE = 0x1, /**< rdkafka will free(3) \p payload + * when it is done with it. + * Mutually exclusive with RK_MSG_COPY. */ + RK_MSG_COPY = 0x2, /**< the \p payload data will be copied + * and the \p payload pointer will not + * be used by rdkafka after the + * call returns. + * Mutually exclusive with RK_MSG_FREE. */ + RK_MSG_BLOCK = 0x4 /**< Block produce*() on message queue + * full. + * WARNING: + * If a delivery report callback + * is used the application MUST + * call rd_kafka_poll() (or equiv.) + * to make sure delivered messages + * are drained from the internal + * delivery report queue. + * Failure to do so will result + * in indefinately blocking on + * the produce() call when the + * message queue is full. + */ + + + /**@cond NO_DOC*/ + /* For backwards compatibility: */ +#ifndef MSG_COPY /* defined in sys/msg.h */ + , /** this comma must exist betwen + * RK_MSG_BLOCK and MSG_FREE + */ + MSG_FREE = RK_MSG_FREE, + MSG_COPY = RK_MSG_COPY +#endif + /**@endcond*/ + }; + + /** + * @brief Produce and send a single message to broker. + * + * This is an asynch non-blocking API. + * + * \p partition is the target partition, either: + * - RdKafka::Topic::PARTITION_UA (unassigned) for + * automatic partitioning using the topic's partitioner function, or + * - a fixed partition (0..N) + * + * \p msgflags is zero or more of the following flags OR:ed together: + * RK_MSG_BLOCK - block \p produce*() call if + * \p queue.buffering.max.messages or + * \p queue.buffering.max.kbytes are exceeded. + * Messages are considered in-queue from the point they + * are accepted by produce() until their corresponding + * delivery report callback/event returns. + * It is thus a requirement to call + * poll() (or equiv.) from a separate + * thread when RK_MSG_BLOCK is used. + * See WARNING on \c RK_MSG_BLOCK above. + * RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it. + * RK_MSG_COPY - the \p payload data will be copied and the \p payload + * pointer will not be used by rdkafka after the + * call returns. + * + * NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive. + * + * If the function returns an error code and RK_MSG_FREE was specified, then + * the memory associated with the payload is still the caller's + * responsibility. + * + * \p payload is the message payload of size \p len bytes. + * + * \p key is an optional message key, if non-NULL it + * will be passed to the topic partitioner as well as be sent with the + * message to the broker and passed on to the consumer. + * + * \p msg_opaque is an optional application-provided per-message opaque + * pointer that will provided in the delivery report callback (\p dr_cb) for + * referencing this message. + * + * @returns an ErrorCode to indicate success or failure: + * - ERR_NO_ERROR - message successfully enqueued for transmission. + * + * - ERR__QUEUE_FULL - maximum number of outstanding messages has been + * reached: \c queue.buffering.max.message + * + * - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: + * \c messages.max.bytes + * + * - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the + * Kafka cluster. + * + * - ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster. + */ + virtual ErrorCode produce(Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const std::string *key, + void *msg_opaque) = 0; + + /** + * @brief Variant produce() that passes the key as a pointer and length + * instead of as a const std::string *. + */ + virtual ErrorCode produce(Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + void *msg_opaque) = 0; + + /** + * @brief produce() variant that takes topic as a string (no need for + * creating a Topic object), and also allows providing the + * message timestamp (milliseconds since beginning of epoch, UTC). + * Otherwise identical to produce() above. + */ + virtual ErrorCode produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + void *msg_opaque) = 0; + + /** + * @brief produce() variant that that allows for Header support on produce + * Otherwise identical to produce() above. + * + * @warning The \p headers will be freed/deleted if the produce() call + * succeeds, or left untouched if produce() fails. + */ + virtual ErrorCode produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) = 0; + + + /** + * @brief Variant produce() that accepts vectors for key and payload. + * The vector data will be copied. + */ + virtual ErrorCode produce(Topic *topic, + int32_t partition, + const std::vector<char> *payload, + const std::vector<char> *key, + void *msg_opaque) = 0; + + + /** + * @brief Wait until all outstanding produce requests, et.al, are completed. + * This should typically be done prior to destroying a producer + * instance to make sure all queued and in-flight produce requests are + * completed before terminating. + * + * @remark The \c linger.ms time will be ignored for the duration of the call, + * queued messages will be sent to the broker as soon as possible. + * + * @remark This function will call Producer::poll() and thus + * trigger callbacks. + * + * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all + * outstanding requests were completed, else ERR_NO_ERROR + */ + virtual ErrorCode flush(int timeout_ms) = 0; + + + /** + * @brief Purge messages currently handled by the producer instance. + * + * @param purge_flags tells which messages should be purged and how. + * + * The application will need to call Handle::poll() or Producer::flush() + * afterwards to serve the delivery report callbacks of the purged messages. + * + * Messages purged from internal queues fail with the delivery report + * error code set to ERR__PURGE_QUEUE, while purged messages that + * are in-flight to or from the broker will fail with the error code set to + * ERR__PURGE_INFLIGHT. + * + * @warning Purging messages that are in-flight to or from the broker + * will ignore any sub-sequent acknowledgement for these messages + * received from the broker, effectively making it impossible + * for the application to know if the messages were successfully + * produced or not. This may result in duplicate messages if the + * application retries these messages at a later time. + * + * @remark This call may block for a short time while background thread + * queues are purged. + * + * @returns ERR_NO_ERROR on success, + * ERR__INVALID_ARG if the \p purge flags are invalid or unknown, + * ERR__NOT_IMPLEMENTED if called on a non-producer client instance. + */ + virtual ErrorCode purge(int purge_flags) = 0; + + /** + * @brief RdKafka::Handle::purge() \p purge_flags + */ + enum { + PURGE_QUEUE = 0x1, /**< Purge messages in internal queues */ + + PURGE_INFLIGHT = 0x2, /*! Purge messages in-flight to or from the broker. + * Purging these messages will void any future + * acknowledgements from the broker, making it + * impossible for the application to know if these + * messages were successfully delivered or not. + * Retrying these messages may lead to duplicates. */ + + PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue + * purging to finish. */ + }; + + /** + * @name Transactional API + * @{ + * + * Requires Kafka broker version v0.11.0 or later + * + * See the Transactional API documentation in rdkafka.h for more information. + */ + + /** + * @brief Initialize transactions for the producer instance. + * + * @param timeout_ms The maximum time to block. On timeout the operation + * may continue in the background, depending on state, + * and it is okay to call init_transactions() again. + * + * @returns an RdKafka::Error object on error, or NULL on success. + * Check whether the returned error object permits retrying + * by calling RdKafka::Error::is_retriable(), or whether a fatal + * error has been raised by calling RdKafka::Error::is_fatal(). + * + * @remark The returned error object (if not NULL) must be deleted. + * + * See rd_kafka_init_transactions() in rdkafka.h for more information. + * + */ + virtual Error *init_transactions(int timeout_ms) = 0; + + + /** + * @brief init_transactions() must have been called successfully + * (once) before this function is called. + * + * @returns an RdKafka::Error object on error, or NULL on success. + * Check whether a fatal error has been raised by calling + * RdKafka::Error::is_fatal_error(). + * + * @remark The returned error object (if not NULL) must be deleted. + * + * See rd_kafka_begin_transaction() in rdkafka.h for more information. + */ + virtual Error *begin_transaction() = 0; + + /** + * @brief Sends a list of topic partition offsets to the consumer group + * coordinator for \p group_metadata, and marks the offsets as part + * part of the current transaction. + * These offsets will be considered committed only if the transaction + * is committed successfully. + * + * The offsets should be the next message your application will + * consume, + * i.e., the last processed message's offset + 1 for each partition. + * Either track the offsets manually during processing or use + * RdKafka::KafkaConsumer::position() (on the consumer) to get the + * current offsets for + * the partitions assigned to the consumer. + * + * Use this method at the end of a consume-transform-produce loop prior + * to committing the transaction with commit_transaction(). + * + * @param offsets List of offsets to commit to the consumer group upon + * successful commit of the transaction. Offsets should be + * the next message to consume, + * e.g., last processed message + 1. + * @param group_metadata The current consumer group metadata as returned by + * RdKafka::KafkaConsumer::groupMetadata() on the consumer + * instance the provided offsets were consumed from. + * @param timeout_ms Maximum time allowed to register the + * offsets on the broker. + * + * @remark This function must be called on the transactional producer + * instance, not the consumer. + * + * @remark The consumer must disable auto commits + * (set \c enable.auto.commit to false on the consumer). + * + * @returns an RdKafka::Error object on error, or NULL on success. + * Check whether the returned error object permits retrying + * by calling RdKafka::Error::is_retriable(), or whether an abortable + * or fatal error has been raised by calling + * RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() + * respectively. + * + * @remark The returned error object (if not NULL) must be deleted. + * + * See rd_kafka_send_offsets_to_transaction() in rdkafka.h for + * more information. + */ + virtual Error *send_offsets_to_transaction( + const std::vector<TopicPartition *> &offsets, + const ConsumerGroupMetadata *group_metadata, + int timeout_ms) = 0; + + /** + * @brief Commit the current transaction as started with begin_transaction(). + * + * Any outstanding messages will be flushed (delivered) before actually + * committing the transaction. + * + * @param timeout_ms The maximum time to block. On timeout the operation + * may continue in the background, depending on state, + * and it is okay to call this function again. + * Pass -1 to use the remaining transaction timeout, + * this is the recommended use. + * + * @remark It is strongly recommended to always pass -1 (remaining transaction + * time) as the \p timeout_ms. Using other values risk internal + * state desynchronization in case any of the underlying protocol + * requests fail. + * + * @returns an RdKafka::Error object on error, or NULL on success. + * Check whether the returned error object permits retrying + * by calling RdKafka::Error::is_retriable(), or whether an abortable + * or fatal error has been raised by calling + * RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() + * respectively. + * + * @remark The returned error object (if not NULL) must be deleted. + * + * See rd_kafka_commit_transaction() in rdkafka.h for more information. + */ + virtual Error *commit_transaction(int timeout_ms) = 0; + + /** + * @brief Aborts the ongoing transaction. + * + * This function should also be used to recover from non-fatal + * abortable transaction errors. + * + * Any outstanding messages will be purged and fail with + * RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE. + * See RdKafka::Producer::purge() for details. + * + * @param timeout_ms The maximum time to block. On timeout the operation + * may continue in the background, depending on state, + * and it is okay to call this function again. + * Pass -1 to use the remaining transaction timeout, + * this is the recommended use. + * + * @remark It is strongly recommended to always pass -1 (remaining transaction + * time) as the \p timeout_ms. Using other values risk internal + * state desynchronization in case any of the underlying protocol + * requests fail. + * + * @returns an RdKafka::Error object on error, or NULL on success. + * Check whether the returned error object permits retrying + * by calling RdKafka::Error::is_retriable(), or whether a + * fatal error has been raised by calling RdKafka::Error::is_fatal(). + * + * @remark The returned error object (if not NULL) must be deleted. + * + * See rd_kafka_abort_transaction() in rdkafka.h for more information. + */ + virtual Error *abort_transaction(int timeout_ms) = 0; + + /**@}*/ +}; + +/**@}*/ + + +/** + * @name Metadata interface + * @{ + * + */ + + +/** + * @brief Metadata: Broker information + */ +class BrokerMetadata { + public: + /** @returns Broker id */ + virtual int32_t id() const = 0; + + /** @returns Broker hostname */ + virtual std::string host() const = 0; + + /** @returns Broker listening port */ + virtual int port() const = 0; + + virtual ~BrokerMetadata() = 0; +}; + + + +/** + * @brief Metadata: Partition information + */ +class PartitionMetadata { + public: + /** @brief Replicas */ + typedef std::vector<int32_t> ReplicasVector; + /** @brief ISRs (In-Sync-Replicas) */ + typedef std::vector<int32_t> ISRSVector; + + /** @brief Replicas iterator */ + typedef ReplicasVector::const_iterator ReplicasIterator; + /** @brief ISRs iterator */ + typedef ISRSVector::const_iterator ISRSIterator; + + + /** @returns Partition id */ + virtual int32_t id() const = 0; + + /** @returns Partition error reported by broker */ + virtual ErrorCode err() const = 0; + + /** @returns Leader broker (id) for partition */ + virtual int32_t leader() const = 0; + + /** @returns Replica brokers */ + virtual const std::vector<int32_t> *replicas() const = 0; + + /** @returns In-Sync-Replica brokers + * @warning The broker may return a cached/outdated list of ISRs. + */ + virtual const std::vector<int32_t> *isrs() const = 0; + + virtual ~PartitionMetadata() = 0; +}; + + + +/** + * @brief Metadata: Topic information + */ +class TopicMetadata { + public: + /** @brief Partitions */ + typedef std::vector<const PartitionMetadata *> PartitionMetadataVector; + /** @brief Partitions iterator */ + typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator; + + /** @returns Topic name */ + virtual std::string topic() const = 0; + + /** @returns Partition list */ + virtual const PartitionMetadataVector *partitions() const = 0; + + /** @returns Topic error reported by broker */ + virtual ErrorCode err() const = 0; + + virtual ~TopicMetadata() = 0; +}; + + +/** + * @brief Metadata container + */ +class Metadata { + public: + /** @brief Brokers */ + typedef std::vector<const BrokerMetadata *> BrokerMetadataVector; + /** @brief Topics */ + typedef std::vector<const TopicMetadata *> TopicMetadataVector; + + /** @brief Brokers iterator */ + typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator; + /** @brief Topics iterator */ + typedef TopicMetadataVector::const_iterator TopicMetadataIterator; + + + /** + * @brief Broker list + * @remark Ownership of the returned pointer is retained by the instance of + * Metadata that is called. + */ + virtual const BrokerMetadataVector *brokers() const = 0; + + /** + * @brief Topic list + * @remark Ownership of the returned pointer is retained by the instance of + * Metadata that is called. + */ + virtual const TopicMetadataVector *topics() const = 0; + + /** @brief Broker (id) originating this metadata */ + virtual int32_t orig_broker_id() const = 0; + + /** @brief Broker (name) originating this metadata */ + virtual std::string orig_broker_name() const = 0; + + virtual ~Metadata() = 0; +}; + +/**@}*/ + +} // namespace RdKafka + + +#endif /* _RDKAFKACPP_H_ */ diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h new file mode 100644 index 000000000..bc024ebe9 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h @@ -0,0 +1,1628 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKACPP_INT_H_ +#define _RDKAFKACPP_INT_H_ + +#include <string> +#include <iostream> +#include <cstring> +#include <stdlib.h> + +#include "rdkafkacpp.h" + +extern "C" { +#include "../src/rdkafka.h" +} + +#ifdef _WIN32 +/* Visual Studio */ +#include "../src/win32_config.h" +#else +/* POSIX / UNIX based systems */ +#include "../config.h" /* mklove output */ +#endif + +#ifdef _MSC_VER +typedef int mode_t; +#pragma warning(disable : 4250) +#endif + + +namespace RdKafka { + +void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque); +void log_cb_trampoline(const rd_kafka_t *rk, + int level, + const char *fac, + const char *buf); +void error_cb_trampoline(rd_kafka_t *rk, + int err, + const char *reason, + void *opaque); +void throttle_cb_trampoline(rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int throttle_time_ms, + void *opaque); +int stats_cb_trampoline(rd_kafka_t *rk, + char *json, + size_t json_len, + void *opaque); +int socket_cb_trampoline(int domain, int type, int protocol, void *opaque); +int open_cb_trampoline(const char *pathname, + int flags, + mode_t mode, + void *opaque); +void rebalance_cb_trampoline(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_partitions, + void *opaque); +void offset_commit_cb_trampoline0(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_offsets, + void *opaque); +void oauthbearer_token_refresh_cb_trampoline(rd_kafka_t *rk, + const char *oauthbearer_config, + void *opaque); + +int ssl_cert_verify_cb_trampoline(rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int *x509_error, + int depth, + const char *buf, + size_t size, + char *errstr, + size_t errstr_size, + void *opaque); + +rd_kafka_topic_partition_list_t *partitions_to_c_parts( + const std::vector<TopicPartition *> &partitions); + +/** + * @brief Update the application provided 'partitions' with info from 'c_parts' + */ +void update_partitions_from_c_parts( + std::vector<TopicPartition *> &partitions, + const rd_kafka_topic_partition_list_t *c_parts); + + +class ErrorImpl : public Error { + public: + ~ErrorImpl() { + rd_kafka_error_destroy(c_error_); + } + + ErrorImpl(ErrorCode code, const std::string *errstr) { + c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code), + errstr ? "%s" : NULL, + errstr ? errstr->c_str() : NULL); + } + + ErrorImpl(rd_kafka_error_t *c_error) : c_error_(c_error) { + } + + static Error *create(ErrorCode code, const std::string *errstr) { + return new ErrorImpl(code, errstr); + } + + ErrorCode code() const { + return static_cast<ErrorCode>(rd_kafka_error_code(c_error_)); + } + + std::string name() const { + return std::string(rd_kafka_error_name(c_error_)); + } + + std::string str() const { + return std::string(rd_kafka_error_string(c_error_)); + } + + bool is_fatal() const { + return !!rd_kafka_error_is_fatal(c_error_); + } + + bool is_retriable() const { + return !!rd_kafka_error_is_retriable(c_error_); + } + + bool txn_requires_abort() const { + return !!rd_kafka_error_txn_requires_abort(c_error_); + } + + rd_kafka_error_t *c_error_; +}; + + +class EventImpl : public Event { + public: + ~EventImpl() { + } + + EventImpl(Type type, + ErrorCode err, + Severity severity, + const char *fac, + const char *str) : + type_(type), + err_(err), + severity_(severity), + fac_(fac ? fac : ""), + str_(str), + id_(0), + throttle_time_(0), + fatal_(false) { + } + + EventImpl(Type type) : + type_(type), + err_(ERR_NO_ERROR), + severity_(EVENT_SEVERITY_EMERG), + fac_(""), + str_(""), + id_(0), + throttle_time_(0), + fatal_(false) { + } + + Type type() const { + return type_; + } + ErrorCode err() const { + return err_; + } + Severity severity() const { + return severity_; + } + std::string fac() const { + return fac_; + } + std::string str() const { + return str_; + } + std::string broker_name() const { + if (type_ == EVENT_THROTTLE) + return str_; + else + return std::string(""); + } + int broker_id() const { + return id_; + } + int throttle_time() const { + return throttle_time_; + } + + bool fatal() const { + return fatal_; + } + + Type type_; + ErrorCode err_; + Severity severity_; + std::string fac_; + std::string str_; /* reused for THROTTLE broker_name */ + int id_; + int throttle_time_; + bool fatal_; +}; + +class QueueImpl : virtual public Queue { + public: + QueueImpl(rd_kafka_queue_t *c_rkqu) : queue_(c_rkqu) { + } + ~QueueImpl() { + rd_kafka_queue_destroy(queue_); + } + static Queue *create(Handle *base); + ErrorCode forward(Queue *queue); + Message *consume(int timeout_ms); + int poll(int timeout_ms); + void io_event_enable(int fd, const void *payload, size_t size); + + rd_kafka_queue_t *queue_; +}; + + + +class HeadersImpl : public Headers { + public: + HeadersImpl() : headers_(rd_kafka_headers_new(8)) { + } + + HeadersImpl(rd_kafka_headers_t *headers) : headers_(headers) { + } + + HeadersImpl(const std::vector<Header> &headers) { + if (headers.size() > 0) { + headers_ = rd_kafka_headers_new(headers.size()); + from_vector(headers); + } else { + headers_ = rd_kafka_headers_new(8); + } + } + + ~HeadersImpl() { + if (headers_) { + rd_kafka_headers_destroy(headers_); + } + } + + ErrorCode add(const std::string &key, const char *value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, -1); + return static_cast<RdKafka::ErrorCode>(err); + } + + ErrorCode add(const std::string &key, const void *value, size_t value_size) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, + value_size); + return static_cast<RdKafka::ErrorCode>(err); + } + + ErrorCode add(const std::string &key, const std::string &value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value.c_str(), + value.size()); + return static_cast<RdKafka::ErrorCode>(err); + } + + ErrorCode add(const Header &header) { + rd_kafka_resp_err_t err; + err = + rd_kafka_header_add(headers_, header.key().c_str(), header.key().size(), + header.value(), header.value_size()); + return static_cast<RdKafka::ErrorCode>(err); + } + + ErrorCode remove(const std::string &key) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_remove(headers_, key.c_str()); + return static_cast<RdKafka::ErrorCode>(err); + } + + std::vector<Headers::Header> get(const std::string &key) const { + std::vector<Headers::Header> headers; + const void *value; + size_t size; + rd_kafka_resp_err_t err; + for (size_t idx = 0; !(err = rd_kafka_header_get(headers_, idx, key.c_str(), + &value, &size)); + idx++) { + headers.push_back(Headers::Header(key, value, size)); + } + return headers; + } + + Headers::Header get_last(const std::string &key) const { + const void *value; + size_t size; + rd_kafka_resp_err_t err; + err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); + return Headers::Header(key, value, size, + static_cast<RdKafka::ErrorCode>(err)); + } + + std::vector<Headers::Header> get_all() const { + std::vector<Headers::Header> headers; + size_t idx = 0; + const char *name; + const void *valuep; + size_t size; + while (!rd_kafka_header_get_all(headers_, idx++, &name, &valuep, &size)) { + headers.push_back(Headers::Header(name, valuep, size)); + } + return headers; + } + + size_t size() const { + return rd_kafka_header_cnt(headers_); + } + + /** @brief Reset the C headers pointer to NULL. */ + void c_headers_destroyed() { + headers_ = NULL; + } + + /** @returns the underlying C headers, or NULL. */ + rd_kafka_headers_t *c_ptr() { + return headers_; + } + + + private: + void from_vector(const std::vector<Header> &headers) { + if (headers.size() == 0) + return; + for (std::vector<Header>::const_iterator it = headers.begin(); + it != headers.end(); it++) + this->add(*it); + } + + HeadersImpl(HeadersImpl const &) /*= delete*/; + HeadersImpl &operator=(HeadersImpl const &) /*= delete*/; + + rd_kafka_headers_t *headers_; +}; + + + +class MessageImpl : public Message { + public: + ~MessageImpl() { + if (free_rkmessage_) + rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_)); + if (key_) + delete key_; + if (headers_) + delete headers_; + } + + MessageImpl(rd_kafka_type_t rk_type, + RdKafka::Topic *topic, + rd_kafka_message_t *rkmessage) : + topic_(topic), + rkmessage_(rkmessage), + free_rkmessage_(true), + key_(NULL), + headers_(NULL), + rk_type_(rk_type) { + } + + MessageImpl(rd_kafka_type_t rk_type, + RdKafka::Topic *topic, + rd_kafka_message_t *rkmessage, + bool dofree) : + topic_(topic), + rkmessage_(rkmessage), + free_rkmessage_(dofree), + key_(NULL), + headers_(NULL), + rk_type_(rk_type) { + } + + MessageImpl(rd_kafka_type_t rk_type, rd_kafka_message_t *rkmessage) : + topic_(NULL), + rkmessage_(rkmessage), + free_rkmessage_(true), + key_(NULL), + headers_(NULL), + rk_type_(rk_type) { + if (rkmessage->rkt) { + /* Possibly NULL */ + topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt)); + } + } + + /* Create errored message */ + MessageImpl(rd_kafka_type_t rk_type, + RdKafka::Topic *topic, + RdKafka::ErrorCode err) : + topic_(topic), + free_rkmessage_(false), + key_(NULL), + headers_(NULL), + rk_type_(rk_type) { + rkmessage_ = &rkmessage_err_; + memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); + rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err); + } + + std::string errstr() const { + const char *es; + /* message_errstr() is only available for the consumer. */ + if (rk_type_ == RD_KAFKA_CONSUMER) + es = rd_kafka_message_errstr(rkmessage_); + else + es = rd_kafka_err2str(rkmessage_->err); + + return std::string(es ? es : ""); + } + + ErrorCode err() const { + return static_cast<RdKafka::ErrorCode>(rkmessage_->err); + } + + Topic *topic() const { + return topic_; + } + std::string topic_name() const { + if (rkmessage_->rkt) + return rd_kafka_topic_name(rkmessage_->rkt); + else + return ""; + } + int32_t partition() const { + return rkmessage_->partition; + } + void *payload() const { + return rkmessage_->payload; + } + size_t len() const { + return rkmessage_->len; + } + const std::string *key() const { + if (key_) { + return key_; + } else if (rkmessage_->key) { + key_ = new std::string(static_cast<char const *>(rkmessage_->key), + rkmessage_->key_len); + return key_; + } + return NULL; + } + const void *key_pointer() const { + return rkmessage_->key; + } + size_t key_len() const { + return rkmessage_->key_len; + } + + int64_t offset() const { + return rkmessage_->offset; + } + + MessageTimestamp timestamp() const { + MessageTimestamp ts; + rd_kafka_timestamp_type_t tstype; + ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype); + ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype); + return ts; + } + + void *msg_opaque() const { + return rkmessage_->_private; + } + + int64_t latency() const { + return rd_kafka_message_latency(rkmessage_); + } + + struct rd_kafka_message_s *c_ptr() { + return rkmessage_; + } + + Status status() const { + return static_cast<Status>(rd_kafka_message_status(rkmessage_)); + } + + Headers *headers() { + ErrorCode err; + return headers(&err); + } + + Headers *headers(ErrorCode *err) { + *err = ERR_NO_ERROR; + + if (!headers_) { + rd_kafka_headers_t *c_hdrs; + rd_kafka_resp_err_t c_err; + + if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) { + *err = static_cast<RdKafka::ErrorCode>(c_err); + return NULL; + } + + headers_ = new HeadersImpl(c_hdrs); + } + + return headers_; + } + + int32_t broker_id() const { + return rd_kafka_message_broker_id(rkmessage_); + } + + int32_t leader_epoch() const { + return rd_kafka_message_leader_epoch(rkmessage_); + } + + + Error *offset_store() { + rd_kafka_error_t *c_error; + + c_error = rd_kafka_offset_store_message(rkmessage_); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + RdKafka::Topic *topic_; + rd_kafka_message_t *rkmessage_; + bool free_rkmessage_; + /* For error signalling by the C++ layer the .._err_ message is + * used as a place holder and rkmessage_ is set to point to it. */ + rd_kafka_message_t rkmessage_err_; + mutable std::string *key_; /* mutable because it's a cached value */ + + private: + /* "delete" copy ctor + copy assignment, for safety of key_ */ + MessageImpl(MessageImpl const &) /*= delete*/; + MessageImpl &operator=(MessageImpl const &) /*= delete*/; + + RdKafka::Headers *headers_; + const rd_kafka_type_t rk_type_; /**< Client type */ +}; + + +class ConfImpl : public Conf { + public: + ConfImpl(ConfType conf_type) : + consume_cb_(NULL), + dr_cb_(NULL), + event_cb_(NULL), + socket_cb_(NULL), + open_cb_(NULL), + partitioner_cb_(NULL), + partitioner_kp_cb_(NULL), + rebalance_cb_(NULL), + offset_commit_cb_(NULL), + oauthbearer_token_refresh_cb_(NULL), + ssl_cert_verify_cb_(NULL), + conf_type_(conf_type), + rk_conf_(NULL), + rkt_conf_(NULL) { + } + ~ConfImpl() { + if (rk_conf_) + rd_kafka_conf_destroy(rk_conf_); + else if (rkt_conf_) + rd_kafka_topic_conf_destroy(rkt_conf_); + } + + Conf::ConfResult set(const std::string &name, + const std::string &value, + std::string &errstr); + + Conf::ConfResult set(const std::string &name, + DeliveryReportCb *dr_cb, + std::string &errstr) { + if (name != "dr_cb") { + errstr = "Invalid value type, expected RdKafka::DeliveryReportCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + dr_cb_ = dr_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, + std::string &errstr) { + if (name != "oauthbearer_token_refresh_cb") { + errstr = + "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + EventCb *event_cb, + std::string &errstr) { + if (name != "event_cb") { + errstr = "Invalid value type, expected RdKafka::EventCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + event_cb_ = event_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + const Conf *topic_conf, + std::string &errstr) { + const ConfImpl *tconf_impl = + dynamic_cast<const RdKafka::ConfImpl *>(topic_conf); + if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) { + errstr = "Invalid value type, expected RdKafka::Conf"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + rd_kafka_conf_set_default_topic_conf( + rk_conf_, rd_kafka_topic_conf_dup(tconf_impl->rkt_conf_)); + + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + PartitionerCb *partitioner_cb, + std::string &errstr) { + if (name != "partitioner_cb") { + errstr = "Invalid value type, expected RdKafka::PartitionerCb"; + return Conf::CONF_INVALID; + } + + if (!rkt_conf_) { + errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; + return Conf::CONF_INVALID; + } + + partitioner_cb_ = partitioner_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + PartitionerKeyPointerCb *partitioner_kp_cb, + std::string &errstr) { + if (name != "partitioner_key_pointer_cb") { + errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb"; + return Conf::CONF_INVALID; + } + + if (!rkt_conf_) { + errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; + return Conf::CONF_INVALID; + } + + partitioner_kp_cb_ = partitioner_kp_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set(const std::string &name, + SocketCb *socket_cb, + std::string &errstr) { + if (name != "socket_cb") { + errstr = "Invalid value type, expected RdKafka::SocketCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + socket_cb_ = socket_cb; + return Conf::CONF_OK; + } + + + Conf::ConfResult set(const std::string &name, + OpenCb *open_cb, + std::string &errstr) { + if (name != "open_cb") { + errstr = "Invalid value type, expected RdKafka::OpenCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + open_cb_ = open_cb; + return Conf::CONF_OK; + } + + + + Conf::ConfResult set(const std::string &name, + RebalanceCb *rebalance_cb, + std::string &errstr) { + if (name != "rebalance_cb") { + errstr = "Invalid value type, expected RdKafka::RebalanceCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + rebalance_cb_ = rebalance_cb; + return Conf::CONF_OK; + } + + + Conf::ConfResult set(const std::string &name, + OffsetCommitCb *offset_commit_cb, + std::string &errstr) { + if (name != "offset_commit_cb") { + errstr = "Invalid value type, expected RdKafka::OffsetCommitCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + offset_commit_cb_ = offset_commit_cb; + return Conf::CONF_OK; + } + + + Conf::ConfResult set(const std::string &name, + SslCertificateVerifyCb *ssl_cert_verify_cb, + std::string &errstr) { + if (name != "ssl_cert_verify_cb") { + errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + ssl_cert_verify_cb_ = ssl_cert_verify_cb; + return Conf::CONF_OK; + } + + Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr) { + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + rd_kafka_conf_set_engine_callback_data(rk_conf_, value); + return Conf::CONF_OK; + } + + + Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, + RdKafka::CertificateEncoding cert_enc, + const void *buffer, + size_t size, + std::string &errstr) { + rd_kafka_conf_res_t res; + char errbuf[512]; + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + res = rd_kafka_conf_set_ssl_cert( + rk_conf_, static_cast<rd_kafka_cert_type_t>(cert_type), + static_cast<rd_kafka_cert_enc_t>(cert_enc), buffer, size, errbuf, + sizeof(errbuf)); + + if (res != RD_KAFKA_CONF_OK) + errstr = errbuf; + + return static_cast<Conf::ConfResult>(res); + } + + Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr) { + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + rd_kafka_conf_enable_sasl_queue(rk_conf_, enable ? 1 : 0); + + return Conf::CONF_OK; + } + + + Conf::ConfResult get(const std::string &name, std::string &value) const { + if (name.compare("dr_cb") == 0 || name.compare("event_cb") == 0 || + name.compare("partitioner_cb") == 0 || + name.compare("partitioner_key_pointer_cb") == 0 || + name.compare("socket_cb") == 0 || name.compare("open_cb") == 0 || + name.compare("rebalance_cb") == 0 || + name.compare("offset_commit_cb") == 0 || + name.compare("oauthbearer_token_refresh_cb") == 0 || + name.compare("ssl_cert_verify_cb") == 0 || + name.compare("set_engine_callback_data") == 0 || + name.compare("enable_sasl_queue") == 0) { + return Conf::CONF_INVALID; + } + rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID; + + /* Get size of property */ + size_t size; + if (rk_conf_) + res = rd_kafka_conf_get(rk_conf_, name.c_str(), NULL, &size); + else if (rkt_conf_) + res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), NULL, &size); + if (res != RD_KAFKA_CONF_OK) + return static_cast<Conf::ConfResult>(res); + + char *tmpValue = new char[size]; + + if (rk_conf_) + res = rd_kafka_conf_get(rk_conf_, name.c_str(), tmpValue, &size); + else if (rkt_conf_) + res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), tmpValue, &size); + + if (res == RD_KAFKA_CONF_OK) + value.assign(tmpValue); + delete[] tmpValue; + + return static_cast<Conf::ConfResult>(res); + } + + Conf::ConfResult get(DeliveryReportCb *&dr_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + dr_cb = this->dr_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get( + OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(EventCb *&event_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + event_cb = this->event_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(PartitionerCb *&partitioner_cb) const { + if (!rkt_conf_) + return Conf::CONF_INVALID; + partitioner_cb = this->partitioner_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const { + if (!rkt_conf_) + return Conf::CONF_INVALID; + partitioner_kp_cb = this->partitioner_kp_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(SocketCb *&socket_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + socket_cb = this->socket_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(OpenCb *&open_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + open_cb = this->open_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(RebalanceCb *&rebalance_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + rebalance_cb = this->rebalance_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + offset_commit_cb = this->offset_commit_cb_; + return Conf::CONF_OK; + } + + Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const { + if (!rk_conf_) + return Conf::CONF_INVALID; + ssl_cert_verify_cb = this->ssl_cert_verify_cb_; + return Conf::CONF_OK; + } + + std::list<std::string> *dump(); + + + Conf::ConfResult set(const std::string &name, + ConsumeCb *consume_cb, + std::string &errstr) { + if (name != "consume_cb") { + errstr = "Invalid value type, expected RdKafka::ConsumeCb"; + return Conf::CONF_INVALID; + } + + if (!rk_conf_) { + errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; + return Conf::CONF_INVALID; + } + + consume_cb_ = consume_cb; + return Conf::CONF_OK; + } + + struct rd_kafka_conf_s *c_ptr_global() { + if (conf_type_ == CONF_GLOBAL) + return rk_conf_; + else + return NULL; + } + + struct rd_kafka_topic_conf_s *c_ptr_topic() { + if (conf_type_ == CONF_TOPIC) + return rkt_conf_; + else + return NULL; + } + + ConsumeCb *consume_cb_; + DeliveryReportCb *dr_cb_; + EventCb *event_cb_; + SocketCb *socket_cb_; + OpenCb *open_cb_; + PartitionerCb *partitioner_cb_; + PartitionerKeyPointerCb *partitioner_kp_cb_; + RebalanceCb *rebalance_cb_; + OffsetCommitCb *offset_commit_cb_; + OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; + SslCertificateVerifyCb *ssl_cert_verify_cb_; + ConfType conf_type_; + rd_kafka_conf_t *rk_conf_; + rd_kafka_topic_conf_t *rkt_conf_; +}; + + +class HandleImpl : virtual public Handle { + public: + ~HandleImpl() { + } + HandleImpl() { + } + std::string name() const { + return std::string(rd_kafka_name(rk_)); + } + std::string memberid() const { + char *str = rd_kafka_memberid(rk_); + std::string memberid = str ? str : ""; + if (str) + rd_kafka_mem_free(rk_, str); + return memberid; + } + int poll(int timeout_ms) { + return rd_kafka_poll(rk_, timeout_ms); + } + int outq_len() { + return rd_kafka_outq_len(rk_); + } + + void set_common_config(const RdKafka::ConfImpl *confimpl); + + RdKafka::ErrorCode metadata(bool all_topics, + const Topic *only_rkt, + Metadata **metadatap, + int timeout_ms); + + ErrorCode pause(std::vector<TopicPartition *> &partitions); + ErrorCode resume(std::vector<TopicPartition *> &partitions); + + ErrorCode query_watermark_offsets(const std::string &topic, + int32_t partition, + int64_t *low, + int64_t *high, + int timeout_ms) { + return static_cast<RdKafka::ErrorCode>(rd_kafka_query_watermark_offsets( + rk_, topic.c_str(), partition, low, high, timeout_ms)); + } + + ErrorCode get_watermark_offsets(const std::string &topic, + int32_t partition, + int64_t *low, + int64_t *high) { + return static_cast<RdKafka::ErrorCode>(rd_kafka_get_watermark_offsets( + rk_, topic.c_str(), partition, low, high)); + } + + Queue *get_partition_queue(const TopicPartition *partition); + + Queue *get_sasl_queue() { + rd_kafka_queue_t *rkqu; + rkqu = rd_kafka_queue_get_sasl(rk_); + + if (rkqu == NULL) + return NULL; + + return new QueueImpl(rkqu); + } + + Queue *get_background_queue() { + rd_kafka_queue_t *rkqu; + rkqu = rd_kafka_queue_get_background(rk_); + + if (rkqu == NULL) + return NULL; + + return new QueueImpl(rkqu); + } + + + ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets, + int timeout_ms) { + rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); + ErrorCode err = static_cast<ErrorCode>( + rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms)); + update_partitions_from_c_parts(offsets, c_offsets); + rd_kafka_topic_partition_list_destroy(c_offsets); + return err; + } + + ErrorCode set_log_queue(Queue *queue); + + void yield() { + rd_kafka_yield(rk_); + } + + std::string clusterid(int timeout_ms) { + char *str = rd_kafka_clusterid(rk_, timeout_ms); + std::string clusterid = str ? str : ""; + if (str) + rd_kafka_mem_free(rk_, str); + return clusterid; + } + + struct rd_kafka_s *c_ptr() { + return rk_; + } + + int32_t controllerid(int timeout_ms) { + return rd_kafka_controllerid(rk_, timeout_ms); + } + + ErrorCode fatal_error(std::string &errstr) const { + char errbuf[512]; + RdKafka::ErrorCode err = static_cast<RdKafka::ErrorCode>( + rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf))); + if (err) + errstr = errbuf; + return err; + } + + ErrorCode oauthbearer_set_token(const std::string &token_value, + int64_t md_lifetime_ms, + const std::string &md_principal_name, + const std::list<std::string> &extensions, + std::string &errstr) { + char errbuf[512]; + ErrorCode err; + const char **extensions_copy = new const char *[extensions.size()]; + int elem = 0; + + for (std::list<std::string>::const_iterator it = extensions.begin(); + it != extensions.end(); it++) + extensions_copy[elem++] = it->c_str(); + err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token( + rk_, token_value.c_str(), md_lifetime_ms, md_principal_name.c_str(), + extensions_copy, extensions.size(), errbuf, sizeof(errbuf))); + delete[] extensions_copy; + + if (err != ERR_NO_ERROR) + errstr = errbuf; + + return err; + } + + ErrorCode oauthbearer_set_token_failure(const std::string &errstr) { + return static_cast<ErrorCode>( + rd_kafka_oauthbearer_set_token_failure(rk_, errstr.c_str())); + } + + Error *sasl_background_callbacks_enable() { + rd_kafka_error_t *c_error = rd_kafka_sasl_background_callbacks_enable(rk_); + + if (c_error) + return new ErrorImpl(c_error); + + return NULL; + } + + Error *sasl_set_credentials(const std::string &username, + const std::string &password) { + rd_kafka_error_t *c_error = + rd_kafka_sasl_set_credentials(rk_, username.c_str(), password.c_str()); + + if (c_error) + return new ErrorImpl(c_error); + + return NULL; + }; + + void *mem_malloc(size_t size) { + return rd_kafka_mem_malloc(rk_, size); + } + + void mem_free(void *ptr) { + rd_kafka_mem_free(rk_, ptr); + } + + rd_kafka_t *rk_; + /* All Producer and Consumer callbacks must reside in HandleImpl and + * the opaque provided to rdkafka must be a pointer to HandleImpl, since + * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to + * HandleImpl due to the skewed diamond inheritance. */ + ConsumeCb *consume_cb_; + EventCb *event_cb_; + SocketCb *socket_cb_; + OpenCb *open_cb_; + DeliveryReportCb *dr_cb_; + PartitionerCb *partitioner_cb_; + PartitionerKeyPointerCb *partitioner_kp_cb_; + RebalanceCb *rebalance_cb_; + OffsetCommitCb *offset_commit_cb_; + OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; + SslCertificateVerifyCb *ssl_cert_verify_cb_; +}; + + +class TopicImpl : public Topic { + public: + ~TopicImpl() { + rd_kafka_topic_destroy(rkt_); + } + + std::string name() const { + return rd_kafka_topic_name(rkt_); + } + + bool partition_available(int32_t partition) const { + return !!rd_kafka_topic_partition_available(rkt_, partition); + } + + ErrorCode offset_store(int32_t partition, int64_t offset) { + return static_cast<RdKafka::ErrorCode>( + rd_kafka_offset_store(rkt_, partition, offset)); + } + + static Topic *create(Handle &base, const std::string &topic, Conf *conf); + + struct rd_kafka_topic_s *c_ptr() { + return rkt_; + } + + rd_kafka_topic_t *rkt_; + PartitionerCb *partitioner_cb_; + PartitionerKeyPointerCb *partitioner_kp_cb_; +}; + + +/** + * Topic and Partition + */ +class TopicPartitionImpl : public TopicPartition { + public: + ~TopicPartitionImpl() { + } + + static TopicPartition *create(const std::string &topic, int partition); + + TopicPartitionImpl(const std::string &topic, int partition) : + topic_(topic), + partition_(partition), + offset_(RdKafka::Topic::OFFSET_INVALID), + err_(ERR_NO_ERROR), + leader_epoch_(-1) { + } + + TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) : + topic_(topic), + partition_(partition), + offset_(offset), + err_(ERR_NO_ERROR), + leader_epoch_(-1) { + } + + TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) { + topic_ = std::string(c_part->topic); + partition_ = c_part->partition; + offset_ = c_part->offset; + err_ = static_cast<ErrorCode>(c_part->err); + leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part); + // FIXME: metadata + } + + static void destroy(std::vector<TopicPartition *> &partitions); + + int partition() const { + return partition_; + } + const std::string &topic() const { + return topic_; + } + + int64_t offset() const { + return offset_; + } + + ErrorCode err() const { + return err_; + } + + void set_offset(int64_t offset) { + offset_ = offset; + } + + int32_t get_leader_epoch() { + return leader_epoch_; + } + + void set_leader_epoch(int32_t leader_epoch) { + leader_epoch_ = leader_epoch_; + } + + std::ostream &operator<<(std::ostream &ostrm) const { + return ostrm << topic_ << " [" << partition_ << "]"; + } + + std::string topic_; + int partition_; + int64_t offset_; + ErrorCode err_; + int32_t leader_epoch_; +}; + + +/** + * @class ConsumerGroupMetadata wraps the + * C rd_kafka_consumer_group_metadata_t object. + */ +class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata { + public: + ~ConsumerGroupMetadataImpl() { + rd_kafka_consumer_group_metadata_destroy(cgmetadata_); + } + + ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata) : + cgmetadata_(cgmetadata) { + } + + rd_kafka_consumer_group_metadata_t *cgmetadata_; +}; + + +class KafkaConsumerImpl : virtual public KafkaConsumer, + virtual public HandleImpl { + public: + ~KafkaConsumerImpl() { + if (rk_) + rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); + } + + static KafkaConsumer *create(Conf *conf, std::string &errstr); + + ErrorCode assignment(std::vector<TopicPartition *> &partitions); + bool assignment_lost(); + std::string rebalance_protocol() { + const char *str = rd_kafka_rebalance_protocol(rk_); + return std::string(str ? str : ""); + } + ErrorCode subscription(std::vector<std::string> &topics); + ErrorCode subscribe(const std::vector<std::string> &topics); + ErrorCode unsubscribe(); + ErrorCode assign(const std::vector<TopicPartition *> &partitions); + ErrorCode unassign(); + Error *incremental_assign(const std::vector<TopicPartition *> &partitions); + Error *incremental_unassign(const std::vector<TopicPartition *> &partitions); + + Message *consume(int timeout_ms); + ErrorCode commitSync() { + return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/)); + } + ErrorCode commitAsync() { + return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1 /*async*/)); + } + ErrorCode commitSync(Message *message) { + MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message); + return static_cast<ErrorCode>( + rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0 /*sync*/)); + } + ErrorCode commitAsync(Message *message) { + MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message); + return static_cast<ErrorCode>( + rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 1 /*async*/)); + } + + ErrorCode commitSync(std::vector<TopicPartition *> &offsets) { + rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); + rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 0); + if (!err) + update_partitions_from_c_parts(offsets, c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + return static_cast<ErrorCode>(err); + } + + ErrorCode commitAsync(const std::vector<TopicPartition *> &offsets) { + rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); + rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 1); + rd_kafka_topic_partition_list_destroy(c_parts); + return static_cast<ErrorCode>(err); + } + + ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) { + return static_cast<ErrorCode>(rd_kafka_commit_queue( + rk_, NULL, NULL, RdKafka::offset_commit_cb_trampoline0, + offset_commit_cb)); + } + + ErrorCode commitSync(std::vector<TopicPartition *> &offsets, + OffsetCommitCb *offset_commit_cb) { + rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); + rd_kafka_resp_err_t err = rd_kafka_commit_queue( + rk_, c_parts, NULL, RdKafka::offset_commit_cb_trampoline0, + offset_commit_cb); + rd_kafka_topic_partition_list_destroy(c_parts); + return static_cast<ErrorCode>(err); + } + + ErrorCode committed(std::vector<TopicPartition *> &partitions, + int timeout_ms); + ErrorCode position(std::vector<TopicPartition *> &partitions); + + ConsumerGroupMetadata *groupMetadata() { + rd_kafka_consumer_group_metadata_t *cgmetadata; + + cgmetadata = rd_kafka_consumer_group_metadata(rk_); + if (!cgmetadata) + return NULL; + + return new ConsumerGroupMetadataImpl(cgmetadata); + } + + ErrorCode close(); + + Error *close(Queue *queue); + + bool closed() { + return rd_kafka_consumer_closed(rk_) ? true : false; + } + + ErrorCode seek(const TopicPartition &partition, int timeout_ms); + + ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) { + rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); + rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk_, c_parts); + update_partitions_from_c_parts(offsets, c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + return static_cast<ErrorCode>(err); + } +}; + + +class MetadataImpl : public Metadata { + public: + MetadataImpl(const rd_kafka_metadata_t *metadata); + ~MetadataImpl(); + + const std::vector<const BrokerMetadata *> *brokers() const { + return &brokers_; + } + + const std::vector<const TopicMetadata *> *topics() const { + return &topics_; + } + + std::string orig_broker_name() const { + return std::string(metadata_->orig_broker_name); + } + + int32_t orig_broker_id() const { + return metadata_->orig_broker_id; + } + + private: + const rd_kafka_metadata_t *metadata_; + std::vector<const BrokerMetadata *> brokers_; + std::vector<const TopicMetadata *> topics_; + std::string orig_broker_name_; +}; + + + +class ConsumerImpl : virtual public Consumer, virtual public HandleImpl { + public: + ~ConsumerImpl() { + if (rk_) + rd_kafka_destroy(rk_); + } + static Consumer *create(Conf *conf, std::string &errstr); + + ErrorCode start(Topic *topic, int32_t partition, int64_t offset); + ErrorCode start(Topic *topic, + int32_t partition, + int64_t offset, + Queue *queue); + ErrorCode stop(Topic *topic, int32_t partition); + ErrorCode seek(Topic *topic, + int32_t partition, + int64_t offset, + int timeout_ms); + Message *consume(Topic *topic, int32_t partition, int timeout_ms); + Message *consume(Queue *queue, int timeout_ms); + int consume_callback(Topic *topic, + int32_t partition, + int timeout_ms, + ConsumeCb *cb, + void *opaque); + int consume_callback(Queue *queue, + int timeout_ms, + RdKafka::ConsumeCb *consume_cb, + void *opaque); +}; + + + +class ProducerImpl : virtual public Producer, virtual public HandleImpl { + public: + ~ProducerImpl() { + if (rk_) + rd_kafka_destroy(rk_); + } + + ErrorCode produce(Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const std::string *key, + void *msg_opaque); + + ErrorCode produce(Topic *topic, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + void *msg_opaque); + + ErrorCode produce(Topic *topic, + int32_t partition, + const std::vector<char> *payload, + const std::vector<char> *key, + void *msg_opaque); + + ErrorCode produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + void *msg_opaque); + + ErrorCode produce(const std::string topic_name, + int32_t partition, + int msgflags, + void *payload, + size_t len, + const void *key, + size_t key_len, + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque); + + ErrorCode flush(int timeout_ms) { + return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_, timeout_ms)); + } + + ErrorCode purge(int purge_flags) { + return static_cast<RdKafka::ErrorCode>( + rd_kafka_purge(rk_, (int)purge_flags)); + } + + Error *init_transactions(int timeout_ms) { + rd_kafka_error_t *c_error; + + c_error = rd_kafka_init_transactions(rk_, timeout_ms); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + Error *begin_transaction() { + rd_kafka_error_t *c_error; + + c_error = rd_kafka_begin_transaction(rk_); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + Error *send_offsets_to_transaction( + const std::vector<TopicPartition *> &offsets, + const ConsumerGroupMetadata *group_metadata, + int timeout_ms) { + rd_kafka_error_t *c_error; + const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl = + dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>( + group_metadata); + rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); + + c_error = rd_kafka_send_offsets_to_transaction( + rk_, c_offsets, cgmdimpl->cgmetadata_, timeout_ms); + + rd_kafka_topic_partition_list_destroy(c_offsets); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + Error *commit_transaction(int timeout_ms) { + rd_kafka_error_t *c_error; + + c_error = rd_kafka_commit_transaction(rk_, timeout_ms); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + Error *abort_transaction(int timeout_ms) { + rd_kafka_error_t *c_error; + + c_error = rd_kafka_abort_transaction(rk_, timeout_ms); + + if (c_error) + return new ErrorImpl(c_error); + else + return NULL; + } + + static Producer *create(Conf *conf, std::string &errstr); +}; + + + +} // namespace RdKafka + +#endif /* _RDKAFKACPP_INT_H_ */ |