summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp237
1 files changed, 237 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp
new file mode 100644
index 000000000..67508ff82
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp
@@ -0,0 +1,237 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2016, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include "testcpp.h"
+
+/**
+ * binary search by timestamp: excercices KafkaConsumer's seek() API.
+ */
+
+
+static std::string topic;
+static const int partition = 0;
+static int64_t golden_timestamp = -1;
+static int64_t golden_offset = -1;
+
+/**
+ * @brief Seek to offset and consume that message.
+ *
+ * Asserts on failure.
+ */
+static RdKafka::Message *get_msg(RdKafka::KafkaConsumer *c,
+ int64_t offset,
+ bool use_seek) {
+ RdKafka::TopicPartition *next =
+ RdKafka::TopicPartition::create(topic, partition, offset);
+ RdKafka::ErrorCode err;
+
+ /* Since seek() can only be used to change the currently consumed
+ * offset we need to start consuming the first time we run this
+ * loop by calling assign() */
+
+ test_timing_t t_seek;
+ TIMING_START(&t_seek, "seek");
+ if (!use_seek) {
+ std::vector<RdKafka::TopicPartition *> parts;
+ parts.push_back(next);
+ err = c->assign(parts);
+ if (err)
+ Test::Fail("assign() failed: " + RdKafka::err2str(err));
+ } else {
+ err = c->seek(*next, tmout_multip(5000));
+ if (err)
+ Test::Fail("seek() failed: " + RdKafka::err2str(err));
+ }
+ TIMING_STOP(&t_seek);
+ delete next;
+
+ test_timing_t t_consume;
+ TIMING_START(&t_consume, "consume");
+
+ RdKafka::Message *msg = c->consume(tmout_multip(5000));
+ if (!msg)
+ Test::Fail("consume() returned NULL");
+ TIMING_STOP(&t_consume);
+
+ if (msg->err())
+ Test::Fail("consume() returned error: " + msg->errstr());
+
+ if (msg->offset() != offset)
+ Test::Fail(tostr() << "seek()ed to offset " << offset
+ << " but consume() returned offset " << msg->offset());
+
+ return msg;
+}
+
+class MyDeliveryReportCb : public RdKafka::DeliveryReportCb {
+ public:
+ void dr_cb(RdKafka::Message &msg) {
+ if (msg.err())
+ Test::Fail("Delivery failed: " + msg.errstr());
+
+ if (!msg.msg_opaque())
+ return;
+
+ RdKafka::MessageTimestamp ts = msg.timestamp();
+ if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
+ Test::Fail(tostr() << "Dr msg timestamp type wrong: " << ts.type);
+
+ golden_timestamp = ts.timestamp;
+ golden_offset = msg.offset();
+ }
+};
+
+static void do_test_bsearch(void) {
+ RdKafka::Conf *conf, *tconf;
+ int msgcnt = 1000;
+ int64_t timestamp;
+ std::string errstr;
+ RdKafka::ErrorCode err;
+ MyDeliveryReportCb my_dr;
+
+ topic = Test::mk_topic_name("0059-bsearch", 1);
+ Test::conf_init(&conf, &tconf, 0);
+ Test::conf_set(tconf, "acks", "all");
+ Test::conf_set(conf, "api.version.request", "true");
+ conf->set("dr_cb", &my_dr, errstr);
+ conf->set("default_topic_conf", tconf, errstr);
+
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create Producer: " + errstr);
+ delete conf;
+ delete tconf;
+
+ timestamp = 1000;
+ for (int i = 0; i < msgcnt; i++) {
+ err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
+ (void *)topic.c_str(), topic.size(), NULL, 0, timestamp,
+ i == 357 ? (void *)1 /*golden*/ : NULL);
+ if (err != RdKafka::ERR_NO_ERROR)
+ Test::Fail("Produce failed: " + RdKafka::err2str(err));
+ timestamp += 100 + (timestamp % 9);
+ }
+
+ if (p->flush(tmout_multip(5000)) != 0)
+ Test::Fail("Not all messages flushed");
+
+ Test::Say(tostr() << "Produced " << msgcnt << " messages, "
+ << "golden message with timestamp " << golden_timestamp
+ << " at offset " << golden_offset << "\n");
+
+ delete p;
+
+ /*
+ * Now find the golden message using bsearch
+ */
+
+ /* Create consumer */
+ Test::conf_init(&conf, NULL, 10);
+ Test::conf_set(conf, "group.id", topic);
+ Test::conf_set(conf, "api.version.request", "true");
+ Test::conf_set(conf, "fetch.wait.max.ms", "1");
+ Test::conf_set(conf, "fetch.error.backoff.ms", "1");
+ Test::conf_set(conf, "queued.min.messages", "1");
+ Test::conf_set(conf, "enable.auto.commit", "false");
+
+ RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ Test::Say("Find initial middle offset\n");
+ int64_t low, high;
+ test_timing_t t_qr;
+ TIMING_START(&t_qr, "query_watermark_offsets");
+ err = c->query_watermark_offsets(topic, partition, &low, &high,
+ tmout_multip(5000));
+ TIMING_STOP(&t_qr);
+ if (err)
+ Test::Fail("query_watermark_offsets failed: " + RdKafka::err2str(err));
+
+ /* Divide and conquer */
+ test_timing_t t_bsearch;
+ TIMING_START(&t_bsearch, "actual bsearch");
+ int itcnt = 0;
+ do {
+ int64_t mid;
+
+ mid = low + ((high - low) / 2);
+
+ Test::Say(1, tostr() << "Get message at mid point of " << low << ".."
+ << high << " -> " << mid << "\n");
+
+ RdKafka::Message *msg = get_msg(c, mid,
+ /* use assign() on first iteration,
+ * then seek() */
+ itcnt > 0);
+
+ RdKafka::MessageTimestamp ts = msg->timestamp();
+ if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
+ Test::Fail(tostr() << "Expected CreateTime timestamp, not " << ts.type
+ << " at offset " << msg->offset());
+
+ Test::Say(1, tostr() << "Message at offset " << msg->offset()
+ << " with timestamp " << ts.timestamp << "\n");
+
+ if (ts.timestamp == golden_timestamp) {
+ Test::Say(1, tostr() << "Found golden timestamp " << ts.timestamp
+ << " at offset " << msg->offset() << " in "
+ << itcnt + 1 << " iterations\n");
+ delete msg;
+ break;
+ }
+
+ if (low == high) {
+ Test::Fail(tostr() << "Search exhausted at offset " << msg->offset()
+ << " with timestamp " << ts.timestamp
+ << " without finding golden timestamp "
+ << golden_timestamp << " at offset " << golden_offset);
+
+ } else if (ts.timestamp < golden_timestamp)
+ low = msg->offset() + 1;
+ else if (ts.timestamp > golden_timestamp)
+ high = msg->offset() - 1;
+
+ delete msg;
+ itcnt++;
+ } while (true);
+ TIMING_STOP(&t_bsearch);
+
+ c->close();
+
+ delete c;
+}
+
+extern "C" {
+int main_0059_bsearch(int argc, char **argv) {
+ do_test_bsearch();
+ return 0;
+}
+}