diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp b/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp new file mode 100644 index 000000000..67508ff82 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0059-bsearch.cpp @@ -0,0 +1,237 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2016, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include "testcpp.h" + +/** + * binary search by timestamp: excercices KafkaConsumer's seek() API. + */ + + +static std::string topic; +static const int partition = 0; +static int64_t golden_timestamp = -1; +static int64_t golden_offset = -1; + +/** + * @brief Seek to offset and consume that message. + * + * Asserts on failure. + */ +static RdKafka::Message *get_msg(RdKafka::KafkaConsumer *c, + int64_t offset, + bool use_seek) { + RdKafka::TopicPartition *next = + RdKafka::TopicPartition::create(topic, partition, offset); + RdKafka::ErrorCode err; + + /* Since seek() can only be used to change the currently consumed + * offset we need to start consuming the first time we run this + * loop by calling assign() */ + + test_timing_t t_seek; + TIMING_START(&t_seek, "seek"); + if (!use_seek) { + std::vector<RdKafka::TopicPartition *> parts; + parts.push_back(next); + err = c->assign(parts); + if (err) + Test::Fail("assign() failed: " + RdKafka::err2str(err)); + } else { + err = c->seek(*next, tmout_multip(5000)); + if (err) + Test::Fail("seek() failed: " + RdKafka::err2str(err)); + } + TIMING_STOP(&t_seek); + delete next; + + test_timing_t t_consume; + TIMING_START(&t_consume, "consume"); + + RdKafka::Message *msg = c->consume(tmout_multip(5000)); + if (!msg) + Test::Fail("consume() returned NULL"); + TIMING_STOP(&t_consume); + + if (msg->err()) + Test::Fail("consume() returned error: " + msg->errstr()); + + if (msg->offset() != offset) + Test::Fail(tostr() << "seek()ed to offset " << offset + << " but consume() returned offset " << msg->offset()); + + return msg; +} + +class MyDeliveryReportCb : public RdKafka::DeliveryReportCb { + public: + void dr_cb(RdKafka::Message &msg) { + if (msg.err()) + Test::Fail("Delivery failed: " + msg.errstr()); + + if (!msg.msg_opaque()) + return; + + RdKafka::MessageTimestamp ts = msg.timestamp(); + if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) + Test::Fail(tostr() << "Dr msg timestamp type wrong: " << ts.type); + + golden_timestamp = ts.timestamp; + golden_offset = msg.offset(); + } +}; + +static void do_test_bsearch(void) { + RdKafka::Conf *conf, *tconf; + int msgcnt = 1000; + int64_t timestamp; + std::string errstr; + RdKafka::ErrorCode err; + MyDeliveryReportCb my_dr; + + topic = Test::mk_topic_name("0059-bsearch", 1); + Test::conf_init(&conf, &tconf, 0); + Test::conf_set(tconf, "acks", "all"); + Test::conf_set(conf, "api.version.request", "true"); + conf->set("dr_cb", &my_dr, errstr); + conf->set("default_topic_conf", tconf, errstr); + + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create Producer: " + errstr); + delete conf; + delete tconf; + + timestamp = 1000; + for (int i = 0; i < msgcnt; i++) { + err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, + (void *)topic.c_str(), topic.size(), NULL, 0, timestamp, + i == 357 ? (void *)1 /*golden*/ : NULL); + if (err != RdKafka::ERR_NO_ERROR) + Test::Fail("Produce failed: " + RdKafka::err2str(err)); + timestamp += 100 + (timestamp % 9); + } + + if (p->flush(tmout_multip(5000)) != 0) + Test::Fail("Not all messages flushed"); + + Test::Say(tostr() << "Produced " << msgcnt << " messages, " + << "golden message with timestamp " << golden_timestamp + << " at offset " << golden_offset << "\n"); + + delete p; + + /* + * Now find the golden message using bsearch + */ + + /* Create consumer */ + Test::conf_init(&conf, NULL, 10); + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "api.version.request", "true"); + Test::conf_set(conf, "fetch.wait.max.ms", "1"); + Test::conf_set(conf, "fetch.error.backoff.ms", "1"); + Test::conf_set(conf, "queued.min.messages", "1"); + Test::conf_set(conf, "enable.auto.commit", "false"); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + delete conf; + + Test::Say("Find initial middle offset\n"); + int64_t low, high; + test_timing_t t_qr; + TIMING_START(&t_qr, "query_watermark_offsets"); + err = c->query_watermark_offsets(topic, partition, &low, &high, + tmout_multip(5000)); + TIMING_STOP(&t_qr); + if (err) + Test::Fail("query_watermark_offsets failed: " + RdKafka::err2str(err)); + + /* Divide and conquer */ + test_timing_t t_bsearch; + TIMING_START(&t_bsearch, "actual bsearch"); + int itcnt = 0; + do { + int64_t mid; + + mid = low + ((high - low) / 2); + + Test::Say(1, tostr() << "Get message at mid point of " << low << ".." + << high << " -> " << mid << "\n"); + + RdKafka::Message *msg = get_msg(c, mid, + /* use assign() on first iteration, + * then seek() */ + itcnt > 0); + + RdKafka::MessageTimestamp ts = msg->timestamp(); + if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) + Test::Fail(tostr() << "Expected CreateTime timestamp, not " << ts.type + << " at offset " << msg->offset()); + + Test::Say(1, tostr() << "Message at offset " << msg->offset() + << " with timestamp " << ts.timestamp << "\n"); + + if (ts.timestamp == golden_timestamp) { + Test::Say(1, tostr() << "Found golden timestamp " << ts.timestamp + << " at offset " << msg->offset() << " in " + << itcnt + 1 << " iterations\n"); + delete msg; + break; + } + + if (low == high) { + Test::Fail(tostr() << "Search exhausted at offset " << msg->offset() + << " with timestamp " << ts.timestamp + << " without finding golden timestamp " + << golden_timestamp << " at offset " << golden_offset); + + } else if (ts.timestamp < golden_timestamp) + low = msg->offset() + 1; + else if (ts.timestamp > golden_timestamp) + high = msg->offset() - 1; + + delete msg; + itcnt++; + } while (true); + TIMING_STOP(&t_bsearch); + + c->close(); + + delete c; +} + +extern "C" { +int main_0059_bsearch(int argc, char **argv) { + do_test_bsearch(); + return 0; +} +} |