diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp b/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp new file mode 100644 index 00000000..156b8a57 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp @@ -0,0 +1,163 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2016, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <iostream> +#include "testcpp.h" + +/** + * Verify prioritization of non-message ops. + * MO: + * + * - Seed topic with 1000 messages + * - Start consumer with auto offset commit disabled, + * but with commit and stats callbacks registered, + * - Consume one message + * - Commit that message manually + * - Consume one message per second + * - The commit callback should be fired within reasonable time, long before + * - The stats callback should behave the same. + * all messages are consumed. + */ + + + +class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb { + public: + int seen_commit; + int seen_stats; + + void offset_commit_cb(RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &offsets) { + if (err) + Test::Fail("Offset commit failed: " + RdKafka::err2str(err)); + + seen_commit++; + Test::Say("Got commit callback!\n"); + } + + void event_cb(RdKafka::Event &event) { + switch (event.type()) { + case RdKafka::Event::EVENT_STATS: + Test::Say("Got stats callback!\n"); + seen_stats++; + break; + default: + break; + } + } +}; + + + +static void do_test_commit_cb(void) { + const int msgcnt = test_quick ? 100 : 1000; + std::string errstr; + RdKafka::ErrorCode err; + std::string topic = Test::mk_topic_name("0060-op_prio", 1); + + test_produce_msgs_easy(topic.c_str(), 0, 0, msgcnt); + + /* + * Create consumer + */ + + /* Create consumer */ + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, 10); + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "socket.timeout.ms", "10000"); + Test::conf_set(conf, "enable.auto.commit", "false"); + Test::conf_set(conf, "enable.partition.eof", "false"); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "statistics.interval.ms", "1000"); + + MyCbs cbs; + cbs.seen_commit = 0; + cbs.seen_stats = 0; + if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set commit callback: " + errstr); + if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set event callback: " + errstr); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + delete conf; + + /* Subscribe */ + std::vector<std::string> topics; + topics.push_back(topic); + if ((err = c->subscribe(topics))) + Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + + /* Wait for messages and commit callback. */ + Test::Say("Consuming topic " + topic + "\n"); + int cnt = 0; + while (!cbs.seen_commit || !cbs.seen_stats) { + RdKafka::Message *msg = c->consume(tmout_multip(1000)); + if (!msg->err()) { + cnt++; + Test::Say(tostr() << "Received message #" << cnt << "\n"); + if (cnt > 10) + Test::Fail(tostr() << "Should've seen the " + "offset commit (" + << cbs.seen_commit + << ") and " + "stats callbacks (" + << cbs.seen_stats << ") by now"); + + /* Commit the first message to trigger the offset commit_cb */ + if (cnt == 1) { + err = c->commitAsync(msg); + if (err) + Test::Fail("commitAsync() failed: " + RdKafka::err2str(err)); + rd_sleep(1); /* Sleep to simulate slow processing, making sure + * that the offset commit callback op gets + * inserted on the consume queue in front of + * the messages. */ + } + + } else if (msg->err() == RdKafka::ERR__TIMED_OUT) + ; /* Stil rebalancing? */ + else + Test::Fail("consume() failed: " + msg->errstr()); + delete msg; + } + + c->close(); + delete c; +} + +extern "C" { +int main_0060_op_prio(int argc, char **argv) { + do_test_commit_cb(); + return 0; +} +} |