summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp163
1 files changed, 163 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp
new file mode 100644
index 000000000..156b8a57a
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0060-op_prio.cpp
@@ -0,0 +1,163 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2016, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include "testcpp.h"
+
+/**
+ * Verify prioritization of non-message ops.
+ * MO:
+ *
+ * - Seed topic with 1000 messages
+ * - Start consumer with auto offset commit disabled,
+ * but with commit and stats callbacks registered,
+ * - Consume one message
+ * - Commit that message manually
+ * - Consume one message per second
+ * - The commit callback should be fired within reasonable time, long before
+ * - The stats callback should behave the same.
+ * all messages are consumed.
+ */
+
+
+
+class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb {
+ public:
+ int seen_commit;
+ int seen_stats;
+
+ void offset_commit_cb(RdKafka::ErrorCode err,
+ std::vector<RdKafka::TopicPartition *> &offsets) {
+ if (err)
+ Test::Fail("Offset commit failed: " + RdKafka::err2str(err));
+
+ seen_commit++;
+ Test::Say("Got commit callback!\n");
+ }
+
+ void event_cb(RdKafka::Event &event) {
+ switch (event.type()) {
+ case RdKafka::Event::EVENT_STATS:
+ Test::Say("Got stats callback!\n");
+ seen_stats++;
+ break;
+ default:
+ break;
+ }
+ }
+};
+
+
+
+static void do_test_commit_cb(void) {
+ const int msgcnt = test_quick ? 100 : 1000;
+ std::string errstr;
+ RdKafka::ErrorCode err;
+ std::string topic = Test::mk_topic_name("0060-op_prio", 1);
+
+ test_produce_msgs_easy(topic.c_str(), 0, 0, msgcnt);
+
+ /*
+ * Create consumer
+ */
+
+ /* Create consumer */
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 10);
+ Test::conf_set(conf, "group.id", topic);
+ Test::conf_set(conf, "socket.timeout.ms", "10000");
+ Test::conf_set(conf, "enable.auto.commit", "false");
+ Test::conf_set(conf, "enable.partition.eof", "false");
+ Test::conf_set(conf, "auto.offset.reset", "earliest");
+ Test::conf_set(conf, "statistics.interval.ms", "1000");
+
+ MyCbs cbs;
+ cbs.seen_commit = 0;
+ cbs.seen_stats = 0;
+ if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) !=
+ RdKafka::Conf::CONF_OK)
+ Test::Fail("Failed to set commit callback: " + errstr);
+ if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) !=
+ RdKafka::Conf::CONF_OK)
+ Test::Fail("Failed to set event callback: " + errstr);
+
+ RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ /* Subscribe */
+ std::vector<std::string> topics;
+ topics.push_back(topic);
+ if ((err = c->subscribe(topics)))
+ Test::Fail("subscribe failed: " + RdKafka::err2str(err));
+
+ /* Wait for messages and commit callback. */
+ Test::Say("Consuming topic " + topic + "\n");
+ int cnt = 0;
+ while (!cbs.seen_commit || !cbs.seen_stats) {
+ RdKafka::Message *msg = c->consume(tmout_multip(1000));
+ if (!msg->err()) {
+ cnt++;
+ Test::Say(tostr() << "Received message #" << cnt << "\n");
+ if (cnt > 10)
+ Test::Fail(tostr() << "Should've seen the "
+ "offset commit ("
+ << cbs.seen_commit
+ << ") and "
+ "stats callbacks ("
+ << cbs.seen_stats << ") by now");
+
+ /* Commit the first message to trigger the offset commit_cb */
+ if (cnt == 1) {
+ err = c->commitAsync(msg);
+ if (err)
+ Test::Fail("commitAsync() failed: " + RdKafka::err2str(err));
+ rd_sleep(1); /* Sleep to simulate slow processing, making sure
+ * that the offset commit callback op gets
+ * inserted on the consume queue in front of
+ * the messages. */
+ }
+
+ } else if (msg->err() == RdKafka::ERR__TIMED_OUT)
+ ; /* Stil rebalancing? */
+ else
+ Test::Fail("consume() failed: " + msg->errstr());
+ delete msg;
+ }
+
+ c->close();
+ delete c;
+}
+
+extern "C" {
+int main_0060_op_prio(int argc, char **argv) {
+ do_test_commit_cb();
+ return 0;
+}
+}