diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp | 425 |
1 files changed, 0 insertions, 425 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp deleted file mode 100644 index 7aa2f293..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/HandleImpl.cpp +++ /dev/null @@ -1,425 +0,0 @@ -/* - * librdkafka - Apache Kafka C/C++ library - * - * Copyright (c) 2014 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include <iostream> -#include <string> -#include <list> - -#include "rdkafkacpp_int.h" - -void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - RdKafka::Topic *topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt)); - - RdKafka::MessageImpl message(RD_KAFKA_CONSUMER, topic, msg, - false /*don't free*/); - - handle->consume_cb_->consume_cb(message, opaque); -} - -void RdKafka::log_cb_trampoline(const rd_kafka_t *rk, - int level, - const char *fac, - const char *buf) { - if (!rk) { - rd_kafka_log_print(rk, level, fac, buf); - return; - } - - void *opaque = rd_kafka_opaque(rk); - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - if (!handle->event_cb_) { - rd_kafka_log_print(rk, level, fac, buf); - return; - } - - RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, RdKafka::ERR_NO_ERROR, - static_cast<RdKafka::Event::Severity>(level), fac, - buf); - - handle->event_cb_->event_cb(event); -} - - -void RdKafka::error_cb_trampoline(rd_kafka_t *rk, - int err, - const char *reason, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - char errstr[512]; - bool is_fatal = false; - - if (err == RD_KAFKA_RESP_ERR__FATAL) { - /* Translate to underlying fatal error code and string */ - err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); - if (err) - reason = errstr; - is_fatal = true; - } - RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR, - static_cast<RdKafka::ErrorCode>(err), - RdKafka::Event::EVENT_SEVERITY_ERROR, NULL, reason); - event.fatal_ = is_fatal; - handle->event_cb_->event_cb(event); -} - - -void RdKafka::throttle_cb_trampoline(rd_kafka_t *rk, - const char *broker_name, - int32_t broker_id, - int throttle_time_ms, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE); - event.str_ = broker_name; - event.id_ = broker_id; - event.throttle_time_ = throttle_time_ms; - - handle->event_cb_->event_cb(event); -} - - -int RdKafka::stats_cb_trampoline(rd_kafka_t *rk, - char *json, - size_t json_len, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, RdKafka::ERR_NO_ERROR, - RdKafka::Event::EVENT_SEVERITY_INFO, NULL, json); - - handle->event_cb_->event_cb(event); - - return 0; -} - - -int RdKafka::socket_cb_trampoline(int domain, - int type, - int protocol, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - return handle->socket_cb_->socket_cb(domain, type, protocol); -} - -int RdKafka::open_cb_trampoline(const char *pathname, - int flags, - mode_t mode, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode)); -} - -void RdKafka::oauthbearer_token_refresh_cb_trampoline( - rd_kafka_t *rk, - const char *oauthbearer_config, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - - handle->oauthbearer_token_refresh_cb_->oauthbearer_token_refresh_cb( - handle, std::string(oauthbearer_config ? oauthbearer_config : "")); -} - - -int RdKafka::ssl_cert_verify_cb_trampoline(rd_kafka_t *rk, - const char *broker_name, - int32_t broker_id, - int *x509_error, - int depth, - const char *buf, - size_t size, - char *errstr, - size_t errstr_size, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - std::string errbuf; - - bool res = 0 != handle->ssl_cert_verify_cb_->ssl_cert_verify_cb( - std::string(broker_name), broker_id, x509_error, depth, - buf, size, errbuf); - - if (res) - return (int)res; - - size_t errlen = - errbuf.size() > errstr_size - 1 ? errstr_size - 1 : errbuf.size(); - - memcpy(errstr, errbuf.c_str(), errlen); - if (errstr_size > 0) - errstr[errlen] = '\0'; - - return (int)res; -} - - -RdKafka::ErrorCode RdKafka::HandleImpl::metadata(bool all_topics, - const Topic *only_rkt, - Metadata **metadatap, - int timeout_ms) { - const rd_kafka_metadata_t *cmetadatap = NULL; - - rd_kafka_topic_t *topic = - only_rkt ? static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL; - - const rd_kafka_resp_err_t rc = - rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms); - - *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) - ? new RdKafka::MetadataImpl(cmetadatap) - : NULL; - - return static_cast<RdKafka::ErrorCode>(rc); -} - -/** - * Convert a list of C partitions to C++ partitions - */ -static void c_parts_to_partitions( - const rd_kafka_topic_partition_list_t *c_parts, - std::vector<RdKafka::TopicPartition *> &partitions) { - partitions.resize(c_parts->cnt); - for (int i = 0; i < c_parts->cnt; i++) - partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); -} - -static void free_partition_vector(std::vector<RdKafka::TopicPartition *> &v) { - for (unsigned int i = 0; i < v.size(); i++) - delete v[i]; - v.clear(); -} - -void RdKafka::rebalance_cb_trampoline( - rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *c_partitions, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - std::vector<RdKafka::TopicPartition *> partitions; - - c_parts_to_partitions(c_partitions, partitions); - - handle->rebalance_cb_->rebalance_cb( - dynamic_cast<RdKafka::KafkaConsumer *>(handle), - static_cast<RdKafka::ErrorCode>(err), partitions); - - free_partition_vector(partitions); -} - - -void RdKafka::offset_commit_cb_trampoline0( - rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *c_offsets, - void *opaque) { - OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque); - std::vector<RdKafka::TopicPartition *> offsets; - - if (c_offsets) - c_parts_to_partitions(c_offsets, offsets); - - cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets); - - free_partition_vector(offsets); -} - -static void offset_commit_cb_trampoline( - rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *c_offsets, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets, - handle->offset_commit_cb_); -} - - -void RdKafka::HandleImpl::set_common_config(const RdKafka::ConfImpl *confimpl) { - rd_kafka_conf_set_opaque(confimpl->rk_conf_, this); - - if (confimpl->event_cb_) { - rd_kafka_conf_set_log_cb(confimpl->rk_conf_, RdKafka::log_cb_trampoline); - rd_kafka_conf_set_error_cb(confimpl->rk_conf_, - RdKafka::error_cb_trampoline); - rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_, - RdKafka::throttle_cb_trampoline); - rd_kafka_conf_set_stats_cb(confimpl->rk_conf_, - RdKafka::stats_cb_trampoline); - event_cb_ = confimpl->event_cb_; - } - - if (confimpl->oauthbearer_token_refresh_cb_) { - rd_kafka_conf_set_oauthbearer_token_refresh_cb( - confimpl->rk_conf_, RdKafka::oauthbearer_token_refresh_cb_trampoline); - oauthbearer_token_refresh_cb_ = confimpl->oauthbearer_token_refresh_cb_; - } - - if (confimpl->socket_cb_) { - rd_kafka_conf_set_socket_cb(confimpl->rk_conf_, - RdKafka::socket_cb_trampoline); - socket_cb_ = confimpl->socket_cb_; - } - - if (confimpl->ssl_cert_verify_cb_) { - rd_kafka_conf_set_ssl_cert_verify_cb( - confimpl->rk_conf_, RdKafka::ssl_cert_verify_cb_trampoline); - ssl_cert_verify_cb_ = confimpl->ssl_cert_verify_cb_; - } - - if (confimpl->open_cb_) { -#ifndef _WIN32 - rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline); - open_cb_ = confimpl->open_cb_; -#endif - } - - if (confimpl->rebalance_cb_) { - rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_, - RdKafka::rebalance_cb_trampoline); - rebalance_cb_ = confimpl->rebalance_cb_; - } - - if (confimpl->offset_commit_cb_) { - rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_, - offset_commit_cb_trampoline); - offset_commit_cb_ = confimpl->offset_commit_cb_; - } - - if (confimpl->consume_cb_) { - rd_kafka_conf_set_consume_cb(confimpl->rk_conf_, - RdKafka::consume_cb_trampoline); - consume_cb_ = confimpl->consume_cb_; - } -} - - -RdKafka::ErrorCode RdKafka::HandleImpl::pause( - std::vector<RdKafka::TopicPartition *> &partitions) { - rd_kafka_topic_partition_list_t *c_parts; - rd_kafka_resp_err_t err; - - c_parts = partitions_to_c_parts(partitions); - - err = rd_kafka_pause_partitions(rk_, c_parts); - - if (!err) - update_partitions_from_c_parts(partitions, c_parts); - - rd_kafka_topic_partition_list_destroy(c_parts); - - return static_cast<RdKafka::ErrorCode>(err); -} - - -RdKafka::ErrorCode RdKafka::HandleImpl::resume( - std::vector<RdKafka::TopicPartition *> &partitions) { - rd_kafka_topic_partition_list_t *c_parts; - rd_kafka_resp_err_t err; - - c_parts = partitions_to_c_parts(partitions); - - err = rd_kafka_resume_partitions(rk_, c_parts); - - if (!err) - update_partitions_from_c_parts(partitions, c_parts); - - rd_kafka_topic_partition_list_destroy(c_parts); - - return static_cast<RdKafka::ErrorCode>(err); -} - -RdKafka::Queue *RdKafka::HandleImpl::get_partition_queue( - const TopicPartition *part) { - rd_kafka_queue_t *rkqu; - rkqu = rd_kafka_queue_get_partition(rk_, part->topic().c_str(), - part->partition()); - - if (rkqu == NULL) - return NULL; - - return new QueueImpl(rkqu); -} - -RdKafka::ErrorCode RdKafka::HandleImpl::set_log_queue(RdKafka::Queue *queue) { - rd_kafka_queue_t *rkqu = NULL; - if (queue) { - QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue); - rkqu = queueimpl->queue_; - } - return static_cast<RdKafka::ErrorCode>(rd_kafka_set_log_queue(rk_, rkqu)); -} - -namespace RdKafka { - -rd_kafka_topic_partition_list_t *partitions_to_c_parts( - const std::vector<RdKafka::TopicPartition *> &partitions) { - rd_kafka_topic_partition_list_t *c_parts; - - c_parts = rd_kafka_topic_partition_list_new((int)partitions.size()); - - for (unsigned int i = 0; i < partitions.size(); i++) { - const RdKafka::TopicPartitionImpl *tpi = - dynamic_cast<const RdKafka::TopicPartitionImpl *>(partitions[i]); - rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add( - c_parts, tpi->topic_.c_str(), tpi->partition_); - rktpar->offset = tpi->offset_; - if (tpi->leader_epoch_ != -1) - rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_); - } - - return c_parts; -} - - -/** - * @brief Update the application provided 'partitions' with info from 'c_parts' - */ -void update_partitions_from_c_parts( - std::vector<RdKafka::TopicPartition *> &partitions, - const rd_kafka_topic_partition_list_t *c_parts) { - for (int i = 0; i < c_parts->cnt; i++) { - rd_kafka_topic_partition_t *p = &c_parts->elems[i]; - - /* Find corresponding C++ entry */ - for (unsigned int j = 0; j < partitions.size(); j++) { - RdKafka::TopicPartitionImpl *pp = - dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]); - if (!strcmp(p->topic, pp->topic_.c_str()) && - p->partition == pp->partition_) { - pp->offset_ = p->offset; - pp->err_ = static_cast<RdKafka::ErrorCode>(p->err); - pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p); - } - } - } -} - -} // namespace RdKafka |