diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/kafkatest_verifiable_client.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/kafkatest_verifiable_client.cpp | 961 |
1 files changed, 961 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/kafkatest_verifiable_client.cpp b/fluent-bit/lib/librdkafka-2.1.0/examples/kafkatest_verifiable_client.cpp new file mode 100644 index 00000000..bdb8607a --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/examples/kafkatest_verifiable_client.cpp @@ -0,0 +1,961 @@ +/* + * Copyright (c) 2015, Confluent Inc + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * librdkafka version of the Java VerifiableProducer and VerifiableConsumer + * for use with the official Kafka client tests. + */ + + +#include <iostream> +#include <fstream> +#include <sstream> +#include <map> +#include <string> +#include <algorithm> +#include <cstdlib> +#include <cstdio> +#include <csignal> +#include <cstring> +#include <unistd.h> +#include <sys/time.h> +#include <assert.h> +#include <ctype.h> +#include <strings.h> + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#elif _AIX +#include <unistd.h> +#else +#include <getopt.h> +#endif + +/* + * Typically include path in a real application would be + * #include <librdkafka/rdkafkacpp.h> + */ +#include "rdkafkacpp.h" + +static volatile sig_atomic_t run = 1; +static bool exit_eof = false; +static int verbosity = 1; +static std::string value_prefix; + +class Assignment { + public: + static std::string name(const std::string &t, int partition) { + std::stringstream stm; + stm << t << "." << partition; + return stm.str(); + } + + Assignment() : + topic(""), + partition(-1), + consumedMessages(0), + minOffset(-1), + maxOffset(0) { + printf("Created assignment\n"); + } + Assignment(const Assignment &a) { + topic = a.topic; + partition = a.partition; + consumedMessages = a.consumedMessages; + minOffset = a.minOffset; + maxOffset = a.maxOffset; + } + + Assignment &operator=(const Assignment &a) { + this->topic = a.topic; + this->partition = a.partition; + this->consumedMessages = a.consumedMessages; + this->minOffset = a.minOffset; + this->maxOffset = a.maxOffset; + return *this; + } + + int operator==(const Assignment &a) const { + return !(this->topic == a.topic && this->partition == a.partition); + } + + int operator<(const Assignment &a) const { + if (this->topic < a.topic) + return 1; + if (this->topic >= a.topic) + return 0; + return (this->partition < a.partition); + } + + void setup(std::string t, int32_t p) { + assert(!t.empty()); + assert(topic.empty() || topic == t); + assert(partition == -1 || partition == p); + topic = t; + partition = p; + } + + std::string topic; + int partition; + int consumedMessages; + int64_t minOffset; + int64_t maxOffset; +}; + + + +static struct { + int maxMessages; + + struct { + int numAcked; + int numSent; + int numErr; + } producer; + + struct { + int consumedMessages; + int consumedMessagesLastReported; + int consumedMessagesAtLastCommit; + bool useAutoCommit; + std::map<std::string, Assignment> assignments; + } consumer; +} state = { + /* .maxMessages = */ -1}; + + +static RdKafka::KafkaConsumer *consumer; + + +static std::string now() { + struct timeval tv; + gettimeofday(&tv, NULL); + time_t t = tv.tv_sec; + struct tm tm; + char buf[64]; + + localtime_r(&t, &tm); + strftime(buf, sizeof(buf), "%H:%M:%S", &tm); + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ".%03d", + (int)(tv.tv_usec / 1000)); + + return buf; +} + + +static time_t watchdog_last_kick; +static const int watchdog_timeout = 20; /* Must be > socket.timeout.ms */ +static void sigwatchdog(int sig) { + time_t t = time(NULL); + if (watchdog_last_kick + watchdog_timeout <= t) { + std::cerr << now() << ": WATCHDOG TIMEOUT (" + << (int)(t - watchdog_last_kick) << "s): TERMINATING" + << std::endl; + int *i = NULL; + *i = 100; + abort(); + } +} + +static void watchdog_kick() { + watchdog_last_kick = time(NULL); + + /* Safe guard against hangs-on-exit */ + alarm(watchdog_timeout); +} + + + +static void errorString(const std::string &name, + const std::string &errmsg, + const std::string &topic, + const std::string *key, + const std::string &value) { + std::cout << "{ " + << "\"name\": \"" << name << "\", " + << "\"_time\": \"" << now() << "\", " + << "\"message\": \"" << errmsg << "\", " + << "\"topic\": \"" << topic << "\", " + << "\"key\": \"" << (key ? *key : "NULL") << "\", " + << "\"value\": \"" << value << "\" " + << "}" << std::endl; +} + + +static void successString(const std::string &name, + const std::string &topic, + int partition, + int64_t offset, + const std::string *key, + const std::string &value) { + std::cout << "{ " + << "\"name\": \"" << name << "\", " + << "\"_time\": \"" << now() << "\", " + << "\"topic\": \"" << topic << "\", " + << "\"partition\": " << partition << ", " + << "\"offset\": " << offset << ", " + << "\"key\": \"" << (key ? *key : "NULL") << "\", " + << "\"value\": \"" << value << "\" " + << "}" << std::endl; +} + + +#if FIXME +static void offsetStatus(bool success, + const std::string &topic, + int partition, + int64_t offset, + const std::string &errstr) { + std::cout << "{ " + "\"name\": \"offsets_committed\", " + << "\"success\": " << success << ", " + << "\"offsets\": [ " + << " { " + << " \"topic\": \"" << topic << "\", " + << " \"partition\": " << partition << ", " + << " \"offset\": " << (int)offset << ", " + << " \"error\": \"" << errstr << "\" " + << " } " + << "] }" << std::endl; +} +#endif + + +static void sigterm(int sig) { + std::cerr << now() << ": Terminating because of signal " << sig << std::endl; + + if (!run) { + std::cerr << now() << ": Forced termination" << std::endl; + exit(1); + } + run = 0; +} + + +class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { + public: + void dr_cb(RdKafka::Message &message) { + if (message.err()) { + state.producer.numErr++; + errorString("producer_send_error", message.errstr(), message.topic_name(), + message.key(), + std::string(static_cast<const char *>(message.payload()), + message.len())); + } else { + successString("producer_send_success", message.topic_name(), + (int)message.partition(), message.offset(), message.key(), + std::string(static_cast<const char *>(message.payload()), + message.len())); + state.producer.numAcked++; + } + } +}; + + +class ExampleEventCb : public RdKafka::EventCb { + public: + void event_cb(RdKafka::Event &event) { + switch (event.type()) { + case RdKafka::Event::EVENT_ERROR: + std::cerr << now() << ": ERROR (" << RdKafka::err2str(event.err()) + << "): " << event.str() << std::endl; + break; + + case RdKafka::Event::EVENT_STATS: + std::cerr << now() << ": \"STATS\": " << event.str() << std::endl; + break; + + case RdKafka::Event::EVENT_LOG: + std::cerr << now() << ": LOG-" << event.severity() << "-" << event.fac() + << ": " << event.str() << std::endl; + break; + + default: + std::cerr << now() << ": EVENT " << event.type() << " (" + << RdKafka::err2str(event.err()) << "): " << event.str() + << std::endl; + break; + } + } +}; + + +/* Use of this partitioner is pretty pointless since no key is provided + * in the produce() call. */ +class MyHashPartitionerCb : public RdKafka::PartitionerCb { + public: + int32_t partitioner_cb(const RdKafka::Topic *topic, + const std::string *key, + int32_t partition_cnt, + void *msg_opaque) { + return djb_hash(key->c_str(), key->size()) % partition_cnt; + } + + private: + static inline unsigned int djb_hash(const char *str, size_t len) { + unsigned int hash = 5381; + for (size_t i = 0; i < len; i++) + hash = ((hash << 5) + hash) + str[i]; + return hash; + } +}; + + + +/** + * Print number of records consumed, every 100 messages or on timeout. + */ +static void report_records_consumed(int immediate) { + std::map<std::string, Assignment> *assignments = &state.consumer.assignments; + + if (state.consumer.consumedMessages <= + state.consumer.consumedMessagesLastReported + (immediate ? 0 : 999)) + return; + + std::cout << "{ " + "\"name\": \"records_consumed\", " + << "\"_totcount\": " << state.consumer.consumedMessages << ", " + << "\"count\": " + << (state.consumer.consumedMessages - + state.consumer.consumedMessagesLastReported) + << ", " + << "\"partitions\": [ "; + + for (std::map<std::string, Assignment>::iterator ii = assignments->begin(); + ii != assignments->end(); ii++) { + Assignment *a = &(*ii).second; + assert(!a->topic.empty()); + std::cout << (ii == assignments->begin() ? "" : ", ") << " { " + << " \"topic\": \"" << a->topic << "\", " + << " \"partition\": " << a->partition << ", " + << " \"minOffset\": " << a->minOffset << ", " + << " \"maxOffset\": " << a->maxOffset << " " + << " } "; + a->minOffset = -1; + } + + std::cout << "] }" << std::endl; + + state.consumer.consumedMessagesLastReported = state.consumer.consumedMessages; +} + + +class ExampleOffsetCommitCb : public RdKafka::OffsetCommitCb { + public: + void offset_commit_cb(RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &offsets) { + std::cerr << now() << ": Propagate offset for " << offsets.size() + << " partitions, error: " << RdKafka::err2str(err) << std::endl; + + /* No offsets to commit, dont report anything. */ + if (err == RdKafka::ERR__NO_OFFSET) + return; + + /* Send up-to-date records_consumed report to make sure consumed > committed + */ + report_records_consumed(1); + + std::cout << "{ " + << "\"name\": \"offsets_committed\", " + << "\"success\": " << (err ? "false" : "true") << ", " + << "\"error\": \"" << (err ? RdKafka::err2str(err) : "") << "\", " + << "\"_autocommit\": " + << (state.consumer.useAutoCommit ? "true" : "false") << ", " + << "\"offsets\": [ "; + assert(offsets.size() > 0); + for (unsigned int i = 0; i < offsets.size(); i++) { + std::cout << (i == 0 ? "" : ", ") << "{ " + << " \"topic\": \"" << offsets[i]->topic() << "\", " + << " \"partition\": " << offsets[i]->partition() << ", " + << " \"offset\": " << (int)offsets[i]->offset() << ", " + << " \"error\": \"" + << (offsets[i]->err() ? RdKafka::err2str(offsets[i]->err()) + : "") + << "\" " + << " }"; + } + std::cout << " ] }" << std::endl; + } +}; + +static ExampleOffsetCommitCb ex_offset_commit_cb; + + +/** + * Commit every 1000 messages or whenever there is a consume timeout. + */ +static void do_commit(RdKafka::KafkaConsumer *consumer, int immediate) { + if (!immediate && (state.consumer.useAutoCommit || + state.consumer.consumedMessagesAtLastCommit + 1000 > + state.consumer.consumedMessages)) + return; + + /* Make sure we report consumption before commit, + * otherwise tests may fail because of commit > consumed. */ + if (state.consumer.consumedMessagesLastReported < + state.consumer.consumedMessages) + report_records_consumed(1); + + std::cerr << now() << ": committing " + << (state.consumer.consumedMessages - + state.consumer.consumedMessagesAtLastCommit) + << " messages" << std::endl; + + RdKafka::ErrorCode err; + err = consumer->commitSync(&ex_offset_commit_cb); + + std::cerr << now() << ": " + << "sync commit returned " << RdKafka::err2str(err) << std::endl; + + state.consumer.consumedMessagesAtLastCommit = state.consumer.consumedMessages; +} + + +void msg_consume(RdKafka::KafkaConsumer *consumer, + RdKafka::Message *msg, + void *opaque) { + switch (msg->err()) { + case RdKafka::ERR__TIMED_OUT: + /* Try reporting consumed messages */ + report_records_consumed(1); + /* Commit one every consume() timeout instead of on every message. + * Also commit on every 1000 messages, whichever comes first. */ + do_commit(consumer, 1); + break; + + + case RdKafka::ERR_NO_ERROR: { + /* Real message */ + if (verbosity > 2) + std::cerr << now() << ": Read msg from " << msg->topic_name() << " [" + << (int)msg->partition() << "] at offset " << msg->offset() + << std::endl; + + if (state.maxMessages >= 0 && + state.consumer.consumedMessages >= state.maxMessages) + return; + + + Assignment *a = &state.consumer.assignments[Assignment::name( + msg->topic_name(), msg->partition())]; + a->setup(msg->topic_name(), msg->partition()); + + a->consumedMessages++; + if (a->minOffset == -1) + a->minOffset = msg->offset(); + if (a->maxOffset < msg->offset()) + a->maxOffset = msg->offset(); + + if (msg->key()) { + if (verbosity >= 3) + std::cerr << now() << ": Key: " << *msg->key() << std::endl; + } + + if (verbosity >= 3) + fprintf(stderr, "%.*s\n", static_cast<int>(msg->len()), + static_cast<const char *>(msg->payload())); + + state.consumer.consumedMessages++; + + report_records_consumed(0); + + do_commit(consumer, 0); + } break; + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + std::cerr << now() << ": Terminate: exit on EOF" << std::endl; + run = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl; + run = 0; + break; + + case RdKafka::ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + std::cerr << now() << ": Warning: " << msg->errstr() << std::endl; + break; + + default: + /* Errors */ + std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl; + run = 0; + } +} + + + +class ExampleConsumeCb : public RdKafka::ConsumeCb { + public: + void consume_cb(RdKafka::Message &msg, void *opaque) { + msg_consume(consumer_, &msg, opaque); + } + RdKafka::KafkaConsumer *consumer_; +}; + +class ExampleRebalanceCb : public RdKafka::RebalanceCb { + private: + static std::string part_list_json( + const std::vector<RdKafka::TopicPartition *> &partitions) { + std::ostringstream out; + for (unsigned int i = 0; i < partitions.size(); i++) + out << (i == 0 ? "" : ", ") << "{ " + << " \"topic\": \"" << partitions[i]->topic() << "\", " + << " \"partition\": " << partitions[i]->partition() << " }"; + return out.str(); + } + + public: + void rebalance_cb(RdKafka::KafkaConsumer *consumer, + RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &partitions) { + std::cerr << now() << ": rebalance_cb " << RdKafka::err2str(err) << " for " + << partitions.size() << " partitions" << std::endl; + /* Send message report prior to rebalancing event to make sure they + * are accounted for on the "right side" of the rebalance. */ + report_records_consumed(1); + + if (err == RdKafka::ERR__ASSIGN_PARTITIONS) + consumer->assign(partitions); + else { + do_commit(consumer, 1); + consumer->unassign(); + } + + std::cout << "{ " + << "\"name\": \"partitions_" + << (err == RdKafka::ERR__ASSIGN_PARTITIONS ? "assigned" + : "revoked") + << "\", " + << "\"partitions\": [ " << part_list_json(partitions) << "] }" + << std::endl; + } +}; + + + +/** + * @brief Read (Java client) configuration file + */ +static void read_conf_file(RdKafka::Conf *conf, const std::string &conf_file) { + std::ifstream inf(conf_file.c_str()); + + if (!inf) { + std::cerr << now() << ": " << conf_file << ": could not open file" + << std::endl; + exit(1); + } + + std::cerr << now() << ": " << conf_file << ": read config file" << std::endl; + + std::string line; + int linenr = 0; + + while (std::getline(inf, line)) { + linenr++; + + // Ignore comments and empty lines + if (line[0] == '#' || line.length() == 0) + continue; + + // Match on key=value.. + size_t d = line.find("="); + if (d == 0 || d == std::string::npos) { + std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << line + << ": ignoring invalid line (expect key=value): " + << ::std::endl; + continue; + } + + std::string key = line.substr(0, d); + std::string val = line.substr(d + 1); + + std::string errstr; + if (conf->set(key, val, errstr)) { + std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key + << "=" << val << ": " << errstr << ": ignoring error" + << std::endl; + } else { + std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key + << "=" << val << ": applied to configuration" << std::endl; + } + } + + inf.close(); +} + + + +int main(int argc, char **argv) { + std::string brokers = "localhost"; + std::string errstr; + std::vector<std::string> topics; + std::string mode = "P"; + int throughput = 0; + int32_t partition = RdKafka::Topic::PARTITION_UA; + MyHashPartitionerCb hash_partitioner; + int64_t create_time = -1; + + std::cerr << now() << ": librdkafka version " << RdKafka::version_str() + << " (" << RdKafka::version() << ")" << std::endl; + + /* + * Create configuration objects + */ + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + + /* Java VerifiableProducer defaults to acks=all */ + if (conf->set("acks", "all", errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + + /* Avoid slow shutdown on error */ + if (conf->set("message.timeout.ms", "60000", errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + + { + char hostname[128]; + gethostname(hostname, sizeof(hostname) - 1); + conf->set("client.id", std::string("rdkafka@") + hostname, errstr); + } + + conf->set("log.thread.name", "true", errstr); + + /* auto commit is explicitly enabled with --enable-autocommit */ + conf->set("enable.auto.commit", "false", errstr); + + /* keep protocol request timeouts under the watchdog timeout + * to make sure things like commitSync() dont fall victim to the watchdog. */ + conf->set("socket.timeout.ms", "10000", errstr); + + conf->set("fetch.wait.max.ms", "500", errstr); + conf->set("fetch.min.bytes", "4096", errstr); + + conf->set("enable.partition.eof", "true", errstr); + + for (int i = 1; i < argc; i++) { + const char *name = argv[i]; + const char *val = i + 1 < argc ? argv[i + 1] : NULL; + + if (val && !strncmp(val, "-", 1)) + val = NULL; + + std::cout << now() << ": argument: " << name << " " << (val ? val : "") + << std::endl; + + if (val) { + if (!strcmp(name, "--topic")) + topics.push_back(val); + else if (!strcmp(name, "--broker-list")) + brokers = val; + else if (!strcmp(name, "--max-messages")) + state.maxMessages = atoi(val); + else if (!strcmp(name, "--throughput")) + throughput = atoi(val); + else if (!strcmp(name, "--producer.config") || + !strcmp(name, "--consumer.config")) + read_conf_file(conf, val); + else if (!strcmp(name, "--group-id")) + conf->set("group.id", val, errstr); + else if (!strcmp(name, "--session-timeout")) + conf->set("session.timeout.ms", val, errstr); + else if (!strcmp(name, "--reset-policy")) { + if (conf->set("auto.offset.reset", val, errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + } else if (!strcmp(name, "--assignment-strategy")) { + /* The system tests pass the Java class name(s) rather than + * the configuration value. Fix it. + * "org.apache.kafka.clients.consumer.RangeAssignor,.." -> "range,.." + */ + std::string s = val; + size_t pos; + + while ((pos = s.find("org.apache.kafka.clients.consumer.")) != + std::string::npos) + s.erase(pos, strlen("org.apache.kafka.clients.consumer.")); + + while ((pos = s.find("Assignor")) != std::string::npos) + s.erase(pos, strlen("Assignor")); + + std::transform(s.begin(), s.end(), s.begin(), tolower); + + std::cerr << now() << ": converted " << name << " " << val << " to " + << s << std::endl; + + if (conf->set("partition.assignment.strategy", s.c_str(), errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + } else if (!strcmp(name, "--value-prefix")) { + value_prefix = std::string(val) + "."; + } else if (!strcmp(name, "--acks")) { + if (conf->set("acks", val, errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + } else if (!strcmp(name, "--message-create-time")) { + create_time = (int64_t)atoi(val); + } else if (!strcmp(name, "--debug")) { + conf->set("debug", val, errstr); + } else if (!strcmp(name, "-X")) { + char *s = strdup(val); + char *t = strchr(s, '='); + if (!t) + t = (char *)""; + else { + *t = '\0'; + t++; + } + if (conf->set(s, t, errstr)) { + std::cerr << now() << ": " << errstr << std::endl; + exit(1); + } + free(s); + } else { + std::cerr << now() << ": Unknown option " << name << std::endl; + exit(1); + } + + i++; + + } else { + if (!strcmp(name, "--consumer")) + mode = "C"; + else if (!strcmp(name, "--producer")) + mode = "P"; + else if (!strcmp(name, "--enable-autocommit")) { + state.consumer.useAutoCommit = true; + conf->set("enable.auto.commit", "true", errstr); + } else if (!strcmp(name, "-v")) + verbosity++; + else if (!strcmp(name, "-q")) + verbosity--; + else { + std::cerr << now() << ": Unknown option or missing argument to " << name + << std::endl; + exit(1); + } + } + } + + if (topics.empty() || brokers.empty()) { + std::cerr << now() << ": Missing --topic and --broker-list" << std::endl; + exit(1); + } + + + /* + * Set configuration properties + */ + conf->set("metadata.broker.list", brokers, errstr); + + ExampleEventCb ex_event_cb; + conf->set("event_cb", &ex_event_cb, errstr); + + signal(SIGINT, sigterm); + signal(SIGTERM, sigterm); + signal(SIGALRM, sigwatchdog); + + + if (mode == "P") { + /* + * Producer mode + */ + + ExampleDeliveryReportCb ex_dr_cb; + + /* Set delivery report callback */ + conf->set("dr_cb", &ex_dr_cb, errstr); + + /* + * Create producer using accumulated global configuration. + */ + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + if (!producer) { + std::cerr << now() << ": Failed to create producer: " << errstr + << std::endl; + exit(1); + } + + std::cerr << now() << ": % Created producer " << producer->name() + << std::endl; + + /* + * Create topic handle. + */ + RdKafka::Topic *topic = + RdKafka::Topic::create(producer, topics[0], NULL, errstr); + if (!topic) { + std::cerr << now() << ": Failed to create topic: " << errstr << std::endl; + exit(1); + } + + static const int delay_us = throughput ? 1000000 / throughput : 10; + + if (state.maxMessages == -1) + state.maxMessages = 1000000; /* Avoid infinite produce */ + + for (int i = 0; run && i < state.maxMessages; i++) { + /* + * Produce message + */ + std::ostringstream msg; + msg << value_prefix << i; + while (true) { + RdKafka::ErrorCode resp; + if (create_time == -1) { + resp = producer->produce( + topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast<char *>(msg.str().c_str()), msg.str().size(), NULL, + NULL); + } else { + resp = producer->produce( + topics[0], partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast<char *>(msg.str().c_str()), msg.str().size(), NULL, 0, + create_time, NULL); + } + + if (resp == RdKafka::ERR__QUEUE_FULL) { + producer->poll(100); + continue; + } else if (resp != RdKafka::ERR_NO_ERROR) { + errorString("producer_send_error", RdKafka::err2str(resp), + topic->name(), NULL, msg.str()); + state.producer.numErr++; + } else { + state.producer.numSent++; + } + break; + } + + producer->poll(delay_us / 1000); + usleep(1000); + watchdog_kick(); + } + run = 1; + + while (run && producer->outq_len() > 0) { + std::cerr << now() << ": Waiting for " << producer->outq_len() + << std::endl; + producer->poll(1000); + watchdog_kick(); + } + + std::cerr << now() << ": " << state.producer.numAcked << "/" + << state.producer.numSent << "/" << state.maxMessages + << " msgs acked/sent/max, " << state.producer.numErr << " errored" + << std::endl; + + delete topic; + delete producer; + + + } else if (mode == "C") { + /* + * Consumer mode + */ + + conf->set("auto.offset.reset", "smallest", errstr); + + ExampleRebalanceCb ex_rebalance_cb; + conf->set("rebalance_cb", &ex_rebalance_cb, errstr); + + conf->set("offset_commit_cb", &ex_offset_commit_cb, errstr); + + + /* + * Create consumer using accumulated global configuration. + */ + consumer = RdKafka::KafkaConsumer::create(conf, errstr); + if (!consumer) { + std::cerr << now() << ": Failed to create consumer: " << errstr + << std::endl; + exit(1); + } + + std::cerr << now() << ": % Created consumer " << consumer->name() + << std::endl; + + /* + * Subscribe to topic(s) + */ + RdKafka::ErrorCode resp = consumer->subscribe(topics); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << now() << ": Failed to subscribe to " << topics.size() + << " topics: " << RdKafka::err2str(resp) << std::endl; + exit(1); + } + + watchdog_kick(); + + /* + * Consume messages + */ + while (run) { + RdKafka::Message *msg = consumer->consume(500); + msg_consume(consumer, msg, NULL); + delete msg; + watchdog_kick(); + } + + std::cerr << now() << ": Final commit on termination" << std::endl; + + /* Final commit */ + do_commit(consumer, 1); + + /* + * Stop consumer + */ + consumer->close(); + + delete consumer; + } + + std::cout << "{ \"name\": \"shutdown_complete\" }" << std::endl; + + /* + * Wait for RdKafka to decommission. + * This is not strictly needed (when check outq_len() above), but + * allows RdKafka to clean up all its resources before the application + * exits so that memory profilers such as valgrind wont complain about + * memory leaks. + */ + RdKafka::wait_destroyed(5000); + + std::cerr << now() << ": EXITING WITH RETURN VALUE 0" << std::endl; + return 0; +} |