summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h360
1 files changed, 360 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h b/fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h
new file mode 100644
index 000000000..2ecaed394
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/testcpp.h
@@ -0,0 +1,360 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _TESTCPP_H_
+#define _TESTCPP_H_
+
+#include <sstream>
+
+#include "rdkafkacpp.h"
+
+extern "C" {
+#ifdef _WIN32
+/* Win32/Visual Studio */
+#include "../src/win32_config.h"
+#include "../src/rdwin32.h"
+#else
+#include "../config.h"
+/* POSIX / UNIX based systems */
+#include "../src/rdposix.h"
+#endif
+#include "testshared.h"
+}
+
+// courtesy of
+// http://stackoverview.blogspot.se/2011/04/create-string-on-fly-just-in-one-line.html
+struct tostr {
+ std::stringstream ss;
+ template <typename T>
+ tostr &operator<<(const T &data) {
+ ss << data;
+ return *this;
+ }
+ operator std::string() {
+ return ss.str();
+ }
+};
+
+
+
+#define TestMessageVerify(testid, exp_partition, msgidp, msg) \
+ test_msg_parse00(__FUNCTION__, __LINE__, testid, exp_partition, msgidp, \
+ (msg)->topic_name().c_str(), (msg)->partition(), \
+ (msg)->offset(), (const char *)(msg)->key_pointer(), \
+ (msg)->key_len())
+
+namespace Test {
+
+/**
+ * @brief Get test config object
+ */
+
+static RD_UNUSED void Fail(std::string str) {
+ test_fail0(__FILE__, __LINE__, "", 1 /*do-lock*/, 1 /*now*/, "%s",
+ str.c_str());
+}
+static RD_UNUSED void FailLater(std::string str) {
+ test_fail0(__FILE__, __LINE__, "", 1 /*do-lock*/, 0 /*later*/, "%s",
+ str.c_str());
+}
+static RD_UNUSED void Skip(std::string str) {
+ test_SKIP(__FILE__, __LINE__, str.c_str());
+}
+static RD_UNUSED void Say(int level, std::string str) {
+ test_SAY(__FILE__, __LINE__, level, str.c_str());
+}
+static RD_UNUSED void Say(std::string str) {
+ Test::Say(2, str);
+}
+
+/**
+ * @brief Generate test topic name
+ */
+static RD_UNUSED std::string mk_topic_name(std::string suffix,
+ bool randomized) {
+ return test_mk_topic_name(suffix.c_str(), (int)randomized);
+}
+
+/**
+ * @brief Generate random test group name
+ */
+static RD_UNUSED std::string mk_unique_group_name(std::string suffix) {
+ return test_mk_topic_name(suffix.c_str(), 1);
+}
+
+/**
+ * @brief Create partitions
+ */
+static RD_UNUSED void create_partitions(RdKafka::Handle *use_handle,
+ const char *topicname,
+ int new_partition_cnt) {
+ rd_kafka_t *use_rk = NULL;
+ if (use_handle != NULL)
+ use_rk = use_handle->c_ptr();
+ test_create_partitions(use_rk, topicname, new_partition_cnt);
+}
+
+/**
+ * @brief Create a topic
+ */
+static RD_UNUSED void create_topic(RdKafka::Handle *use_handle,
+ const char *topicname,
+ int partition_cnt,
+ int replication_factor) {
+ rd_kafka_t *use_rk = NULL;
+ if (use_handle != NULL)
+ use_rk = use_handle->c_ptr();
+ test_create_topic(use_rk, topicname, partition_cnt, replication_factor);
+}
+
+/**
+ * @brief Delete a topic
+ */
+static RD_UNUSED void delete_topic(RdKafka::Handle *use_handle,
+ const char *topicname) {
+ rd_kafka_t *use_rk = NULL;
+ if (use_handle != NULL)
+ use_rk = use_handle->c_ptr();
+ test_delete_topic(use_rk, topicname);
+}
+
+/**
+ * @brief Get new configuration objects
+ */
+void conf_init(RdKafka::Conf **conf, RdKafka::Conf **topic_conf, int timeout);
+
+
+static RD_UNUSED void conf_set(RdKafka::Conf *conf,
+ std::string name,
+ std::string val) {
+ std::string errstr;
+ if (conf->set(name, val, errstr) != RdKafka::Conf::CONF_OK)
+ Test::Fail("Conf failed: " + errstr);
+}
+
+static RD_UNUSED void print_TopicPartitions(
+ std::string header,
+ const std::vector<RdKafka::TopicPartition *> &partitions) {
+ Test::Say(tostr() << header << ": " << partitions.size()
+ << " TopicPartition(s):\n");
+ for (unsigned int i = 0; i < partitions.size(); i++)
+ Test::Say(tostr() << " " << partitions[i]->topic() << "["
+ << partitions[i]->partition() << "] "
+ << "offset " << partitions[i]->offset() << ": "
+ << RdKafka::err2str(partitions[i]->err()) << "\n");
+}
+
+
+/* Convenience subscribe() */
+static RD_UNUSED void subscribe(RdKafka::KafkaConsumer *c,
+ const std::string &topic) {
+ Test::Say(c->name() + ": Subscribing to " + topic + "\n");
+ std::vector<std::string> topics;
+ topics.push_back(topic);
+ RdKafka::ErrorCode err;
+ if ((err = c->subscribe(topics)))
+ Test::Fail("Subscribe failed: " + RdKafka::err2str(err));
+}
+
+
+/* Convenience subscribe() to two topics */
+static RD_UNUSED void subscribe(RdKafka::KafkaConsumer *c,
+ const std::string &topic1,
+ const std::string &topic2) {
+ Test::Say(c->name() + ": Subscribing to " + topic1 + " and " + topic2 + "\n");
+ std::vector<std::string> topics;
+ topics.push_back(topic1);
+ topics.push_back(topic2);
+ RdKafka::ErrorCode err;
+ if ((err = c->subscribe(topics)))
+ Test::Fail("Subscribe failed: " + RdKafka::err2str(err));
+}
+
+/* Convenience unsubscribe() */
+static RD_UNUSED void unsubscribe(RdKafka::KafkaConsumer *c) {
+ Test::Say(c->name() + ": Unsubscribing\n");
+ RdKafka::ErrorCode err;
+ if ((err = c->unsubscribe()))
+ Test::Fail("Unsubscribe failed: " + RdKafka::err2str(err));
+}
+
+
+static RD_UNUSED void incremental_assign(
+ RdKafka::KafkaConsumer *c,
+ const std::vector<RdKafka::TopicPartition *> &parts) {
+ Test::Say(tostr() << c->name() << ": incremental assign of " << parts.size()
+ << " partition(s)\n");
+ if (test_level >= 2)
+ print_TopicPartitions("incremental_assign()", parts);
+ RdKafka::Error *error;
+ if ((error = c->incremental_assign(parts)))
+ Test::Fail(c->name() + ": Incremental assign failed: " + error->str());
+}
+
+static RD_UNUSED void incremental_unassign(
+ RdKafka::KafkaConsumer *c,
+ const std::vector<RdKafka::TopicPartition *> &parts) {
+ Test::Say(tostr() << c->name() << ": incremental unassign of " << parts.size()
+ << " partition(s)\n");
+ if (test_level >= 2)
+ print_TopicPartitions("incremental_unassign()", parts);
+ RdKafka::Error *error;
+ if ((error = c->incremental_unassign(parts)))
+ Test::Fail(c->name() + ": Incremental unassign failed: " + error->str());
+}
+
+/**
+ * @brief Wait until the current assignment size is \p partition_count.
+ * If \p topic is not NULL, then additionally, each partition in
+ * the assignment must have topic \p topic.
+ */
+static RD_UNUSED void wait_for_assignment(RdKafka::KafkaConsumer *c,
+ size_t partition_count,
+ const std::string *topic) {
+ bool done = false;
+ while (!done) {
+ RdKafka::Message *msg1 = c->consume(500);
+ delete msg1;
+
+ std::vector<RdKafka::TopicPartition *> partitions;
+ c->assignment(partitions);
+
+ if (partitions.size() == partition_count) {
+ done = true;
+ if (topic) {
+ for (size_t i = 0; i < partitions.size(); i++) {
+ if (partitions[i]->topic() != *topic) {
+ done = false;
+ break;
+ }
+ }
+ }
+ }
+
+ RdKafka::TopicPartition::destroy(partitions);
+ }
+}
+
+
+/**
+ * @brief Check current assignment has size \p partition_count
+ * If \p topic is not NULL, then additionally check that
+ * each partition in the assignment has topic \p topic.
+ */
+static RD_UNUSED void check_assignment(RdKafka::KafkaConsumer *c,
+ size_t partition_count,
+ const std::string *topic) {
+ std::vector<RdKafka::TopicPartition *> partitions;
+ c->assignment(partitions);
+ if (partition_count != partitions.size())
+ Test::Fail(tostr() << "Expecting current assignment to have size "
+ << partition_count << ", not: " << partitions.size());
+ for (size_t i = 0; i < partitions.size(); i++) {
+ if (topic != NULL) {
+ if (partitions[i]->topic() != *topic)
+ Test::Fail(tostr() << "Expecting assignment to be " << *topic
+ << ", not " << partitions[i]->topic());
+ }
+ delete partitions[i];
+ }
+}
+
+
+/**
+ * @brief Current assignment partition count. If \p topic is
+ * NULL, then the total partition count, else the number
+ * of assigned partitions from \p topic.
+ */
+static RD_UNUSED size_t assignment_partition_count(RdKafka::KafkaConsumer *c,
+ std::string *topic) {
+ std::vector<RdKafka::TopicPartition *> partitions;
+ c->assignment(partitions);
+ int cnt = 0;
+ for (size_t i = 0; i < partitions.size(); i++) {
+ if (topic == NULL || *topic == partitions[i]->topic())
+ cnt++;
+ delete partitions[i];
+ }
+ return cnt;
+}
+
+
+/**
+ * @brief Poll the consumer once, discarding the returned message
+ * or error event.
+ * @returns true if a proper event/message was seen, or false on timeout.
+ */
+static RD_UNUSED bool poll_once(RdKafka::KafkaConsumer *c, int timeout_ms) {
+ RdKafka::Message *msg = c->consume(timeout_ms);
+ bool ret = msg->err() != RdKafka::ERR__TIMED_OUT;
+ delete msg;
+ return ret;
+}
+
+
+/**
+ * @brief Produce \p msgcnt messages to \p topic \p partition.
+ */
+static RD_UNUSED void produce_msgs(RdKafka::Producer *p,
+ const std::string &topic,
+ int32_t partition,
+ int msgcnt,
+ int msgsize,
+ bool flush) {
+ char *buf = (char *)malloc(msgsize);
+
+ for (int i = 0; i < msgsize; i++)
+ buf[i] = (char)((int)'a' + (i % 26));
+
+ for (int i = 0; i < msgcnt; i++) {
+ RdKafka::ErrorCode err;
+ err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
+ (void *)buf, (size_t)msgsize, NULL, 0, 0, NULL);
+ TEST_ASSERT(!err, "produce() failed: %s", RdKafka::err2str(err).c_str());
+ p->poll(0);
+ }
+
+ free(buf);
+
+ if (flush)
+ p->flush(10 * 1000);
+}
+
+
+
+/**
+ * @brief Delivery report class
+ */
+class DeliveryReportCb : public RdKafka::DeliveryReportCb {
+ public:
+ void dr_cb(RdKafka::Message &msg);
+};
+
+static DeliveryReportCb DrCb;
+}; // namespace Test
+
+#endif /* _TESTCPP_H_ */