summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp425
1 files changed, 425 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp
new file mode 100644
index 000000000..7aa2f2939
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp
@@ -0,0 +1,425 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+
+#include "rdkafkacpp_int.h"
+
+void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+ RdKafka::Topic *topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt));
+
+ RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, topic, msg,
+ false /*don't free*/);
+
+ handle->consume_cb_->consume_cb(message, opaque);
+}
+
+void RdKafka::log_cb_trampoline(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf) {
+ if (!rk) {
+ rd_kafka_log_print(rk, level, fac, buf);
+ return;
+ }
+
+ void *opaque = rd_kafka_opaque(rk);
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ if (!handle->event_cb_) {
+ rd_kafka_log_print(rk, level, fac, buf);
+ return;
+ }
+
+ RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, RdKafka::ERR_NO_ERROR,
+ static_cast<RdKafka::Event::Severity>(level), fac,
+ buf);
+
+ handle->event_cb_->event_cb(event);
+}
+
+
+void RdKafka::error_cb_trampoline(rd_kafka_t *rk,
+ int err,
+ const char *reason,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+ char errstr[512];
+ bool is_fatal = false;
+
+ if (err == RD_KAFKA_RESP_ERR__FATAL) {
+ /* Translate to underlying fatal error code and string */
+ err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
+ if (err)
+ reason = errstr;
+ is_fatal = true;
+ }
+ RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR,
+ static_cast<RdKafka::ErrorCode>(err),
+ RdKafka::Event::EVENT_SEVERITY_ERROR, NULL, reason);
+ event.fatal_ = is_fatal;
+ handle->event_cb_->event_cb(event);
+}
+
+
+void RdKafka::throttle_cb_trampoline(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int throttle_time_ms,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE);
+ event.str_ = broker_name;
+ event.id_ = broker_id;
+ event.throttle_time_ = throttle_time_ms;
+
+ handle->event_cb_->event_cb(event);
+}
+
+
+int RdKafka::stats_cb_trampoline(rd_kafka_t *rk,
+ char *json,
+ size_t json_len,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, RdKafka::ERR_NO_ERROR,
+ RdKafka::Event::EVENT_SEVERITY_INFO, NULL, json);
+
+ handle->event_cb_->event_cb(event);
+
+ return 0;
+}
+
+
+int RdKafka::socket_cb_trampoline(int domain,
+ int type,
+ int protocol,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ return handle->socket_cb_->socket_cb(domain, type, protocol);
+}
+
+int RdKafka::open_cb_trampoline(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode));
+}
+
+void RdKafka::oauthbearer_token_refresh_cb_trampoline(
+ rd_kafka_t *rk,
+ const char *oauthbearer_config,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+ handle->oauthbearer_token_refresh_cb_->oauthbearer_token_refresh_cb(
+ handle, std::string(oauthbearer_config ? oauthbearer_config : ""));
+}
+
+
+int RdKafka::ssl_cert_verify_cb_trampoline(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int *x509_error,
+ int depth,
+ const char *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+ std::string errbuf;
+
+ bool res = 0 != handle->ssl_cert_verify_cb_->ssl_cert_verify_cb(
+ std::string(broker_name), broker_id, x509_error, depth,
+ buf, size, errbuf);
+
+ if (res)
+ return (int)res;
+
+ size_t errlen =
+ errbuf.size() > errstr_size - 1 ? errstr_size - 1 : errbuf.size();
+
+ memcpy(errstr, errbuf.c_str(), errlen);
+ if (errstr_size > 0)
+ errstr[errlen] = '\0';
+
+ return (int)res;
+}
+
+
+RdKafka::ErrorCode RdKafka::HandleImpl::metadata(bool all_topics,
+ const Topic *only_rkt,
+ Metadata **metadatap,
+ int timeout_ms) {
+ const rd_kafka_metadata_t *cmetadatap = NULL;
+
+ rd_kafka_topic_t *topic =
+ only_rkt ? static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL;
+
+ const rd_kafka_resp_err_t rc =
+ rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms);
+
+ *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR)
+ ? new RdKafka::MetadataImpl(cmetadatap)
+ : NULL;
+
+ return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+/**
+ * Convert a list of C partitions to C++ partitions
+ */
+static void c_parts_to_partitions(
+ const rd_kafka_topic_partition_list_t *c_parts,
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ partitions.resize(c_parts->cnt);
+ for (int i = 0; i < c_parts->cnt; i++)
+ partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]);
+}
+
+static void free_partition_vector(std::vector<RdKafka::TopicPartition *> &v) {
+ for (unsigned int i = 0; i < v.size(); i++)
+ delete v[i];
+ v.clear();
+}
+
+void RdKafka::rebalance_cb_trampoline(
+ rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *c_partitions,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+ std::vector<RdKafka::TopicPartition *> partitions;
+
+ c_parts_to_partitions(c_partitions, partitions);
+
+ handle->rebalance_cb_->rebalance_cb(
+ dynamic_cast<RdKafka::KafkaConsumer *>(handle),
+ static_cast<RdKafka::ErrorCode>(err), partitions);
+
+ free_partition_vector(partitions);
+}
+
+
+void RdKafka::offset_commit_cb_trampoline0(
+ rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *c_offsets,
+ void *opaque) {
+ OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque);
+ std::vector<RdKafka::TopicPartition *> offsets;
+
+ if (c_offsets)
+ c_parts_to_partitions(c_offsets, offsets);
+
+ cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets);
+
+ free_partition_vector(offsets);
+}
+
+static void offset_commit_cb_trampoline(
+ rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *c_offsets,
+ void *opaque) {
+ RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+ RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets,
+ handle->offset_commit_cb_);
+}
+
+
+void RdKafka::HandleImpl::set_common_config(const RdKafka::ConfImpl *confimpl) {
+ rd_kafka_conf_set_opaque(confimpl->rk_conf_, this);
+
+ if (confimpl->event_cb_) {
+ rd_kafka_conf_set_log_cb(confimpl->rk_conf_, RdKafka::log_cb_trampoline);
+ rd_kafka_conf_set_error_cb(confimpl->rk_conf_,
+ RdKafka::error_cb_trampoline);
+ rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_,
+ RdKafka::throttle_cb_trampoline);
+ rd_kafka_conf_set_stats_cb(confimpl->rk_conf_,
+ RdKafka::stats_cb_trampoline);
+ event_cb_ = confimpl->event_cb_;
+ }
+
+ if (confimpl->oauthbearer_token_refresh_cb_) {
+ rd_kafka_conf_set_oauthbearer_token_refresh_cb(
+ confimpl->rk_conf_, RdKafka::oauthbearer_token_refresh_cb_trampoline);
+ oauthbearer_token_refresh_cb_ = confimpl->oauthbearer_token_refresh_cb_;
+ }
+
+ if (confimpl->socket_cb_) {
+ rd_kafka_conf_set_socket_cb(confimpl->rk_conf_,
+ RdKafka::socket_cb_trampoline);
+ socket_cb_ = confimpl->socket_cb_;
+ }
+
+ if (confimpl->ssl_cert_verify_cb_) {
+ rd_kafka_conf_set_ssl_cert_verify_cb(
+ confimpl->rk_conf_, RdKafka::ssl_cert_verify_cb_trampoline);
+ ssl_cert_verify_cb_ = confimpl->ssl_cert_verify_cb_;
+ }
+
+ if (confimpl->open_cb_) {
+#ifndef _WIN32
+ rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline);
+ open_cb_ = confimpl->open_cb_;
+#endif
+ }
+
+ if (confimpl->rebalance_cb_) {
+ rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_,
+ RdKafka::rebalance_cb_trampoline);
+ rebalance_cb_ = confimpl->rebalance_cb_;
+ }
+
+ if (confimpl->offset_commit_cb_) {
+ rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_,
+ offset_commit_cb_trampoline);
+ offset_commit_cb_ = confimpl->offset_commit_cb_;
+ }
+
+ if (confimpl->consume_cb_) {
+ rd_kafka_conf_set_consume_cb(confimpl->rk_conf_,
+ RdKafka::consume_cb_trampoline);
+ consume_cb_ = confimpl->consume_cb_;
+ }
+}
+
+
+RdKafka::ErrorCode RdKafka::HandleImpl::pause(
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ rd_kafka_topic_partition_list_t *c_parts;
+ rd_kafka_resp_err_t err;
+
+ c_parts = partitions_to_c_parts(partitions);
+
+ err = rd_kafka_pause_partitions(rk_, c_parts);
+
+ if (!err)
+ update_partitions_from_c_parts(partitions, c_parts);
+
+ rd_kafka_topic_partition_list_destroy(c_parts);
+
+ return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+RdKafka::ErrorCode RdKafka::HandleImpl::resume(
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ rd_kafka_topic_partition_list_t *c_parts;
+ rd_kafka_resp_err_t err;
+
+ c_parts = partitions_to_c_parts(partitions);
+
+ err = rd_kafka_resume_partitions(rk_, c_parts);
+
+ if (!err)
+ update_partitions_from_c_parts(partitions, c_parts);
+
+ rd_kafka_topic_partition_list_destroy(c_parts);
+
+ return static_cast<RdKafka::ErrorCode>(err);
+}
+
+RdKafka::Queue *RdKafka::HandleImpl::get_partition_queue(
+ const TopicPartition *part) {
+ rd_kafka_queue_t *rkqu;
+ rkqu = rd_kafka_queue_get_partition(rk_, part->topic().c_str(),
+ part->partition());
+
+ if (rkqu == NULL)
+ return NULL;
+
+ return new QueueImpl(rkqu);
+}
+
+RdKafka::ErrorCode RdKafka::HandleImpl::set_log_queue(RdKafka::Queue *queue) {
+ rd_kafka_queue_t *rkqu = NULL;
+ if (queue) {
+ QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
+ rkqu = queueimpl->queue_;
+ }
+ return static_cast<RdKafka::ErrorCode>(rd_kafka_set_log_queue(rk_, rkqu));
+}
+
+namespace RdKafka {
+
+rd_kafka_topic_partition_list_t *partitions_to_c_parts(
+ const std::vector<RdKafka::TopicPartition *> &partitions) {
+ rd_kafka_topic_partition_list_t *c_parts;
+
+ c_parts = rd_kafka_topic_partition_list_new((int)partitions.size());
+
+ for (unsigned int i = 0; i < partitions.size(); i++) {
+ const RdKafka::TopicPartitionImpl *tpi =
+ dynamic_cast<const RdKafka::TopicPartitionImpl *>(partitions[i]);
+ rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
+ c_parts, tpi->topic_.c_str(), tpi->partition_);
+ rktpar->offset = tpi->offset_;
+ if (tpi->leader_epoch_ != -1)
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
+ }
+
+ return c_parts;
+}
+
+
+/**
+ * @brief Update the application provided 'partitions' with info from 'c_parts'
+ */
+void update_partitions_from_c_parts(
+ std::vector<RdKafka::TopicPartition *> &partitions,
+ const rd_kafka_topic_partition_list_t *c_parts) {
+ for (int i = 0; i < c_parts->cnt; i++) {
+ rd_kafka_topic_partition_t *p = &c_parts->elems[i];
+
+ /* Find corresponding C++ entry */
+ for (unsigned int j = 0; j < partitions.size(); j++) {
+ RdKafka::TopicPartitionImpl *pp =
+ dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]);
+ if (!strcmp(p->topic, pp->topic_.c_str()) &&
+ p->partition == pp->partition_) {
+ pp->offset_ = p->offset;
+ pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
+ pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
+ }
+ }
+ }
+}
+
+} // namespace RdKafka