summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h1628
1 files changed, 0 insertions, 1628 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
deleted file mode 100644
index bc024ebe9..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h
+++ /dev/null
@@ -1,1628 +0,0 @@
-/*
- * librdkafka - Apache Kafka C/C++ library
- *
- * Copyright (c) 2014 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef _RDKAFKACPP_INT_H_
-#define _RDKAFKACPP_INT_H_
-
-#include <string>
-#include <iostream>
-#include <cstring>
-#include <stdlib.h>
-
-#include "rdkafkacpp.h"
-
-extern "C" {
-#include "../src/rdkafka.h"
-}
-
-#ifdef _WIN32
-/* Visual Studio */
-#include "../src/win32_config.h"
-#else
-/* POSIX / UNIX based systems */
-#include "../config.h" /* mklove output */
-#endif
-
-#ifdef _MSC_VER
-typedef int mode_t;
-#pragma warning(disable : 4250)
-#endif
-
-
-namespace RdKafka {
-
-void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
-void log_cb_trampoline(const rd_kafka_t *rk,
- int level,
- const char *fac,
- const char *buf);
-void error_cb_trampoline(rd_kafka_t *rk,
- int err,
- const char *reason,
- void *opaque);
-void throttle_cb_trampoline(rd_kafka_t *rk,
- const char *broker_name,
- int32_t broker_id,
- int throttle_time_ms,
- void *opaque);
-int stats_cb_trampoline(rd_kafka_t *rk,
- char *json,
- size_t json_len,
- void *opaque);
-int socket_cb_trampoline(int domain, int type, int protocol, void *opaque);
-int open_cb_trampoline(const char *pathname,
- int flags,
- mode_t mode,
- void *opaque);
-void rebalance_cb_trampoline(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *c_partitions,
- void *opaque);
-void offset_commit_cb_trampoline0(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *c_offsets,
- void *opaque);
-void oauthbearer_token_refresh_cb_trampoline(rd_kafka_t *rk,
- const char *oauthbearer_config,
- void *opaque);
-
-int ssl_cert_verify_cb_trampoline(rd_kafka_t *rk,
- const char *broker_name,
- int32_t broker_id,
- int *x509_error,
- int depth,
- const char *buf,
- size_t size,
- char *errstr,
- size_t errstr_size,
- void *opaque);
-
-rd_kafka_topic_partition_list_t *partitions_to_c_parts(
- const std::vector<TopicPartition *> &partitions);
-
-/**
- * @brief Update the application provided 'partitions' with info from 'c_parts'
- */
-void update_partitions_from_c_parts(
- std::vector<TopicPartition *> &partitions,
- const rd_kafka_topic_partition_list_t *c_parts);
-
-
-class ErrorImpl : public Error {
- public:
- ~ErrorImpl() {
- rd_kafka_error_destroy(c_error_);
- }
-
- ErrorImpl(ErrorCode code, const std::string *errstr) {
- c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code),
- errstr ? "%s" : NULL,
- errstr ? errstr->c_str() : NULL);
- }
-
- ErrorImpl(rd_kafka_error_t *c_error) : c_error_(c_error) {
- }
-
- static Error *create(ErrorCode code, const std::string *errstr) {
- return new ErrorImpl(code, errstr);
- }
-
- ErrorCode code() const {
- return static_cast<ErrorCode>(rd_kafka_error_code(c_error_));
- }
-
- std::string name() const {
- return std::string(rd_kafka_error_name(c_error_));
- }
-
- std::string str() const {
- return std::string(rd_kafka_error_string(c_error_));
- }
-
- bool is_fatal() const {
- return !!rd_kafka_error_is_fatal(c_error_);
- }
-
- bool is_retriable() const {
- return !!rd_kafka_error_is_retriable(c_error_);
- }
-
- bool txn_requires_abort() const {
- return !!rd_kafka_error_txn_requires_abort(c_error_);
- }
-
- rd_kafka_error_t *c_error_;
-};
-
-
-class EventImpl : public Event {
- public:
- ~EventImpl() {
- }
-
- EventImpl(Type type,
- ErrorCode err,
- Severity severity,
- const char *fac,
- const char *str) :
- type_(type),
- err_(err),
- severity_(severity),
- fac_(fac ? fac : ""),
- str_(str),
- id_(0),
- throttle_time_(0),
- fatal_(false) {
- }
-
- EventImpl(Type type) :
- type_(type),
- err_(ERR_NO_ERROR),
- severity_(EVENT_SEVERITY_EMERG),
- fac_(""),
- str_(""),
- id_(0),
- throttle_time_(0),
- fatal_(false) {
- }
-
- Type type() const {
- return type_;
- }
- ErrorCode err() const {
- return err_;
- }
- Severity severity() const {
- return severity_;
- }
- std::string fac() const {
- return fac_;
- }
- std::string str() const {
- return str_;
- }
- std::string broker_name() const {
- if (type_ == EVENT_THROTTLE)
- return str_;
- else
- return std::string("");
- }
- int broker_id() const {
- return id_;
- }
- int throttle_time() const {
- return throttle_time_;
- }
-
- bool fatal() const {
- return fatal_;
- }
-
- Type type_;
- ErrorCode err_;
- Severity severity_;
- std::string fac_;
- std::string str_; /* reused for THROTTLE broker_name */
- int id_;
- int throttle_time_;
- bool fatal_;
-};
-
-class QueueImpl : virtual public Queue {
- public:
- QueueImpl(rd_kafka_queue_t *c_rkqu) : queue_(c_rkqu) {
- }
- ~QueueImpl() {
- rd_kafka_queue_destroy(queue_);
- }
- static Queue *create(Handle *base);
- ErrorCode forward(Queue *queue);
- Message *consume(int timeout_ms);
- int poll(int timeout_ms);
- void io_event_enable(int fd, const void *payload, size_t size);
-
- rd_kafka_queue_t *queue_;
-};
-
-
-
-class HeadersImpl : public Headers {
- public:
- HeadersImpl() : headers_(rd_kafka_headers_new(8)) {
- }
-
- HeadersImpl(rd_kafka_headers_t *headers) : headers_(headers) {
- }
-
- HeadersImpl(const std::vector<Header> &headers) {
- if (headers.size() > 0) {
- headers_ = rd_kafka_headers_new(headers.size());
- from_vector(headers);
- } else {
- headers_ = rd_kafka_headers_new(8);
- }
- }
-
- ~HeadersImpl() {
- if (headers_) {
- rd_kafka_headers_destroy(headers_);
- }
- }
-
- ErrorCode add(const std::string &key, const char *value) {
- rd_kafka_resp_err_t err;
- err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, -1);
- return static_cast<RdKafka::ErrorCode>(err);
- }
-
- ErrorCode add(const std::string &key, const void *value, size_t value_size) {
- rd_kafka_resp_err_t err;
- err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value,
- value_size);
- return static_cast<RdKafka::ErrorCode>(err);
- }
-
- ErrorCode add(const std::string &key, const std::string &value) {
- rd_kafka_resp_err_t err;
- err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value.c_str(),
- value.size());
- return static_cast<RdKafka::ErrorCode>(err);
- }
-
- ErrorCode add(const Header &header) {
- rd_kafka_resp_err_t err;
- err =
- rd_kafka_header_add(headers_, header.key().c_str(), header.key().size(),
- header.value(), header.value_size());
- return static_cast<RdKafka::ErrorCode>(err);
- }
-
- ErrorCode remove(const std::string &key) {
- rd_kafka_resp_err_t err;
- err = rd_kafka_header_remove(headers_, key.c_str());
- return static_cast<RdKafka::ErrorCode>(err);
- }
-
- std::vector<Headers::Header> get(const std::string &key) const {
- std::vector<Headers::Header> headers;
- const void *value;
- size_t size;
- rd_kafka_resp_err_t err;
- for (size_t idx = 0; !(err = rd_kafka_header_get(headers_, idx, key.c_str(),
- &value, &size));
- idx++) {
- headers.push_back(Headers::Header(key, value, size));
- }
- return headers;
- }
-
- Headers::Header get_last(const std::string &key) const {
- const void *value;
- size_t size;
- rd_kafka_resp_err_t err;
- err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size);
- return Headers::Header(key, value, size,
- static_cast<RdKafka::ErrorCode>(err));
- }
-
- std::vector<Headers::Header> get_all() const {
- std::vector<Headers::Header> headers;
- size_t idx = 0;
- const char *name;
- const void *valuep;
- size_t size;
- while (!rd_kafka_header_get_all(headers_, idx++, &name, &valuep, &size)) {
- headers.push_back(Headers::Header(name, valuep, size));
- }
- return headers;
- }
-
- size_t size() const {
- return rd_kafka_header_cnt(headers_);
- }
-
- /** @brief Reset the C headers pointer to NULL. */
- void c_headers_destroyed() {
- headers_ = NULL;
- }
-
- /** @returns the underlying C headers, or NULL. */
- rd_kafka_headers_t *c_ptr() {
- return headers_;
- }
-
-
- private:
- void from_vector(const std::vector<Header> &headers) {
- if (headers.size() == 0)
- return;
- for (std::vector<Header>::const_iterator it = headers.begin();
- it != headers.end(); it++)
- this->add(*it);
- }
-
- HeadersImpl(HeadersImpl const &) /*= delete*/;
- HeadersImpl &operator=(HeadersImpl const &) /*= delete*/;
-
- rd_kafka_headers_t *headers_;
-};
-
-
-
-class MessageImpl : public Message {
- public:
- ~MessageImpl() {
- if (free_rkmessage_)
- rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
- if (key_)
- delete key_;
- if (headers_)
- delete headers_;
- }
-
- MessageImpl(rd_kafka_type_t rk_type,
- RdKafka::Topic *topic,
- rd_kafka_message_t *rkmessage) :
- topic_(topic),
- rkmessage_(rkmessage),
- free_rkmessage_(true),
- key_(NULL),
- headers_(NULL),
- rk_type_(rk_type) {
- }
-
- MessageImpl(rd_kafka_type_t rk_type,
- RdKafka::Topic *topic,
- rd_kafka_message_t *rkmessage,
- bool dofree) :
- topic_(topic),
- rkmessage_(rkmessage),
- free_rkmessage_(dofree),
- key_(NULL),
- headers_(NULL),
- rk_type_(rk_type) {
- }
-
- MessageImpl(rd_kafka_type_t rk_type, rd_kafka_message_t *rkmessage) :
- topic_(NULL),
- rkmessage_(rkmessage),
- free_rkmessage_(true),
- key_(NULL),
- headers_(NULL),
- rk_type_(rk_type) {
- if (rkmessage->rkt) {
- /* Possibly NULL */
- topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
- }
- }
-
- /* Create errored message */
- MessageImpl(rd_kafka_type_t rk_type,
- RdKafka::Topic *topic,
- RdKafka::ErrorCode err) :
- topic_(topic),
- free_rkmessage_(false),
- key_(NULL),
- headers_(NULL),
- rk_type_(rk_type) {
- rkmessage_ = &rkmessage_err_;
- memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
- rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
- }
-
- std::string errstr() const {
- const char *es;
- /* message_errstr() is only available for the consumer. */
- if (rk_type_ == RD_KAFKA_CONSUMER)
- es = rd_kafka_message_errstr(rkmessage_);
- else
- es = rd_kafka_err2str(rkmessage_->err);
-
- return std::string(es ? es : "");
- }
-
- ErrorCode err() const {
- return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
- }
-
- Topic *topic() const {
- return topic_;
- }
- std::string topic_name() const {
- if (rkmessage_->rkt)
- return rd_kafka_topic_name(rkmessage_->rkt);
- else
- return "";
- }
- int32_t partition() const {
- return rkmessage_->partition;
- }
- void *payload() const {
- return rkmessage_->payload;
- }
- size_t len() const {
- return rkmessage_->len;
- }
- const std::string *key() const {
- if (key_) {
- return key_;
- } else if (rkmessage_->key) {
- key_ = new std::string(static_cast<char const *>(rkmessage_->key),
- rkmessage_->key_len);
- return key_;
- }
- return NULL;
- }
- const void *key_pointer() const {
- return rkmessage_->key;
- }
- size_t key_len() const {
- return rkmessage_->key_len;
- }
-
- int64_t offset() const {
- return rkmessage_->offset;
- }
-
- MessageTimestamp timestamp() const {
- MessageTimestamp ts;
- rd_kafka_timestamp_type_t tstype;
- ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
- ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
- return ts;
- }
-
- void *msg_opaque() const {
- return rkmessage_->_private;
- }
-
- int64_t latency() const {
- return rd_kafka_message_latency(rkmessage_);
- }
-
- struct rd_kafka_message_s *c_ptr() {
- return rkmessage_;
- }
-
- Status status() const {
- return static_cast<Status>(rd_kafka_message_status(rkmessage_));
- }
-
- Headers *headers() {
- ErrorCode err;
- return headers(&err);
- }
-
- Headers *headers(ErrorCode *err) {
- *err = ERR_NO_ERROR;
-
- if (!headers_) {
- rd_kafka_headers_t *c_hdrs;
- rd_kafka_resp_err_t c_err;
-
- if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) {
- *err = static_cast<RdKafka::ErrorCode>(c_err);
- return NULL;
- }
-
- headers_ = new HeadersImpl(c_hdrs);
- }
-
- return headers_;
- }
-
- int32_t broker_id() const {
- return rd_kafka_message_broker_id(rkmessage_);
- }
-
- int32_t leader_epoch() const {
- return rd_kafka_message_leader_epoch(rkmessage_);
- }
-
-
- Error *offset_store() {
- rd_kafka_error_t *c_error;
-
- c_error = rd_kafka_offset_store_message(rkmessage_);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- RdKafka::Topic *topic_;
- rd_kafka_message_t *rkmessage_;
- bool free_rkmessage_;
- /* For error signalling by the C++ layer the .._err_ message is
- * used as a place holder and rkmessage_ is set to point to it. */
- rd_kafka_message_t rkmessage_err_;
- mutable std::string *key_; /* mutable because it's a cached value */
-
- private:
- /* "delete" copy ctor + copy assignment, for safety of key_ */
- MessageImpl(MessageImpl const &) /*= delete*/;
- MessageImpl &operator=(MessageImpl const &) /*= delete*/;
-
- RdKafka::Headers *headers_;
- const rd_kafka_type_t rk_type_; /**< Client type */
-};
-
-
-class ConfImpl : public Conf {
- public:
- ConfImpl(ConfType conf_type) :
- consume_cb_(NULL),
- dr_cb_(NULL),
- event_cb_(NULL),
- socket_cb_(NULL),
- open_cb_(NULL),
- partitioner_cb_(NULL),
- partitioner_kp_cb_(NULL),
- rebalance_cb_(NULL),
- offset_commit_cb_(NULL),
- oauthbearer_token_refresh_cb_(NULL),
- ssl_cert_verify_cb_(NULL),
- conf_type_(conf_type),
- rk_conf_(NULL),
- rkt_conf_(NULL) {
- }
- ~ConfImpl() {
- if (rk_conf_)
- rd_kafka_conf_destroy(rk_conf_);
- else if (rkt_conf_)
- rd_kafka_topic_conf_destroy(rkt_conf_);
- }
-
- Conf::ConfResult set(const std::string &name,
- const std::string &value,
- std::string &errstr);
-
- Conf::ConfResult set(const std::string &name,
- DeliveryReportCb *dr_cb,
- std::string &errstr) {
- if (name != "dr_cb") {
- errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- dr_cb_ = dr_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
- std::string &errstr) {
- if (name != "oauthbearer_token_refresh_cb") {
- errstr =
- "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- EventCb *event_cb,
- std::string &errstr) {
- if (name != "event_cb") {
- errstr = "Invalid value type, expected RdKafka::EventCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- event_cb_ = event_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- const Conf *topic_conf,
- std::string &errstr) {
- const ConfImpl *tconf_impl =
- dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
- if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
- errstr = "Invalid value type, expected RdKafka::Conf";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rd_kafka_conf_set_default_topic_conf(
- rk_conf_, rd_kafka_topic_conf_dup(tconf_impl->rkt_conf_));
-
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- PartitionerCb *partitioner_cb,
- std::string &errstr) {
- if (name != "partitioner_cb") {
- errstr = "Invalid value type, expected RdKafka::PartitionerCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rkt_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
- return Conf::CONF_INVALID;
- }
-
- partitioner_cb_ = partitioner_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- PartitionerKeyPointerCb *partitioner_kp_cb,
- std::string &errstr) {
- if (name != "partitioner_key_pointer_cb") {
- errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rkt_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
- return Conf::CONF_INVALID;
- }
-
- partitioner_kp_cb_ = partitioner_kp_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set(const std::string &name,
- SocketCb *socket_cb,
- std::string &errstr) {
- if (name != "socket_cb") {
- errstr = "Invalid value type, expected RdKafka::SocketCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- socket_cb_ = socket_cb;
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set(const std::string &name,
- OpenCb *open_cb,
- std::string &errstr) {
- if (name != "open_cb") {
- errstr = "Invalid value type, expected RdKafka::OpenCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- open_cb_ = open_cb;
- return Conf::CONF_OK;
- }
-
-
-
- Conf::ConfResult set(const std::string &name,
- RebalanceCb *rebalance_cb,
- std::string &errstr) {
- if (name != "rebalance_cb") {
- errstr = "Invalid value type, expected RdKafka::RebalanceCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rebalance_cb_ = rebalance_cb;
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set(const std::string &name,
- OffsetCommitCb *offset_commit_cb,
- std::string &errstr) {
- if (name != "offset_commit_cb") {
- errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- offset_commit_cb_ = offset_commit_cb;
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set(const std::string &name,
- SslCertificateVerifyCb *ssl_cert_verify_cb,
- std::string &errstr) {
- if (name != "ssl_cert_verify_cb") {
- errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- ssl_cert_verify_cb_ = ssl_cert_verify_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr) {
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rd_kafka_conf_set_engine_callback_data(rk_conf_, value);
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type,
- RdKafka::CertificateEncoding cert_enc,
- const void *buffer,
- size_t size,
- std::string &errstr) {
- rd_kafka_conf_res_t res;
- char errbuf[512];
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- res = rd_kafka_conf_set_ssl_cert(
- rk_conf_, static_cast<rd_kafka_cert_type_t>(cert_type),
- static_cast<rd_kafka_cert_enc_t>(cert_enc), buffer, size, errbuf,
- sizeof(errbuf));
-
- if (res != RD_KAFKA_CONF_OK)
- errstr = errbuf;
-
- return static_cast<Conf::ConfResult>(res);
- }
-
- Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr) {
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rd_kafka_conf_enable_sasl_queue(rk_conf_, enable ? 1 : 0);
-
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult get(const std::string &name, std::string &value) const {
- if (name.compare("dr_cb") == 0 || name.compare("event_cb") == 0 ||
- name.compare("partitioner_cb") == 0 ||
- name.compare("partitioner_key_pointer_cb") == 0 ||
- name.compare("socket_cb") == 0 || name.compare("open_cb") == 0 ||
- name.compare("rebalance_cb") == 0 ||
- name.compare("offset_commit_cb") == 0 ||
- name.compare("oauthbearer_token_refresh_cb") == 0 ||
- name.compare("ssl_cert_verify_cb") == 0 ||
- name.compare("set_engine_callback_data") == 0 ||
- name.compare("enable_sasl_queue") == 0) {
- return Conf::CONF_INVALID;
- }
- rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
-
- /* Get size of property */
- size_t size;
- if (rk_conf_)
- res = rd_kafka_conf_get(rk_conf_, name.c_str(), NULL, &size);
- else if (rkt_conf_)
- res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), NULL, &size);
- if (res != RD_KAFKA_CONF_OK)
- return static_cast<Conf::ConfResult>(res);
-
- char *tmpValue = new char[size];
-
- if (rk_conf_)
- res = rd_kafka_conf_get(rk_conf_, name.c_str(), tmpValue, &size);
- else if (rkt_conf_)
- res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), tmpValue, &size);
-
- if (res == RD_KAFKA_CONF_OK)
- value.assign(tmpValue);
- delete[] tmpValue;
-
- return static_cast<Conf::ConfResult>(res);
- }
-
- Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- dr_cb = this->dr_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(
- OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(EventCb *&event_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- event_cb = this->event_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
- if (!rkt_conf_)
- return Conf::CONF_INVALID;
- partitioner_cb = this->partitioner_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
- if (!rkt_conf_)
- return Conf::CONF_INVALID;
- partitioner_kp_cb = this->partitioner_kp_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(SocketCb *&socket_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- socket_cb = this->socket_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(OpenCb *&open_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- open_cb = this->open_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- rebalance_cb = this->rebalance_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- offset_commit_cb = this->offset_commit_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- ssl_cert_verify_cb = this->ssl_cert_verify_cb_;
- return Conf::CONF_OK;
- }
-
- std::list<std::string> *dump();
-
-
- Conf::ConfResult set(const std::string &name,
- ConsumeCb *consume_cb,
- std::string &errstr) {
- if (name != "consume_cb") {
- errstr = "Invalid value type, expected RdKafka::ConsumeCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- consume_cb_ = consume_cb;
- return Conf::CONF_OK;
- }
-
- struct rd_kafka_conf_s *c_ptr_global() {
- if (conf_type_ == CONF_GLOBAL)
- return rk_conf_;
- else
- return NULL;
- }
-
- struct rd_kafka_topic_conf_s *c_ptr_topic() {
- if (conf_type_ == CONF_TOPIC)
- return rkt_conf_;
- else
- return NULL;
- }
-
- ConsumeCb *consume_cb_;
- DeliveryReportCb *dr_cb_;
- EventCb *event_cb_;
- SocketCb *socket_cb_;
- OpenCb *open_cb_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
- RebalanceCb *rebalance_cb_;
- OffsetCommitCb *offset_commit_cb_;
- OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
- SslCertificateVerifyCb *ssl_cert_verify_cb_;
- ConfType conf_type_;
- rd_kafka_conf_t *rk_conf_;
- rd_kafka_topic_conf_t *rkt_conf_;
-};
-
-
-class HandleImpl : virtual public Handle {
- public:
- ~HandleImpl() {
- }
- HandleImpl() {
- }
- std::string name() const {
- return std::string(rd_kafka_name(rk_));
- }
- std::string memberid() const {
- char *str = rd_kafka_memberid(rk_);
- std::string memberid = str ? str : "";
- if (str)
- rd_kafka_mem_free(rk_, str);
- return memberid;
- }
- int poll(int timeout_ms) {
- return rd_kafka_poll(rk_, timeout_ms);
- }
- int outq_len() {
- return rd_kafka_outq_len(rk_);
- }
-
- void set_common_config(const RdKafka::ConfImpl *confimpl);
-
- RdKafka::ErrorCode metadata(bool all_topics,
- const Topic *only_rkt,
- Metadata **metadatap,
- int timeout_ms);
-
- ErrorCode pause(std::vector<TopicPartition *> &partitions);
- ErrorCode resume(std::vector<TopicPartition *> &partitions);
-
- ErrorCode query_watermark_offsets(const std::string &topic,
- int32_t partition,
- int64_t *low,
- int64_t *high,
- int timeout_ms) {
- return static_cast<RdKafka::ErrorCode>(rd_kafka_query_watermark_offsets(
- rk_, topic.c_str(), partition, low, high, timeout_ms));
- }
-
- ErrorCode get_watermark_offsets(const std::string &topic,
- int32_t partition,
- int64_t *low,
- int64_t *high) {
- return static_cast<RdKafka::ErrorCode>(rd_kafka_get_watermark_offsets(
- rk_, topic.c_str(), partition, low, high));
- }
-
- Queue *get_partition_queue(const TopicPartition *partition);
-
- Queue *get_sasl_queue() {
- rd_kafka_queue_t *rkqu;
- rkqu = rd_kafka_queue_get_sasl(rk_);
-
- if (rkqu == NULL)
- return NULL;
-
- return new QueueImpl(rkqu);
- }
-
- Queue *get_background_queue() {
- rd_kafka_queue_t *rkqu;
- rkqu = rd_kafka_queue_get_background(rk_);
-
- if (rkqu == NULL)
- return NULL;
-
- return new QueueImpl(rkqu);
- }
-
-
- ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
- int timeout_ms) {
- rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
- ErrorCode err = static_cast<ErrorCode>(
- rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
- update_partitions_from_c_parts(offsets, c_offsets);
- rd_kafka_topic_partition_list_destroy(c_offsets);
- return err;
- }
-
- ErrorCode set_log_queue(Queue *queue);
-
- void yield() {
- rd_kafka_yield(rk_);
- }
-
- std::string clusterid(int timeout_ms) {
- char *str = rd_kafka_clusterid(rk_, timeout_ms);
- std::string clusterid = str ? str : "";
- if (str)
- rd_kafka_mem_free(rk_, str);
- return clusterid;
- }
-
- struct rd_kafka_s *c_ptr() {
- return rk_;
- }
-
- int32_t controllerid(int timeout_ms) {
- return rd_kafka_controllerid(rk_, timeout_ms);
- }
-
- ErrorCode fatal_error(std::string &errstr) const {
- char errbuf[512];
- RdKafka::ErrorCode err = static_cast<RdKafka::ErrorCode>(
- rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf)));
- if (err)
- errstr = errbuf;
- return err;
- }
-
- ErrorCode oauthbearer_set_token(const std::string &token_value,
- int64_t md_lifetime_ms,
- const std::string &md_principal_name,
- const std::list<std::string> &extensions,
- std::string &errstr) {
- char errbuf[512];
- ErrorCode err;
- const char **extensions_copy = new const char *[extensions.size()];
- int elem = 0;
-
- for (std::list<std::string>::const_iterator it = extensions.begin();
- it != extensions.end(); it++)
- extensions_copy[elem++] = it->c_str();
- err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token(
- rk_, token_value.c_str(), md_lifetime_ms, md_principal_name.c_str(),
- extensions_copy, extensions.size(), errbuf, sizeof(errbuf)));
- delete[] extensions_copy;
-
- if (err != ERR_NO_ERROR)
- errstr = errbuf;
-
- return err;
- }
-
- ErrorCode oauthbearer_set_token_failure(const std::string &errstr) {
- return static_cast<ErrorCode>(
- rd_kafka_oauthbearer_set_token_failure(rk_, errstr.c_str()));
- }
-
- Error *sasl_background_callbacks_enable() {
- rd_kafka_error_t *c_error = rd_kafka_sasl_background_callbacks_enable(rk_);
-
- if (c_error)
- return new ErrorImpl(c_error);
-
- return NULL;
- }
-
- Error *sasl_set_credentials(const std::string &username,
- const std::string &password) {
- rd_kafka_error_t *c_error =
- rd_kafka_sasl_set_credentials(rk_, username.c_str(), password.c_str());
-
- if (c_error)
- return new ErrorImpl(c_error);
-
- return NULL;
- };
-
- void *mem_malloc(size_t size) {
- return rd_kafka_mem_malloc(rk_, size);
- }
-
- void mem_free(void *ptr) {
- rd_kafka_mem_free(rk_, ptr);
- }
-
- rd_kafka_t *rk_;
- /* All Producer and Consumer callbacks must reside in HandleImpl and
- * the opaque provided to rdkafka must be a pointer to HandleImpl, since
- * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
- * HandleImpl due to the skewed diamond inheritance. */
- ConsumeCb *consume_cb_;
- EventCb *event_cb_;
- SocketCb *socket_cb_;
- OpenCb *open_cb_;
- DeliveryReportCb *dr_cb_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
- RebalanceCb *rebalance_cb_;
- OffsetCommitCb *offset_commit_cb_;
- OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
- SslCertificateVerifyCb *ssl_cert_verify_cb_;
-};
-
-
-class TopicImpl : public Topic {
- public:
- ~TopicImpl() {
- rd_kafka_topic_destroy(rkt_);
- }
-
- std::string name() const {
- return rd_kafka_topic_name(rkt_);
- }
-
- bool partition_available(int32_t partition) const {
- return !!rd_kafka_topic_partition_available(rkt_, partition);
- }
-
- ErrorCode offset_store(int32_t partition, int64_t offset) {
- return static_cast<RdKafka::ErrorCode>(
- rd_kafka_offset_store(rkt_, partition, offset));
- }
-
- static Topic *create(Handle &base, const std::string &topic, Conf *conf);
-
- struct rd_kafka_topic_s *c_ptr() {
- return rkt_;
- }
-
- rd_kafka_topic_t *rkt_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
-};
-
-
-/**
- * Topic and Partition
- */
-class TopicPartitionImpl : public TopicPartition {
- public:
- ~TopicPartitionImpl() {
- }
-
- static TopicPartition *create(const std::string &topic, int partition);
-
- TopicPartitionImpl(const std::string &topic, int partition) :
- topic_(topic),
- partition_(partition),
- offset_(RdKafka::Topic::OFFSET_INVALID),
- err_(ERR_NO_ERROR),
- leader_epoch_(-1) {
- }
-
- TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) :
- topic_(topic),
- partition_(partition),
- offset_(offset),
- err_(ERR_NO_ERROR),
- leader_epoch_(-1) {
- }
-
- TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) {
- topic_ = std::string(c_part->topic);
- partition_ = c_part->partition;
- offset_ = c_part->offset;
- err_ = static_cast<ErrorCode>(c_part->err);
- leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
- // FIXME: metadata
- }
-
- static void destroy(std::vector<TopicPartition *> &partitions);
-
- int partition() const {
- return partition_;
- }
- const std::string &topic() const {
- return topic_;
- }
-
- int64_t offset() const {
- return offset_;
- }
-
- ErrorCode err() const {
- return err_;
- }
-
- void set_offset(int64_t offset) {
- offset_ = offset;
- }
-
- int32_t get_leader_epoch() {
- return leader_epoch_;
- }
-
- void set_leader_epoch(int32_t leader_epoch) {
- leader_epoch_ = leader_epoch_;
- }
-
- std::ostream &operator<<(std::ostream &ostrm) const {
- return ostrm << topic_ << " [" << partition_ << "]";
- }
-
- std::string topic_;
- int partition_;
- int64_t offset_;
- ErrorCode err_;
- int32_t leader_epoch_;
-};
-
-
-/**
- * @class ConsumerGroupMetadata wraps the
- * C rd_kafka_consumer_group_metadata_t object.
- */
-class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata {
- public:
- ~ConsumerGroupMetadataImpl() {
- rd_kafka_consumer_group_metadata_destroy(cgmetadata_);
- }
-
- ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata) :
- cgmetadata_(cgmetadata) {
- }
-
- rd_kafka_consumer_group_metadata_t *cgmetadata_;
-};
-
-
-class KafkaConsumerImpl : virtual public KafkaConsumer,
- virtual public HandleImpl {
- public:
- ~KafkaConsumerImpl() {
- if (rk_)
- rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
- }
-
- static KafkaConsumer *create(Conf *conf, std::string &errstr);
-
- ErrorCode assignment(std::vector<TopicPartition *> &partitions);
- bool assignment_lost();
- std::string rebalance_protocol() {
- const char *str = rd_kafka_rebalance_protocol(rk_);
- return std::string(str ? str : "");
- }
- ErrorCode subscription(std::vector<std::string> &topics);
- ErrorCode subscribe(const std::vector<std::string> &topics);
- ErrorCode unsubscribe();
- ErrorCode assign(const std::vector<TopicPartition *> &partitions);
- ErrorCode unassign();
- Error *incremental_assign(const std::vector<TopicPartition *> &partitions);
- Error *incremental_unassign(const std::vector<TopicPartition *> &partitions);
-
- Message *consume(int timeout_ms);
- ErrorCode commitSync() {
- return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/));
- }
- ErrorCode commitAsync() {
- return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1 /*async*/));
- }
- ErrorCode commitSync(Message *message) {
- MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message);
- return static_cast<ErrorCode>(
- rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0 /*sync*/));
- }
- ErrorCode commitAsync(Message *message) {
- MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message);
- return static_cast<ErrorCode>(
- rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 1 /*async*/));
- }
-
- ErrorCode commitSync(std::vector<TopicPartition *> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 0);
- if (!err)
- update_partitions_from_c_parts(offsets, c_parts);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode commitAsync(const std::vector<TopicPartition *> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 1);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) {
- return static_cast<ErrorCode>(rd_kafka_commit_queue(
- rk_, NULL, NULL, RdKafka::offset_commit_cb_trampoline0,
- offset_commit_cb));
- }
-
- ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
- OffsetCommitCb *offset_commit_cb) {
- rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err = rd_kafka_commit_queue(
- rk_, c_parts, NULL, RdKafka::offset_commit_cb_trampoline0,
- offset_commit_cb);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode committed(std::vector<TopicPartition *> &partitions,
- int timeout_ms);
- ErrorCode position(std::vector<TopicPartition *> &partitions);
-
- ConsumerGroupMetadata *groupMetadata() {
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- cgmetadata = rd_kafka_consumer_group_metadata(rk_);
- if (!cgmetadata)
- return NULL;
-
- return new ConsumerGroupMetadataImpl(cgmetadata);
- }
-
- ErrorCode close();
-
- Error *close(Queue *queue);
-
- bool closed() {
- return rd_kafka_consumer_closed(rk_) ? true : false;
- }
-
- ErrorCode seek(const TopicPartition &partition, int timeout_ms);
-
- ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk_, c_parts);
- update_partitions_from_c_parts(offsets, c_parts);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-};
-
-
-class MetadataImpl : public Metadata {
- public:
- MetadataImpl(const rd_kafka_metadata_t *metadata);
- ~MetadataImpl();
-
- const std::vector<const BrokerMetadata *> *brokers() const {
- return &brokers_;
- }
-
- const std::vector<const TopicMetadata *> *topics() const {
- return &topics_;
- }
-
- std::string orig_broker_name() const {
- return std::string(metadata_->orig_broker_name);
- }
-
- int32_t orig_broker_id() const {
- return metadata_->orig_broker_id;
- }
-
- private:
- const rd_kafka_metadata_t *metadata_;
- std::vector<const BrokerMetadata *> brokers_;
- std::vector<const TopicMetadata *> topics_;
- std::string orig_broker_name_;
-};
-
-
-
-class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
- public:
- ~ConsumerImpl() {
- if (rk_)
- rd_kafka_destroy(rk_);
- }
- static Consumer *create(Conf *conf, std::string &errstr);
-
- ErrorCode start(Topic *topic, int32_t partition, int64_t offset);
- ErrorCode start(Topic *topic,
- int32_t partition,
- int64_t offset,
- Queue *queue);
- ErrorCode stop(Topic *topic, int32_t partition);
- ErrorCode seek(Topic *topic,
- int32_t partition,
- int64_t offset,
- int timeout_ms);
- Message *consume(Topic *topic, int32_t partition, int timeout_ms);
- Message *consume(Queue *queue, int timeout_ms);
- int consume_callback(Topic *topic,
- int32_t partition,
- int timeout_ms,
- ConsumeCb *cb,
- void *opaque);
- int consume_callback(Queue *queue,
- int timeout_ms,
- RdKafka::ConsumeCb *consume_cb,
- void *opaque);
-};
-
-
-
-class ProducerImpl : virtual public Producer, virtual public HandleImpl {
- public:
- ~ProducerImpl() {
- if (rk_)
- rd_kafka_destroy(rk_);
- }
-
- ErrorCode produce(Topic *topic,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const std::string *key,
- void *msg_opaque);
-
- ErrorCode produce(Topic *topic,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- void *msg_opaque);
-
- ErrorCode produce(Topic *topic,
- int32_t partition,
- const std::vector<char> *payload,
- const std::vector<char> *key,
- void *msg_opaque);
-
- ErrorCode produce(const std::string topic_name,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- int64_t timestamp,
- void *msg_opaque);
-
- ErrorCode produce(const std::string topic_name,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- int64_t timestamp,
- RdKafka::Headers *headers,
- void *msg_opaque);
-
- ErrorCode flush(int timeout_ms) {
- return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_, timeout_ms));
- }
-
- ErrorCode purge(int purge_flags) {
- return static_cast<RdKafka::ErrorCode>(
- rd_kafka_purge(rk_, (int)purge_flags));
- }
-
- Error *init_transactions(int timeout_ms) {
- rd_kafka_error_t *c_error;
-
- c_error = rd_kafka_init_transactions(rk_, timeout_ms);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- Error *begin_transaction() {
- rd_kafka_error_t *c_error;
-
- c_error = rd_kafka_begin_transaction(rk_);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- Error *send_offsets_to_transaction(
- const std::vector<TopicPartition *> &offsets,
- const ConsumerGroupMetadata *group_metadata,
- int timeout_ms) {
- rd_kafka_error_t *c_error;
- const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl =
- dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>(
- group_metadata);
- rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
-
- c_error = rd_kafka_send_offsets_to_transaction(
- rk_, c_offsets, cgmdimpl->cgmetadata_, timeout_ms);
-
- rd_kafka_topic_partition_list_destroy(c_offsets);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- Error *commit_transaction(int timeout_ms) {
- rd_kafka_error_t *c_error;
-
- c_error = rd_kafka_commit_transaction(rk_, timeout_ms);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- Error *abort_transaction(int timeout_ms) {
- rd_kafka_error_t *c_error;
-
- c_error = rd_kafka_abort_transaction(rk_, timeout_ms);
-
- if (c_error)
- return new ErrorImpl(c_error);
- else
- return NULL;
- }
-
- static Producer *create(Conf *conf, std::string &errstr);
-};
-
-
-
-} // namespace RdKafka
-
-#endif /* _RDKAFKACPP_INT_H_ */