/* * librdkafka - Apache Kafka C/C++ library * * Copyright (c) 2014 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include "rdkafkacpp_int.h" RdKafka::Producer::~Producer() { } static void dr_msg_cb_trampoline(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { RdKafka::HandleImpl *handle = static_cast(opaque); RdKafka::MessageImpl message(RD_KAFKA_PRODUCER, NULL, (rd_kafka_message_t *)rkmessage, false); handle->dr_cb_->dr_cb(message); } RdKafka::Producer *RdKafka::Producer::create(const RdKafka::Conf *conf, std::string &errstr) { char errbuf[512]; const RdKafka::ConfImpl *confimpl = dynamic_cast(conf); RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl(); rd_kafka_conf_t *rk_conf = NULL; if (confimpl) { if (!confimpl->rk_conf_) { errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; delete rkp; return NULL; } rkp->set_common_config(confimpl); rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_); if (confimpl->dr_cb_) { rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline); rkp->dr_cb_ = confimpl->dr_cb_; } } rd_kafka_t *rk; if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errbuf, sizeof(errbuf)))) { errstr = errbuf; // rd_kafka_new() takes ownership only if succeeds if (rk_conf) rd_kafka_conf_destroy(rk_conf); delete rkp; return NULL; } rkp->rk_ = rk; return rkp; } RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque) { RdKafka::TopicImpl *topicimpl = dynamic_cast(topic); if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key ? key->c_str() : NULL, key ? key->size() : 0, msg_opaque) == -1) return static_cast(rd_kafka_last_error()); return RdKafka::ERR_NO_ERROR; } RdKafka::ErrorCode RdKafka::ProducerImpl::produce(RdKafka::Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque) { RdKafka::TopicImpl *topicimpl = dynamic_cast(topic); if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags, payload, len, key, key_len, msg_opaque) == -1) return static_cast(rd_kafka_last_error()); return RdKafka::ERR_NO_ERROR; } RdKafka::ErrorCode RdKafka::ProducerImpl::produce( RdKafka::Topic *topic, int32_t partition, const std::vector *payload, const std::vector *key, void *msg_opaque) { RdKafka::TopicImpl *topicimpl = dynamic_cast(topic); if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY, payload ? (void *)&(*payload)[0] : NULL, payload ? payload->size() : 0, key ? &(*key)[0] : NULL, key ? key->size() : 0, msg_opaque) == -1) return static_cast(rd_kafka_last_error()); return RdKafka::ERR_NO_ERROR; } RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque) { return static_cast(rd_kafka_producev( rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), RD_KAFKA_V_END)); } RdKafka::ErrorCode RdKafka::ProducerImpl::produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque) { rd_kafka_headers_t *hdrs = NULL; RdKafka::HeadersImpl *headersimpl = NULL; rd_kafka_resp_err_t err; if (headers) { headersimpl = static_cast(headers); hdrs = headersimpl->c_ptr(); } err = rd_kafka_producev( rk_, RD_KAFKA_V_TOPIC(topic_name.c_str()), RD_KAFKA_V_PARTITION(partition), RD_KAFKA_V_MSGFLAGS(msgflags), RD_KAFKA_V_VALUE(payload, len), RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END); if (!err && headersimpl) { /* A successful producev() call will destroy the C headers. */ headersimpl->c_headers_destroyed(); delete headers; } return static_cast(err); }