summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp197
1 files changed, 0 insertions, 197 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp
deleted file mode 100644
index 8300dfb3b..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/ProducerImpl.cpp
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * librdkafka - Apache Kafka C/C++ library
- *
- * Copyright (c) 2014 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <iostream>
-#include <string>
-#include <list>
-#include <cerrno>
-
-#include "rdkafkacpp_int.h"
-
-
-RdKafka::Producer::~Producer() {
-}
-
-static void dr_msg_cb_trampoline(rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage,
- void *opaque) {
- RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
- RdKafka::MessageImpl message(RD_KAFKA_PRODUCER, NULL,
- (rd_kafka_message_t *)rkmessage, false);
- handle->dr_cb_->dr_cb(message);
-}
-
-
-
-RdKafka::Producer *RdKafka::Producer::create(const RdKafka::Conf *conf,
- std::string &errstr) {
- char errbuf[512];
- const RdKafka::ConfImpl *confimpl =
- dynamic_cast<const RdKafka::ConfImpl *>(conf);
- RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl();
- rd_kafka_conf_t *rk_conf = NULL;
-
- if (confimpl) {
- if (!confimpl->rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- delete rkp;
- return NULL;
- }
-
- rkp->set_common_config(confimpl);
-
- rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
-
- if (confimpl->dr_cb_) {
- rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline);
- rkp->dr_cb_ = confimpl->dr_cb_;
- }
- }
-
-
- rd_kafka_t *rk;
- if (!(rk =
- rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errbuf, sizeof(errbuf)))) {
- errstr = errbuf;
- // rd_kafka_new() takes ownership only if succeeds
- if (rk_conf)
- rd_kafka_conf_destroy(rk_conf);
- delete rkp;
- return NULL;
- }
-
- rkp->rk_ = rk;
-
- return rkp;
-}
-
-
-RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const std::string *key,
- void *msg_opaque) {
- RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
-
- if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len,
- key ? key->c_str() : NULL, key ? key->size() : 0,
- msg_opaque) == -1)
- return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
-
- return RdKafka::ERR_NO_ERROR;
-}
-
-
-RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- void *msg_opaque) {
- RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
-
- if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key,
- key_len, msg_opaque) == -1)
- return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
-
- return RdKafka::ERR_NO_ERROR;
-}
-
-
-RdKafka::ErrorCode RdKafka::ProducerImpl::produce(
- RdKafka::Topic *topic,
- int32_t partition,
- const std::vector<char> *payload,
- const std::vector<char> *key,
- void *msg_opaque) {
- RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
-
- if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY,
- payload ? (void *)&(*payload)[0] : NULL,
- payload ? payload->size() : 0, key ? &(*key)[0] : NULL,
- key ? key->size() : 0, msg_opaque) == -1)
- return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
-
- return RdKafka::ERR_NO_ERROR;
-}
-
-RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- int64_t timestamp,
- void *msg_opaque) {
- return static_cast<RdKafka::ErrorCode>(rd_kafka_producev(
- rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()),
- RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags),
- RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len),
- RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque),
- RD_KAFKA_V_END));
-}
-
-RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name,
- int32_t partition,
- int msgflags,
- void *payload,
- size_t len,
- const void *key,
- size_t key_len,
- int64_t timestamp,
- RdKafka::Headers *headers,
- void *msg_opaque) {
- rd_kafka_headers_t *hdrs = NULL;
- RdKafka::HeadersImpl *headersimpl = NULL;
- rd_kafka_resp_err_t err;
-
- if (headers) {
- headersimpl = static_cast<RdKafka::HeadersImpl *>(headers);
- hdrs = headersimpl->c_ptr();
- }
-
- err = rd_kafka_producev(
- rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()),
- RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags),
- RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len),
- RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque),
- RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END);
-
- if (!err && headersimpl) {
- /* A successful producev() call will destroy the C headers. */
- headersimpl->c_headers_destroyed();
- delete headers;
- }
-
- return static_cast<RdKafka::ErrorCode>(err);
-}