summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h1628
1 files changed, 1628 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
new file mode 100644
index 000000000..bc024ebe9
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
@@ -0,0 +1,1628 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKACPP_INT_H_
+#define _RDKAFKACPP_INT_H_
+
+#include <string>
+#include <iostream>
+#include <cstring>
+#include <stdlib.h>
+
+#include "rdkafkacpp.h"
+
+extern "C" {
+#include "../src/rdkafka.h"
+}
+
+#ifdef _WIN32
+/* Visual Studio */
+#include "../src/win32_config.h"
+#else
+/* POSIX / UNIX based systems */
+#include "../config.h" /* mklove output */
+#endif
+
+#ifdef _MSC_VER
+typedef int mode_t;
+#pragma warning(disable : 4250)
+#endif
+
+
+namespace RdKafka {
+
+void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
+void log_cb_trampoline(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf);
+void error_cb_trampoline(rd_kafka_t *rk,
+ int err,
+ const char *reason,
+ void *opaque);
+void throttle_cb_trampoline(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int throttle_time_ms,
+ void *opaque);
+int stats_cb_trampoline(rd_kafka_t *rk,
+ char *json,
+ size_t json_len,
+ void *opaque);
+int socket_cb_trampoline(int domain, int type, int protocol, void *opaque);
+int open_cb_trampoline(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque);
+void rebalance_cb_trampoline(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *c_partitions,
+ void *opaque);
+void offset_commit_cb_trampoline0(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *c_offsets,
+ void *opaque);
+void oauthbearer_token_refresh_cb_trampoline(rd_kafka_t *rk,
+ const char *oauthbearer_config,
+ void *opaque);
+
+int ssl_cert_verify_cb_trampoline(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int *x509_error,
+ int depth,
+ const char *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size,
+ void *opaque);
+
+rd_kafka_topic_partition_list_t *partitions_to_c_parts(
+ const std::vector<TopicPartition *> &partitions);
+
+/**
+ * @brief Update the application provided 'partitions' with info from 'c_parts'
+ */
+void update_partitions_from_c_parts(
+ std::vector<TopicPartition *> &partitions,
+ const rd_kafka_topic_partition_list_t *c_parts);
+
+
+class ErrorImpl : public Error {
+ public:
+ ~ErrorImpl() {
+ rd_kafka_error_destroy(c_error_);
+ }
+
+ ErrorImpl(ErrorCode code, const std::string *errstr) {
+ c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code),
+ errstr ? "%s" : NULL,
+ errstr ? errstr->c_str() : NULL);
+ }
+
+ ErrorImpl(rd_kafka_error_t *c_error) : c_error_(c_error) {
+ }
+
+ static Error *create(ErrorCode code, const std::string *errstr) {
+ return new ErrorImpl(code, errstr);
+ }
+
+ ErrorCode code() const {
+ return static_cast<ErrorCode>(rd_kafka_error_code(c_error_));
+ }
+
+ std::string name() const {
+ return std::string(rd_kafka_error_name(c_error_));
+ }
+
+ std::string str() const {
+ return std::string(rd_kafka_error_string(c_error_));
+ }
+
+ bool is_fatal() const {
+ return !!rd_kafka_error_is_fatal(c_error_);
+ }
+
+ bool is_retriable() const {
+ return !!rd_kafka_error_is_retriable(c_error_);
+ }
+
+ bool txn_requires_abort() const {
+ return !!rd_kafka_error_txn_requires_abort(c_error_);
+ }
+
+ rd_kafka_error_t *c_error_;
+};
+
+
+class EventImpl : public Event {
+ public:
+ ~EventImpl() {
+ }
+
+ EventImpl(Type type,
+ ErrorCode err,
+ Severity severity,
+ const char *fac,
+ const char *str) :
+ type_(type),
+ err_(err),
+ severity_(severity),
+ fac_(fac ? fac : ""),
+ str_(str),
+ id_(0),
+ throttle_time_(0),
+ fatal_(false) {
+ }
+
+ EventImpl(Type type) :
+ type_(type),
+ err_(ERR_NO_ERROR),
+ severity_(EVENT_SEVERITY_EMERG),
+ fac_(""),
+ str_(""),
+ id_(0),
+ throttle_time_(0),
+ fatal_(false) {
+ }
+
+ Type type() const {
+ return type_;
+ }
+ ErrorCode err() const {
+ return err_;
+ }
+ Severity severity() const {
+ return severity_;
+ }
+ std::string fac() const {
+ return fac_;
+ }
+ std::string str() const {
+ return str_;
+ }
+ std::string broker_name() const {
+ if (type_ == EVENT_THROTTLE)
+ return str_;
+ else
+ return std::string("");
+ }
+ int broker_id() const {
+ return id_;
+ }
+ int throttle_time() const {
+ return throttle_time_;
+ }
+
+ bool fatal() const {
+ return fatal_;
+ }
+
+ Type type_;
+ ErrorCode err_;
+ Severity severity_;
+ std::string fac_;
+ std::string str_; /* reused for THROTTLE broker_name */
+ int id_;
+ int throttle_time_;
+ bool fatal_;
+};
+
+class QueueImpl : virtual public Queue {
+ public:
+ QueueImpl(rd_kafka_queue_t *c_rkqu) : queue_(c_rkqu) {
+ }
+ ~QueueImpl() {
+ rd_kafka_queue_destroy(queue_);
+ }
+ static Queue *create(Handle *base);
+ ErrorCode forward(Queue *queue);
+ Message *consume(int timeout_ms);
+ int poll(int timeout_ms);
+ void io_event_enable(int fd, const void *payload, size_t size);
+
+ rd_kafka_queue_t *queue_;
+};
+
+
+
+class HeadersImpl : public Headers {
+ public:
+ HeadersImpl() : headers_(rd_kafka_headers_new(8)) {
+ }
+
+ HeadersImpl(rd_kafka_headers_t *headers) : headers_(headers) {
+ }
+
+ HeadersImpl(const std::vector<Header> &headers) {
+ if (headers.size() > 0) {
+ headers_ = rd_kafka_headers_new(headers.size());
+ from_vector(headers);
+ } else {
+ headers_ = rd_kafka_headers_new(8);
+ }
+ }
+
+ ~HeadersImpl() {
+ if (headers_) {
+ rd_kafka_headers_destroy(headers_);
+ }
+ }
+
+ ErrorCode add(const std::string &key, const char *value) {
+ rd_kafka_resp_err_t err;
+ err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, -1);
+ return static_cast<RdKafka::ErrorCode>(err);
+ }
+
+ ErrorCode add(const std::string &key, const void *value, size_t value_size) {
+ rd_kafka_resp_err_t err;
+ err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value,
+ value_size);
+ return static_cast<RdKafka::ErrorCode>(err);
+ }
+
+ ErrorCode add(const std::string &key, const std::string &value) {
+ rd_kafka_resp_err_t err;
+ err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value.c_str(),
+ value.size());
+ return static_cast<RdKafka::ErrorCode>(err);
+ }
+
+ ErrorCode add(const Header &header) {
+ rd_kafka_resp_err_t err;
+ err =
+ rd_kafka_header_add(headers_, header.key().c_str(), header.key().size(),
+ header.value(), header.value_size());
+ return static_cast<RdKafka::ErrorCode>(err);
+ }
+
+ ErrorCode remove(const std::string &key) {
+ rd_kafka_resp_err_t err;
+ err = rd_kafka_header_remove(headers_, key.c_str());
+ return static_cast<RdKafka::ErrorCode>(err);
+ }
+
+ std::vector<Headers::Header> get(const std::string &key) const {
+ std::vector<Headers::Header> headers;
+ const void *value;
+ size_t size;
+ rd_kafka_resp_err_t err;
+ for (size_t idx = 0; !(err = rd_kafka_header_get(headers_, idx, key.c_str(),
+ &value, &size));
+ idx++) {
+ headers.push_back(Headers::Header(key, value, size));
+ }
+ return headers;
+ }
+
+ Headers::Header get_last(const std::string &key) const {
+ const void *value;
+ size_t size;
+ rd_kafka_resp_err_t err;
+ err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size);
+ return Headers::Header(key, value, size,
+ static_cast<RdKafka::ErrorCode>(err));
+ }
+
+ std::vector<Headers::Header> get_all() const {
+ std::vector<Headers::Header> headers;
+ size_t idx = 0;
+ const char *name;
+ const void *valuep;
+ size_t size;
+ while (!rd_kafka_header_get_all(headers_, idx++, &name, &valuep, &size)) {
+ headers.push_back(Headers::Header(name, valuep, size));
+ }
+ return headers;
+ }
+
+ size_t size() const {
+ return rd_kafka_header_cnt(headers_);
+ }
+
+ /** @brief Reset the C headers pointer to NULL. */
+ void c_headers_destroyed() {
+ headers_ = NULL;
+ }
+
+ /** @returns the underlying C headers, or NULL. */
+ rd_kafka_headers_t *c_ptr() {
+ return headers_;
+ }
+
+
+ private:
+ void from_vector(const std::vector<Header> &headers) {
+ if (headers.size() == 0)
+ return;
+ for (std::vector<Header>::const_iterator it = headers.begin();
+ it != headers.end(); it++)
+ this->add(*it);
+ }
+
+ HeadersImpl(HeadersImpl const &) /*= delete*/;
+ HeadersImpl &operator=(HeadersImpl const &) /*= delete*/;
+
+ rd_kafka_headers_t *headers_;
+};
+
+
+
+class MessageImpl : public Message {
+ public:
+ ~MessageImpl() {
+ if (free_rkmessage_)
+ rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
+ if (key_)
+ delete key_;
+ if (headers_)
+ delete headers_;
+ }
+
+ MessageImpl(rd_kafka_type_t rk_type,
+ RdKafka::Topic *topic,
+ rd_kafka_message_t *rkmessage) :
+ topic_(topic),
+ rkmessage_(rkmessage),
+ free_rkmessage_(true),
+ key_(NULL),
+ headers_(NULL),
+ rk_type_(rk_type) {
+ }
+
+ MessageImpl(rd_kafka_type_t rk_type,
+ RdKafka::Topic *topic,
+ rd_kafka_message_t *rkmessage,
+ bool dofree) :
+ topic_(topic),
+ rkmessage_(rkmessage),
+ free_rkmessage_(dofree),
+ key_(NULL),
+ headers_(NULL),
+ rk_type_(rk_type) {
+ }
+
+ MessageImpl(rd_kafka_type_t rk_type, rd_kafka_message_t *rkmessage) :
+ topic_(NULL),
+ rkmessage_(rkmessage),
+ free_rkmessage_(true),
+ key_(NULL),
+ headers_(NULL),
+ rk_type_(rk_type) {
+ if (rkmessage->rkt) {
+ /* Possibly NULL */
+ topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
+ }
+ }
+
+ /* Create errored message */
+ MessageImpl(rd_kafka_type_t rk_type,
+ RdKafka::Topic *topic,
+ RdKafka::ErrorCode err) :
+ topic_(topic),
+ free_rkmessage_(false),
+ key_(NULL),
+ headers_(NULL),
+ rk_type_(rk_type) {
+ rkmessage_ = &rkmessage_err_;
+ memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
+ rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
+ }
+
+ std::string errstr() const {
+ const char *es;
+ /* message_errstr() is only available for the consumer. */
+ if (rk_type_ == RD_KAFKA_CONSUMER)
+ es = rd_kafka_message_errstr(rkmessage_);
+ else
+ es = rd_kafka_err2str(rkmessage_->err);
+
+ return std::string(es ? es : "");
+ }
+
+ ErrorCode err() const {
+ return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
+ }
+
+ Topic *topic() const {
+ return topic_;
+ }
+ std::string topic_name() const {
+ if (rkmessage_->rkt)
+ return rd_kafka_topic_name(rkmessage_->rkt);
+ else
+ return "";
+ }
+ int32_t partition() const {
+ return rkmessage_->partition;
+ }
+ void *payload() const {
+ return rkmessage_->payload;
+ }
+ size_t len() const {
+ return rkmessage_->len;
+ }
+ const std::string *key() const {
+ if (key_) {
+ return key_;
+ } else if (rkmessage_->key) {
+ key_ = new std::string(static_cast<char const *>(rkmessage_->key),
+ rkmessage_->key_len);
+ return key_;
+ }
+ return NULL;
+ }
+ const void *key_pointer() const {
+ return rkmessage_->key;
+ }
+ size_t key_len() const {
+ return rkmessage_->key_len;
+ }
+
+ int64_t offset() const {
+ return rkmessage_->offset;
+ }
+
+ MessageTimestamp timestamp() const {
+ MessageTimestamp ts;
+ rd_kafka_timestamp_type_t tstype;
+ ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
+ ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
+ return ts;
+ }
+
+ void *msg_opaque() const {
+ return rkmessage_->_private;
+ }
+
+ int64_t latency() const {
+ return rd_kafka_message_latency(rkmessage_);
+ }
+
+ struct rd_kafka_message_s *c_ptr() {
+ return rkmessage_;
+ }
+
+ Status status() const {
+ return static_cast<Status>(rd_kafka_message_status(rkmessage_));
+ }
+
+ Headers *headers() {
+ ErrorCode err;
+ return headers(&err);
+ }
+
+ Headers *headers(ErrorCode *err) {
+ *err = ERR_NO_ERROR;
+
+ if (!headers_) {
+ rd_kafka_headers_t *c_hdrs;
+ rd_kafka_resp_err_t c_err;
+
+ if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) {
+ *err = static_cast<RdKafka::ErrorCode>(c_err);
+ return NULL;
+ }
+
+ headers_ = new HeadersImpl(c_hdrs);
+ }
+
+ return headers_;
+ }
+
+ int32_t broker_id() const {
+ return rd_kafka_message_broker_id(rkmessage_);
+ }
+
+ int32_t leader_epoch() const {
+ return rd_kafka_message_leader_epoch(rkmessage_);
+ }
+
+
+ Error *offset_store() {
+ rd_kafka_error_t *c_error;
+
+ c_error = rd_kafka_offset_store_message(rkmessage_);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ RdKafka::Topic *topic_;
+ rd_kafka_message_t *rkmessage_;
+ bool free_rkmessage_;
+ /* For error signalling by the C++ layer the .._err_ message is
+ * used as a place holder and rkmessage_ is set to point to it. */
+ rd_kafka_message_t rkmessage_err_;
+ mutable std::string *key_; /* mutable because it's a cached value */
+
+ private:
+ /* "delete" copy ctor + copy assignment, for safety of key_ */
+ MessageImpl(MessageImpl const &) /*= delete*/;
+ MessageImpl &operator=(MessageImpl const &) /*= delete*/;
+
+ RdKafka::Headers *headers_;
+ const rd_kafka_type_t rk_type_; /**< Client type */
+};
+
+
+class ConfImpl : public Conf {
+ public:
+ ConfImpl(ConfType conf_type) :
+ consume_cb_(NULL),
+ dr_cb_(NULL),
+ event_cb_(NULL),
+ socket_cb_(NULL),
+ open_cb_(NULL),
+ partitioner_cb_(NULL),
+ partitioner_kp_cb_(NULL),
+ rebalance_cb_(NULL),
+ offset_commit_cb_(NULL),
+ oauthbearer_token_refresh_cb_(NULL),
+ ssl_cert_verify_cb_(NULL),
+ conf_type_(conf_type),
+ rk_conf_(NULL),
+ rkt_conf_(NULL) {
+ }
+ ~ConfImpl() {
+ if (rk_conf_)
+ rd_kafka_conf_destroy(rk_conf_);
+ else if (rkt_conf_)
+ rd_kafka_topic_conf_destroy(rkt_conf_);
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ const std::string &value,
+ std::string &errstr);
+
+ Conf::ConfResult set(const std::string &name,
+ DeliveryReportCb *dr_cb,
+ std::string &errstr) {
+ if (name != "dr_cb") {
+ errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ dr_cb_ = dr_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
+ std::string &errstr) {
+ if (name != "oauthbearer_token_refresh_cb") {
+ errstr =
+ "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ EventCb *event_cb,
+ std::string &errstr) {
+ if (name != "event_cb") {
+ errstr = "Invalid value type, expected RdKafka::EventCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ event_cb_ = event_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ const Conf *topic_conf,
+ std::string &errstr) {
+ const ConfImpl *tconf_impl =
+ dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
+ if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
+ errstr = "Invalid value type, expected RdKafka::Conf";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ rd_kafka_conf_set_default_topic_conf(
+ rk_conf_, rd_kafka_topic_conf_dup(tconf_impl->rkt_conf_));
+
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ PartitionerCb *partitioner_cb,
+ std::string &errstr) {
+ if (name != "partitioner_cb") {
+ errstr = "Invalid value type, expected RdKafka::PartitionerCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rkt_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
+ return Conf::CONF_INVALID;
+ }
+
+ partitioner_cb_ = partitioner_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ PartitionerKeyPointerCb *partitioner_kp_cb,
+ std::string &errstr) {
+ if (name != "partitioner_key_pointer_cb") {
+ errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rkt_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
+ return Conf::CONF_INVALID;
+ }
+
+ partitioner_kp_cb_ = partitioner_kp_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set(const std::string &name,
+ SocketCb *socket_cb,
+ std::string &errstr) {
+ if (name != "socket_cb") {
+ errstr = "Invalid value type, expected RdKafka::SocketCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ socket_cb_ = socket_cb;
+ return Conf::CONF_OK;
+ }
+
+
+ Conf::ConfResult set(const std::string &name,
+ OpenCb *open_cb,
+ std::string &errstr) {
+ if (name != "open_cb") {
+ errstr = "Invalid value type, expected RdKafka::OpenCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ open_cb_ = open_cb;
+ return Conf::CONF_OK;
+ }
+
+
+
+ Conf::ConfResult set(const std::string &name,
+ RebalanceCb *rebalance_cb,
+ std::string &errstr) {
+ if (name != "rebalance_cb") {
+ errstr = "Invalid value type, expected RdKafka::RebalanceCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ rebalance_cb_ = rebalance_cb;
+ return Conf::CONF_OK;
+ }
+
+
+ Conf::ConfResult set(const std::string &name,
+ OffsetCommitCb *offset_commit_cb,
+ std::string &errstr) {
+ if (name != "offset_commit_cb") {
+ errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ offset_commit_cb_ = offset_commit_cb;
+ return Conf::CONF_OK;
+ }
+
+
+ Conf::ConfResult set(const std::string &name,
+ SslCertificateVerifyCb *ssl_cert_verify_cb,
+ std::string &errstr) {
+ if (name != "ssl_cert_verify_cb") {
+ errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ ssl_cert_verify_cb_ = ssl_cert_verify_cb;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr) {
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ rd_kafka_conf_set_engine_callback_data(rk_conf_, value);
+ return Conf::CONF_OK;
+ }
+
+
+ Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type,
+ RdKafka::CertificateEncoding cert_enc,
+ const void *buffer,
+ size_t size,
+ std::string &errstr) {
+ rd_kafka_conf_res_t res;
+ char errbuf[512];
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ res = rd_kafka_conf_set_ssl_cert(
+ rk_conf_, static_cast<rd_kafka_cert_type_t>(cert_type),
+ static_cast<rd_kafka_cert_enc_t>(cert_enc), buffer, size, errbuf,
+ sizeof(errbuf));
+
+ if (res != RD_KAFKA_CONF_OK)
+ errstr = errbuf;
+
+ return static_cast<Conf::ConfResult>(res);
+ }
+
+ Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr) {
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ rd_kafka_conf_enable_sasl_queue(rk_conf_, enable ? 1 : 0);
+
+ return Conf::CONF_OK;
+ }
+
+
+ Conf::ConfResult get(const std::string &name, std::string &value) const {
+ if (name.compare("dr_cb") == 0 || name.compare("event_cb") == 0 ||
+ name.compare("partitioner_cb") == 0 ||
+ name.compare("partitioner_key_pointer_cb") == 0 ||
+ name.compare("socket_cb") == 0 || name.compare("open_cb") == 0 ||
+ name.compare("rebalance_cb") == 0 ||
+ name.compare("offset_commit_cb") == 0 ||
+ name.compare("oauthbearer_token_refresh_cb") == 0 ||
+ name.compare("ssl_cert_verify_cb") == 0 ||
+ name.compare("set_engine_callback_data") == 0 ||
+ name.compare("enable_sasl_queue") == 0) {
+ return Conf::CONF_INVALID;
+ }
+ rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
+
+ /* Get size of property */
+ size_t size;
+ if (rk_conf_)
+ res = rd_kafka_conf_get(rk_conf_, name.c_str(), NULL, &size);
+ else if (rkt_conf_)
+ res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), NULL, &size);
+ if (res != RD_KAFKA_CONF_OK)
+ return static_cast<Conf::ConfResult>(res);
+
+ char *tmpValue = new char[size];
+
+ if (rk_conf_)
+ res = rd_kafka_conf_get(rk_conf_, name.c_str(), tmpValue, &size);
+ else if (rkt_conf_)
+ res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), tmpValue, &size);
+
+ if (res == RD_KAFKA_CONF_OK)
+ value.assign(tmpValue);
+ delete[] tmpValue;
+
+ return static_cast<Conf::ConfResult>(res);
+ }
+
+ Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ dr_cb = this->dr_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(
+ OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(EventCb *&event_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ event_cb = this->event_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
+ if (!rkt_conf_)
+ return Conf::CONF_INVALID;
+ partitioner_cb = this->partitioner_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
+ if (!rkt_conf_)
+ return Conf::CONF_INVALID;
+ partitioner_kp_cb = this->partitioner_kp_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(SocketCb *&socket_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ socket_cb = this->socket_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(OpenCb *&open_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ open_cb = this->open_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ rebalance_cb = this->rebalance_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ offset_commit_cb = this->offset_commit_cb_;
+ return Conf::CONF_OK;
+ }
+
+ Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const {
+ if (!rk_conf_)
+ return Conf::CONF_INVALID;
+ ssl_cert_verify_cb = this->ssl_cert_verify_cb_;
+ return Conf::CONF_OK;
+ }
+
+ std::list<std::string> *dump();
+
+
+ Conf::ConfResult set(const std::string &name,
+ ConsumeCb *consume_cb,
+ std::string &errstr) {
+ if (name != "consume_cb") {
+ errstr = "Invalid value type, expected RdKafka::ConsumeCb";
+ return Conf::CONF_INVALID;
+ }
+
+ if (!rk_conf_) {
+ errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+ return Conf::CONF_INVALID;
+ }
+
+ consume_cb_ = consume_cb;
+ return Conf::CONF_OK;
+ }
+
+ struct rd_kafka_conf_s *c_ptr_global() {
+ if (conf_type_ == CONF_GLOBAL)
+ return rk_conf_;
+ else
+ return NULL;
+ }
+
+ struct rd_kafka_topic_conf_s *c_ptr_topic() {
+ if (conf_type_ == CONF_TOPIC)
+ return rkt_conf_;
+ else
+ return NULL;
+ }
+
+ ConsumeCb *consume_cb_;
+ DeliveryReportCb *dr_cb_;
+ EventCb *event_cb_;
+ SocketCb *socket_cb_;
+ OpenCb *open_cb_;
+ PartitionerCb *partitioner_cb_;
+ PartitionerKeyPointerCb *partitioner_kp_cb_;
+ RebalanceCb *rebalance_cb_;
+ OffsetCommitCb *offset_commit_cb_;
+ OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
+ SslCertificateVerifyCb *ssl_cert_verify_cb_;
+ ConfType conf_type_;
+ rd_kafka_conf_t *rk_conf_;
+ rd_kafka_topic_conf_t *rkt_conf_;
+};
+
+
+class HandleImpl : virtual public Handle {
+ public:
+ ~HandleImpl() {
+ }
+ HandleImpl() {
+ }
+ std::string name() const {
+ return std::string(rd_kafka_name(rk_));
+ }
+ std::string memberid() const {
+ char *str = rd_kafka_memberid(rk_);
+ std::string memberid = str ? str : "";
+ if (str)
+ rd_kafka_mem_free(rk_, str);
+ return memberid;
+ }
+ int poll(int timeout_ms) {
+ return rd_kafka_poll(rk_, timeout_ms);
+ }
+ int outq_len() {
+ return rd_kafka_outq_len(rk_);
+ }
+
+ void set_common_config(const RdKafka::ConfImpl *confimpl);
+
+ RdKafka::ErrorCode metadata(bool all_topics,
+ const Topic *only_rkt,
+ Metadata **metadatap,
+ int timeout_ms);
+
+ ErrorCode pause(std::vector<TopicPartition *> &partitions);
+ ErrorCode resume(std::vector<TopicPartition *> &partitions);
+
+ ErrorCode query_watermark_offsets(const std::string &topic,
+ int32_t partition,
+ int64_t *low,
+ int64_t *high,
+ int timeout_ms) {
+ return static_cast<RdKafka::ErrorCode>(rd_kafka_query_watermark_offsets(
+ rk_, topic.c_str(), partition, low, high, timeout_ms));
+ }
+
+ ErrorCode get_watermark_offsets(const std::string &topic,
+ int32_t partition,
+ int64_t *low,
+ int64_t *high) {
+ return static_cast<RdKafka::ErrorCode>(rd_kafka_get_watermark_offsets(
+ rk_, topic.c_str(), partition, low, high));
+ }
+
+ Queue *get_partition_queue(const TopicPartition *partition);
+
+ Queue *get_sasl_queue() {
+ rd_kafka_queue_t *rkqu;
+ rkqu = rd_kafka_queue_get_sasl(rk_);
+
+ if (rkqu == NULL)
+ return NULL;
+
+ return new QueueImpl(rkqu);
+ }
+
+ Queue *get_background_queue() {
+ rd_kafka_queue_t *rkqu;
+ rkqu = rd_kafka_queue_get_background(rk_);
+
+ if (rkqu == NULL)
+ return NULL;
+
+ return new QueueImpl(rkqu);
+ }
+
+
+ ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
+ int timeout_ms) {
+ rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
+ ErrorCode err = static_cast<ErrorCode>(
+ rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
+ update_partitions_from_c_parts(offsets, c_offsets);
+ rd_kafka_topic_partition_list_destroy(c_offsets);
+ return err;
+ }
+
+ ErrorCode set_log_queue(Queue *queue);
+
+ void yield() {
+ rd_kafka_yield(rk_);
+ }
+
+ std::string clusterid(int timeout_ms) {
+ char *str = rd_kafka_clusterid(rk_, timeout_ms);
+ std::string clusterid = str ? str : "";
+ if (str)
+ rd_kafka_mem_free(rk_, str);
+ return clusterid;
+ }
+
+ struct rd_kafka_s *c_ptr() {
+ return rk_;
+ }
+
+ int32_t controllerid(int timeout_ms) {
+ return rd_kafka_controllerid(rk_, timeout_ms);
+ }
+
+ ErrorCode fatal_error(std::string &errstr) const {
+ char errbuf[512];
+ RdKafka::ErrorCode err = static_cast<RdKafka::ErrorCode>(
+ rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf)));
+ if (err)
+ errstr = errbuf;
+ return err;
+ }
+
+ ErrorCode oauthbearer_set_token(const std::string &token_value,
+ int64_t md_lifetime_ms,
+ const std::string &md_principal_name,
+ const std::list<std::string> &extensions,
+ std::string &errstr) {
+ char errbuf[512];
+ ErrorCode err;
+ const char **extensions_copy = new const char *[extensions.size()];
+ int elem = 0;
+
+ for (std::list<std::string>::const_iterator it = extensions.begin();
+ it != extensions.end(); it++)
+ extensions_copy[elem++] = it->c_str();
+ err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token(
+ rk_, token_value.c_str(), md_lifetime_ms, md_principal_name.c_str(),
+ extensions_copy, extensions.size(), errbuf, sizeof(errbuf)));
+ delete[] extensions_copy;
+
+ if (err != ERR_NO_ERROR)
+ errstr = errbuf;
+
+ return err;
+ }
+
+ ErrorCode oauthbearer_set_token_failure(const std::string &errstr) {
+ return static_cast<ErrorCode>(
+ rd_kafka_oauthbearer_set_token_failure(rk_, errstr.c_str()));
+ }
+
+ Error *sasl_background_callbacks_enable() {
+ rd_kafka_error_t *c_error = rd_kafka_sasl_background_callbacks_enable(rk_);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+
+ return NULL;
+ }
+
+ Error *sasl_set_credentials(const std::string &username,
+ const std::string &password) {
+ rd_kafka_error_t *c_error =
+ rd_kafka_sasl_set_credentials(rk_, username.c_str(), password.c_str());
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+
+ return NULL;
+ };
+
+ void *mem_malloc(size_t size) {
+ return rd_kafka_mem_malloc(rk_, size);
+ }
+
+ void mem_free(void *ptr) {
+ rd_kafka_mem_free(rk_, ptr);
+ }
+
+ rd_kafka_t *rk_;
+ /* All Producer and Consumer callbacks must reside in HandleImpl and
+ * the opaque provided to rdkafka must be a pointer to HandleImpl, since
+ * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
+ * HandleImpl due to the skewed diamond inheritance. */
+ ConsumeCb *consume_cb_;
+ EventCb *event_cb_;
+ SocketCb *socket_cb_;
+ OpenCb *open_cb_;
+ DeliveryReportCb *dr_cb_;
+ PartitionerCb *partitioner_cb_;
+ PartitionerKeyPointerCb *partitioner_kp_cb_;
+ RebalanceCb *rebalance_cb_;
+ OffsetCommitCb *offset_commit_cb_;
+ OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
+ SslCertificateVerifyCb *ssl_cert_verify_cb_;
+};
+
+
+class TopicImpl : public Topic {
+ public:
+ ~TopicImpl() {
+ rd_kafka_topic_destroy(rkt_);
+ }
+
+ std::string name() const {
+ return rd_kafka_topic_name(rkt_);
+ }
+
+ bool partition_available(int32_t partition) const {
+ return !!rd_kafka_topic_partition_available(rkt_, partition);
+ }
+
+ ErrorCode offset_store(int32_t partition, int64_t offset) {
+ return static_cast<RdKafka::ErrorCode>(
+ rd_kafka_offset_store(rkt_, partition, offset));
+ }
+
+ static Topic *create(Handle &base, const std::string &topic, Conf *conf);
+
+ struct rd_kafka_topic_s *c_ptr() {
+ return rkt_;
+ }
+
+ rd_kafka_topic_t *rkt_;
+ PartitionerCb *partitioner_cb_;
+ PartitionerKeyPointerCb *partitioner_kp_cb_;
+};
+
+
+/**
+ * Topic and Partition
+ */
+class TopicPartitionImpl : public TopicPartition {
+ public:
+ ~TopicPartitionImpl() {
+ }
+
+ static TopicPartition *create(const std::string &topic, int partition);
+
+ TopicPartitionImpl(const std::string &topic, int partition) :
+ topic_(topic),
+ partition_(partition),
+ offset_(RdKafka::Topic::OFFSET_INVALID),
+ err_(ERR_NO_ERROR),
+ leader_epoch_(-1) {
+ }
+
+ TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) :
+ topic_(topic),
+ partition_(partition),
+ offset_(offset),
+ err_(ERR_NO_ERROR),
+ leader_epoch_(-1) {
+ }
+
+ TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) {
+ topic_ = std::string(c_part->topic);
+ partition_ = c_part->partition;
+ offset_ = c_part->offset;
+ err_ = static_cast<ErrorCode>(c_part->err);
+ leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
+ // FIXME: metadata
+ }
+
+ static void destroy(std::vector<TopicPartition *> &partitions);
+
+ int partition() const {
+ return partition_;
+ }
+ const std::string &topic() const {
+ return topic_;
+ }
+
+ int64_t offset() const {
+ return offset_;
+ }
+
+ ErrorCode err() const {
+ return err_;
+ }
+
+ void set_offset(int64_t offset) {
+ offset_ = offset;
+ }
+
+ int32_t get_leader_epoch() {
+ return leader_epoch_;
+ }
+
+ void set_leader_epoch(int32_t leader_epoch) {
+ leader_epoch_ = leader_epoch_;
+ }
+
+ std::ostream &operator<<(std::ostream &ostrm) const {
+ return ostrm << topic_ << " [" << partition_ << "]";
+ }
+
+ std::string topic_;
+ int partition_;
+ int64_t offset_;
+ ErrorCode err_;
+ int32_t leader_epoch_;
+};
+
+
+/**
+ * @class ConsumerGroupMetadata wraps the
+ * C rd_kafka_consumer_group_metadata_t object.
+ */
+class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata {
+ public:
+ ~ConsumerGroupMetadataImpl() {
+ rd_kafka_consumer_group_metadata_destroy(cgmetadata_);
+ }
+
+ ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata) :
+ cgmetadata_(cgmetadata) {
+ }
+
+ rd_kafka_consumer_group_metadata_t *cgmetadata_;
+};
+
+
+class KafkaConsumerImpl : virtual public KafkaConsumer,
+ virtual public HandleImpl {
+ public:
+ ~KafkaConsumerImpl() {
+ if (rk_)
+ rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+ }
+
+ static KafkaConsumer *create(Conf *conf, std::string &errstr);
+
+ ErrorCode assignment(std::vector<TopicPartition *> &partitions);
+ bool assignment_lost();
+ std::string rebalance_protocol() {
+ const char *str = rd_kafka_rebalance_protocol(rk_);
+ return std::string(str ? str : "");
+ }
+ ErrorCode subscription(std::vector<std::string> &topics);
+ ErrorCode subscribe(const std::vector<std::string> &topics);
+ ErrorCode unsubscribe();
+ ErrorCode assign(const std::vector<TopicPartition *> &partitions);
+ ErrorCode unassign();
+ Error *incremental_assign(const std::vector<TopicPartition *> &partitions);
+ Error *incremental_unassign(const std::vector<TopicPartition *> &partitions);
+
+ Message *consume(int timeout_ms);
+ ErrorCode commitSync() {
+ return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/));
+ }
+ ErrorCode commitAsync() {
+ return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1 /*async*/));
+ }
+ ErrorCode commitSync(Message *message) {
+ MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message);
+ return static_cast<ErrorCode>(
+ rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0 /*sync*/));
+ }
+ ErrorCode commitAsync(Message *message) {
+ MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message);
+ return static_cast<ErrorCode>(
+ rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 1 /*async*/));
+ }
+
+ ErrorCode commitSync(std::vector<TopicPartition *> &offsets) {
+ rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
+ rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 0);
+ if (!err)
+ update_partitions_from_c_parts(offsets, c_parts);
+ rd_kafka_topic_partition_list_destroy(c_parts);
+ return static_cast<ErrorCode>(err);
+ }
+
+ ErrorCode commitAsync(const std::vector<TopicPartition *> &offsets) {
+ rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
+ rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 1);
+ rd_kafka_topic_partition_list_destroy(c_parts);
+ return static_cast<ErrorCode>(err);
+ }
+
+ ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) {
+ return static_cast<ErrorCode>(rd_kafka_commit_queue(
+ rk_, NULL, NULL, RdKafka::offset_commit_cb_trampoline0,
+ offset_commit_cb));
+ }
+
+ ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
+ OffsetCommitCb *offset_commit_cb) {
+ rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
+ rd_kafka_resp_err_t err = rd_kafka_commit_queue(
+ rk_, c_parts, NULL, RdKafka::offset_commit_cb_trampoline0,
+ offset_commit_cb);
+ rd_kafka_topic_partition_list_destroy(c_parts);
+ return static_cast<ErrorCode>(err);
+ }
+
+ ErrorCode committed(std::vector<TopicPartition *> &partitions,
+ int timeout_ms);
+ ErrorCode position(std::vector<TopicPartition *> &partitions);
+
+ ConsumerGroupMetadata *groupMetadata() {
+ rd_kafka_consumer_group_metadata_t *cgmetadata;
+
+ cgmetadata = rd_kafka_consumer_group_metadata(rk_);
+ if (!cgmetadata)
+ return NULL;
+
+ return new ConsumerGroupMetadataImpl(cgmetadata);
+ }
+
+ ErrorCode close();
+
+ Error *close(Queue *queue);
+
+ bool closed() {
+ return rd_kafka_consumer_closed(rk_) ? true : false;
+ }
+
+ ErrorCode seek(const TopicPartition &partition, int timeout_ms);
+
+ ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) {
+ rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
+ rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk_, c_parts);
+ update_partitions_from_c_parts(offsets, c_parts);
+ rd_kafka_topic_partition_list_destroy(c_parts);
+ return static_cast<ErrorCode>(err);
+ }
+};
+
+
+class MetadataImpl : public Metadata {
+ public:
+ MetadataImpl(const rd_kafka_metadata_t *metadata);
+ ~MetadataImpl();
+
+ const std::vector<const BrokerMetadata *> *brokers() const {
+ return &brokers_;
+ }
+
+ const std::vector<const TopicMetadata *> *topics() const {
+ return &topics_;
+ }
+
+ std::string orig_broker_name() const {
+ return std::string(metadata_->orig_broker_name);
+ }
+
+ int32_t orig_broker_id() const {
+ return metadata_->orig_broker_id;
+ }
+
+ private:
+ const rd_kafka_metadata_t *metadata_;
+ std::vector<const BrokerMetadata *> brokers_;
+ std::vector<const TopicMetadata *> topics_;
+ std::string orig_broker_name_;
+};
+
+
+
+class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
+ public:
+ ~ConsumerImpl() {
+ if (rk_)
+ rd_kafka_destroy(rk_);
+ }
+ static Consumer *create(Conf *conf, std::string &errstr);
+
+ ErrorCode start(Topic *topic, int32_t partition, int64_t offset);
+ ErrorCode start(Topic *topic,
+ int32_t partition,
+ int64_t offset,
+ Queue *queue);
+ ErrorCode stop(Topic *topic, int32_t partition);
+ ErrorCode seek(Topic *topic,
+ int32_t partition,
+ int64_t offset,
+ int timeout_ms);
+ Message *consume(Topic *topic, int32_t partition, int timeout_ms);
+ Message *consume(Queue *queue, int timeout_ms);
+ int consume_callback(Topic *topic,
+ int32_t partition,
+ int timeout_ms,
+ ConsumeCb *cb,
+ void *opaque);
+ int consume_callback(Queue *queue,
+ int timeout_ms,
+ RdKafka::ConsumeCb *consume_cb,
+ void *opaque);
+};
+
+
+
+class ProducerImpl : virtual public Producer, virtual public HandleImpl {
+ public:
+ ~ProducerImpl() {
+ if (rk_)
+ rd_kafka_destroy(rk_);
+ }
+
+ ErrorCode produce(Topic *topic,
+ int32_t partition,
+ int msgflags,
+ void *payload,
+ size_t len,
+ const std::string *key,
+ void *msg_opaque);
+
+ ErrorCode produce(Topic *topic,
+ int32_t partition,
+ int msgflags,
+ void *payload,
+ size_t len,
+ const void *key,
+ size_t key_len,
+ void *msg_opaque);
+
+ ErrorCode produce(Topic *topic,
+ int32_t partition,
+ const std::vector<char> *payload,
+ const std::vector<char> *key,
+ void *msg_opaque);
+
+ ErrorCode produce(const std::string topic_name,
+ int32_t partition,
+ int msgflags,
+ void *payload,
+ size_t len,
+ const void *key,
+ size_t key_len,
+ int64_t timestamp,
+ void *msg_opaque);
+
+ ErrorCode produce(const std::string topic_name,
+ int32_t partition,
+ int msgflags,
+ void *payload,
+ size_t len,
+ const void *key,
+ size_t key_len,
+ int64_t timestamp,
+ RdKafka::Headers *headers,
+ void *msg_opaque);
+
+ ErrorCode flush(int timeout_ms) {
+ return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_, timeout_ms));
+ }
+
+ ErrorCode purge(int purge_flags) {
+ return static_cast<RdKafka::ErrorCode>(
+ rd_kafka_purge(rk_, (int)purge_flags));
+ }
+
+ Error *init_transactions(int timeout_ms) {
+ rd_kafka_error_t *c_error;
+
+ c_error = rd_kafka_init_transactions(rk_, timeout_ms);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ Error *begin_transaction() {
+ rd_kafka_error_t *c_error;
+
+ c_error = rd_kafka_begin_transaction(rk_);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ Error *send_offsets_to_transaction(
+ const std::vector<TopicPartition *> &offsets,
+ const ConsumerGroupMetadata *group_metadata,
+ int timeout_ms) {
+ rd_kafka_error_t *c_error;
+ const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl =
+ dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>(
+ group_metadata);
+ rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
+
+ c_error = rd_kafka_send_offsets_to_transaction(
+ rk_, c_offsets, cgmdimpl->cgmetadata_, timeout_ms);
+
+ rd_kafka_topic_partition_list_destroy(c_offsets);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ Error *commit_transaction(int timeout_ms) {
+ rd_kafka_error_t *c_error;
+
+ c_error = rd_kafka_commit_transaction(rk_, timeout_ms);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ Error *abort_transaction(int timeout_ms) {
+ rd_kafka_error_t *c_error;
+
+ c_error = rd_kafka_abort_transaction(rk_, timeout_ms);
+
+ if (c_error)
+ return new ErrorImpl(c_error);
+ else
+ return NULL;
+ }
+
+ static Producer *create(Conf *conf, std::string &errstr);
+};
+
+
+
+} // namespace RdKafka
+
+#endif /* _RDKAFKACPP_INT_H_ */