diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp deleted file mode 100644 index 8300dfb3b..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* - * librdkafka - Apache Kafka C/C++ library - * - * Copyright (c) 2014 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include <iostream> -#include <string> -#include <list> -#include <cerrno> - -#include "rdkafkacpp_int.h" - - -RdKafka::Producer::~Producer() { -} - -static void dr_msg_cb_trampoline(rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, - void *opaque) { - RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); - RdKafka::MessageImpl message(RD_KAFKA_PRODUCER, NULL, - (rd_kafka_message_t *)rkmessage, false); - handle->dr_cb_->dr_cb(message); -} - - - -RdKafka::Producer *RdKafka::Producer::create(const RdKafka::Conf *conf, - std::string &errstr) { - char errbuf[512]; - const RdKafka::ConfImpl *confimpl = - dynamic_cast<const RdKafka::ConfImpl *>(conf); - RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl(); - rd_kafka_conf_t *rk_conf = NULL; - - if (confimpl) { - if (!confimpl->rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - delete rkp; - return NULL; - } - - rkp->set_common_config(confimpl); - - rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_); - - if (confimpl->dr_cb_) { - rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline); - rkp->dr_cb_ = confimpl->dr_cb_; - } - } - - - rd_kafka_t *rk; - if (!(rk = - rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errbuf, sizeof(errbuf)))) { - errstr = errbuf; - // rd_kafka_new() takes ownership only if succeeds - if (rk_conf) - rd_kafka_conf_destroy(rk_conf); - delete rkp; - return NULL; - } - - rkp->rk_ = rk; - - return rkp; -} - - -RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const std::string *key, - void *msg_opaque) { - RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); - - if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, - key ? key->c_str() : NULL, key ? key->size() : 0, - msg_opaque) == -1) - return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); - - return RdKafka::ERR_NO_ERROR; -} - - -RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - void *msg_opaque) { - RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); - - if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key, - key_len, msg_opaque) == -1) - return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); - - return RdKafka::ERR_NO_ERROR; -} - - -RdKafka::ErrorCode RdKafka::ProducerImpl::produce( - RdKafka::Topic *topic, - int32_t partition, - const std::vector<char> *payload, - const std::vector<char> *key, - void *msg_opaque) { - RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic); - - if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY, - payload ? (void *)&(*payload)[0] : NULL, - payload ? payload->size() : 0, key ? &(*key)[0] : NULL, - key ? key->size() : 0, msg_opaque) == -1) - return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error()); - - return RdKafka::ERR_NO_ERROR; -} - -RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - int64_t timestamp, - void *msg_opaque) { - return static_cast<RdKafka::ErrorCode>(rd_kafka_producev( - rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), - RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), - RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), - RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), - RD_KAFKA_V_END)); -} - -RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - int64_t timestamp, - RdKafka::Headers *headers, - void *msg_opaque) { - rd_kafka_headers_t *hdrs = NULL; - RdKafka::HeadersImpl *headersimpl = NULL; - rd_kafka_resp_err_t err; - - if (headers) { - headersimpl = static_cast<RdKafka::HeadersImpl *>(headers); - hdrs = headersimpl->c_ptr(); - } - - err = rd_kafka_producev( - rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), - RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), - RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), - RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), - RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END); - - if (!err && headersimpl) { - /* A successful producev() call will destroy the C headers. */ - headersimpl->c_headers_destroyed(); - delete headers; - } - - return static_cast<RdKafka::ErrorCode>(err); -} |