author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 11:19:16 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 12:07:37 +0000
commit    b485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
tree      ae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp
parent    Adding upstream version 1.44.3. (diff)
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp')
 -rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp | 535
 1 file changed, 535 insertions(+), 0 deletions(-)
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp
new file mode 100644
index 000000000..a61755c30
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0053-stats_cb.cpp
@@ -0,0 +1,535 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2018, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <fstream>
+#include <iterator>
+#include <string>
+#include "testcpp.h"
+
+#if WITH_RAPIDJSON
+#include <rapidjson/document.h>
+#include <rapidjson/schema.h>
+#include <rapidjson/filereadstream.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/prettywriter.h>
+#endif
+
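+/* Note: resolved relative to the directory the tests run from
+ * (assumed to be tests/), i.e. <librdkafka>/src/statistics_schema.json. */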
+static const char *stats_schema_path = "../src/statistics_schema.json";
+
+#if WITH_RAPIDJSON
+/**
+ * @brief Statistics schema validator
+ */
+class TestSchemaValidator {
+ public:
+  /* Initialize pointers so the destructor is safe after default
+   * construction. */
+  TestSchemaValidator() : sd_(NULL), schema_(NULL), validator_(NULL) {
+  }
+ TestSchemaValidator(const std::string schema_path) {
+ /* Read schema from file */
+ schema_path_ = schema_path;
+
+ std::ifstream f(schema_path.c_str());
+ if (!f.is_open())
+ Test::Fail(tostr() << "Failed to open schema " << schema_path << ": "
+ << strerror(errno));
+ std::string schema_str((std::istreambuf_iterator<char>(f)),
+ (std::istreambuf_iterator<char>()));
+
+ /* Parse schema */
+ sd_ = new rapidjson::Document();
+ if (sd_->Parse(schema_str.c_str()).HasParseError())
+ Test::Fail(tostr() << "Failed to parse statistics schema: "
+ << rapidjson::GetParseError_En(sd_->GetParseError())
+ << " at " << sd_->GetErrorOffset());
+
+ schema_ = new rapidjson::SchemaDocument(*sd_);
+ validator_ = new rapidjson::SchemaValidator(*schema_);
+ }
+
+ ~TestSchemaValidator() {
+ if (sd_)
+ delete sd_;
+ if (schema_)
+ delete schema_;
+ if (validator_)
+ delete validator_;
+ }
+
+ void validate(const std::string &json_doc) {
+ /* Parse JSON to validate */
+ rapidjson::Document d;
+ if (d.Parse(json_doc.c_str()).HasParseError())
+ Test::Fail(tostr() << "Failed to parse stats JSON: "
+ << rapidjson::GetParseError_En(d.GetParseError())
+ << " at " << d.GetErrorOffset());
+
+    /* Validate using schema, resetting validator state left over from
+     * any previously validated document */
+    validator_->Reset();
+    if (!d.Accept(*validator_)) {
+ rapidjson::StringBuffer sb;
+
+ validator_->GetInvalidSchemaPointer().StringifyUriFragment(sb);
+ Test::Say(tostr() << "Schema: " << sb.GetString() << "\n");
+ Test::Say(tostr() << "Invalid keyword: "
+ << validator_->GetInvalidSchemaKeyword() << "\n");
+ sb.Clear();
+
+ validator_->GetInvalidDocumentPointer().StringifyUriFragment(sb);
+ Test::Say(tostr() << "Invalid document: " << sb.GetString() << "\n");
+ sb.Clear();
+
+ Test::Fail(tostr() << "JSON validation using schema " << schema_path_
+ << " failed");
+ }
+
+ Test::Say(3, "JSON document validated using schema " + schema_path_ + "\n");
+ }
+
+ private:
+ std::string schema_path_;
+ rapidjson::Document *sd_;
+ rapidjson::SchemaDocument *schema_;
+ rapidjson::SchemaValidator *validator_;
+};
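+
+/* Illustrative sketch (not part of the test flow): standalone use of the
+ * validator above. The JSON string is a hypothetical minimal stats
+ * document; real documents arrive via the EVENT_STATS callback below.
+ *
+ *   TestSchemaValidator v(stats_schema_path);
+ *   v.validate("{\"name\":\"rdkafka#producer-1\",\"type\":\"producer\"}");
+ */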
+
+
+#else
+
+/* Dummy validator doing nothing when RapidJSON is unavailable */
+class TestSchemaValidator {
+ public:
+ TestSchemaValidator() {
+ }
+ TestSchemaValidator(const std::string schema_path) {
+ }
+
+ ~TestSchemaValidator() {
+ }
+
+ void validate(const std::string &json_doc) {
+ }
+};
+
+#endif
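+
+/* Note: with the dummy validator the timing test below still runs;
+ * only schema validation and the end-to-end stats checks need RapidJSON. */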
+
+class myEventCb : public RdKafka::EventCb {
+ public:
+ myEventCb(const std::string schema_path) :
+ validator_(TestSchemaValidator(schema_path)) {
+ stats_cnt = 0;
+ }
+
+ int stats_cnt;
+ std::string last; /**< Last stats document */
+
+ void event_cb(RdKafka::Event &event) {
+ switch (event.type()) {
+ case RdKafka::Event::EVENT_STATS:
+ if (!(stats_cnt % 10))
+ Test::Say(tostr() << "Stats (#" << stats_cnt << "): " << event.str()
+ << "\n");
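+      /* Only count non-trivial stats documents (> 20 bytes) */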
+ if (event.str().length() > 20)
+ stats_cnt += 1;
+ validator_.validate(event.str());
+ last = event.str();
+ break;
+ default:
+ break;
+ }
+ }
+
+ private:
+ TestSchemaValidator validator_;
+};
+
+
+/**
+ * @brief Verify that stats are emitted according to statistics.interval.ms
+ */
+void test_stats_timing() {
+ RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+ myEventCb my_event = myEventCb(stats_schema_path);
+ std::string errstr;
+
+ if (conf->set("statistics.interval.ms", "100", errstr) !=
+ RdKafka::Conf::CONF_OK)
+ Test::Fail(errstr);
+
+ if (conf->set("event_cb", &my_event, errstr) != RdKafka::Conf::CONF_OK)
+ Test::Fail(errstr);
+
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create Producer: " + errstr);
+ delete conf;
+
+ int64_t t_start = test_clock();
+
+ while (my_event.stats_cnt < 12)
+ p->poll(1000);
+
+ int elapsed = (int)((test_clock() - t_start) / 1000);
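+  /* 12 callbacks at statistics.interval.ms=100 should take ~1200 ms */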
+ const int expected_time = 1200;
+
+ Test::Say(tostr() << my_event.stats_cnt
+ << " (expected 12) stats callbacks received in " << elapsed
+ << "ms (expected " << expected_time << "ms +-25%)\n");
+
+ if (elapsed < expected_time * 0.75 || elapsed > expected_time * 1.25) {
+ /* We can't rely on CIs giving our test job enough CPU to finish
+ * in time, so don't error out even if the time is outside the window */
+ if (test_on_ci)
+ Test::Say(tostr() << "WARNING: Elapsed time " << elapsed
+ << "ms outside +-25% window (" << expected_time
+ << "ms), cnt " << my_event.stats_cnt);
+ else
+ Test::Fail(tostr() << "Elapsed time " << elapsed
+ << "ms outside +-25% window (" << expected_time
+ << "ms), cnt " << my_event.stats_cnt);
+ }
+ delete p;
+}
+
+
+
+#if WITH_RAPIDJSON
+
+/**
+ * @brief Expected partition stats
+ */
+struct exp_part_stats {
+ std::string topic; /**< Topic */
+ int32_t part; /**< Partition id */
+ int msgcnt; /**< Expected message count */
+ int msgsize; /**< Expected per message size.
+ * This includes both key and value lengths */
+
+ /* Calculated */
+ int64_t totsize; /**< Message size sum */
+};
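+
+/* For reference: each partition below expects msgcnt/partcnt messages,
+ * so totsize accumulates to (msgcnt/partcnt) * (msgsize + key length)
+ * bytes. */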
+
+/**
+ * @brief Verify end-to-end producer and consumer stats.
+ */
+static void verify_e2e_stats(const std::string &prod_stats,
+ const std::string &cons_stats,
+ struct exp_part_stats *exp_parts,
+ int partcnt) {
+  /**
+   * Parse the JSON stats documents.
+   * They were already schema-validated in the event callback.
+   */
+ rapidjson::Document p;
+ if (p.Parse<rapidjson::kParseValidateEncodingFlag>(prod_stats.c_str())
+ .HasParseError())
+ Test::Fail(tostr() << "Failed to parse producer stats JSON: "
+ << rapidjson::GetParseError_En(p.GetParseError())
+ << " at " << p.GetErrorOffset());
+
+ rapidjson::Document c;
+ if (c.Parse<rapidjson::kParseValidateEncodingFlag>(cons_stats.c_str())
+ .HasParseError())
+ Test::Fail(tostr() << "Failed to parse consumer stats JSON: "
+ << rapidjson::GetParseError_En(c.GetParseError())
+ << " at " << c.GetErrorOffset());
+
+ assert(p.HasMember("name"));
+ assert(c.HasMember("name"));
+ assert(p.HasMember("type"));
+ assert(c.HasMember("type"));
+
+ Test::Say(tostr() << "Verifying stats from Producer " << p["name"].GetString()
+ << " and Consumer " << c["name"].GetString() << "\n");
+
+ assert(!strcmp(p["type"].GetString(), "producer"));
+ assert(!strcmp(c["type"].GetString(), "consumer"));
+
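+  /* Rough sketch (abridged) of the producer stats layout consumed below;
+   * the consumer document uses rx* counters instead:
+   *   { "name": "...", "type": "producer",
+   *     "txmsgs": .., "txmsg_bytes": ..,
+   *     "topics": { "<topic>": { "partitions": {
+   *       "0": { "partition": 0, "txmsgs": .., "txbytes": .. } } } } }
+   */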
+ int64_t exp_tot_txmsgs = 0;
+ int64_t exp_tot_txmsg_bytes = 0;
+ int64_t exp_tot_rxmsgs = 0;
+ int64_t exp_tot_rxmsg_bytes = 0;
+
+ for (int part = 0; part < partcnt; part++) {
+ /*
+ * Find partition stats.
+ */
+
+ /* Construct the partition path. */
+ char path[256];
+ rd_snprintf(path, sizeof(path), "/topics/%s/partitions/%d",
+ exp_parts[part].topic.c_str(), exp_parts[part].part);
+ Test::Say(tostr() << "Looking up partition " << exp_parts[part].part
+ << " with path " << path << "\n");
+
+    /* GetValueByPointer() has a char[N] template overload that uses the
+     * full array size rather than stopping at the NUL terminator, so it
+     * must not be fed a fixed-size buffer like `path`. Instead construct
+     * a Pointer from the C string (the cast selects that overload). */
+ rapidjson::Pointer jpath((const char *)path);
+
+ rapidjson::Value *pp = rapidjson::GetValueByPointer(p, jpath);
+ if (!pp)
+ Test::Fail(tostr() << "Producer: could not find " << path << " in "
+ << prod_stats << "\n");
+
+ rapidjson::Value *cp = rapidjson::GetValueByPointer(c, jpath);
+    if (!cp)
+ Test::Fail(tostr() << "Consumer: could not find " << path << " in "
+ << cons_stats << "\n");
+
+ assert(pp->HasMember("partition"));
+ assert(pp->HasMember("txmsgs"));
+ assert(pp->HasMember("txbytes"));
+
+ assert(cp->HasMember("partition"));
+ assert(cp->HasMember("rxmsgs"));
+ assert(cp->HasMember("rxbytes"));
+
+ Test::Say(tostr() << "partition: " << (*pp)["partition"].GetInt() << "\n");
+
+    /* Use GetInt64(): these counters are 64-bit and may exceed int range */
+    int64_t txmsgs = (*pp)["txmsgs"].GetInt64();
+    int64_t txbytes = (*pp)["txbytes"].GetInt64();
+    int64_t rxmsgs = (*cp)["rxmsgs"].GetInt64();
+    int64_t rxbytes = (*cp)["rxbytes"].GetInt64();
+
+ exp_tot_txmsgs += txmsgs;
+ exp_tot_txmsg_bytes += txbytes;
+ exp_tot_rxmsgs += rxmsgs;
+ exp_tot_rxmsg_bytes += rxbytes;
+
+ Test::Say(tostr() << "Producer partition: " << (*pp)["partition"].GetInt()
+ << ": "
+ << "txmsgs: " << txmsgs << " vs "
+ << exp_parts[part].msgcnt << ", "
+ << "txbytes: " << txbytes << " vs "
+ << exp_parts[part].totsize << "\n");
+ Test::Say(tostr() << "Consumer partition: " << (*cp)["partition"].GetInt()
+ << ": "
+ << "rxmsgs: " << rxmsgs << " vs "
+ << exp_parts[part].msgcnt << ", "
+ << "rxbytes: " << rxbytes << " vs "
+ << exp_parts[part].totsize << "\n");
+ }
+
+ /* Check top-level total stats */
+
+ assert(p.HasMember("txmsgs"));
+ assert(p.HasMember("txmsg_bytes"));
+ assert(p.HasMember("rxmsgs"));
+ assert(p.HasMember("rxmsg_bytes"));
+
+  int64_t tot_txmsgs = p["txmsgs"].GetInt64();
+  int64_t tot_txmsg_bytes = p["txmsg_bytes"].GetInt64();
+  int64_t tot_rxmsgs = c["rxmsgs"].GetInt64();
+  int64_t tot_rxmsg_bytes = c["rxmsg_bytes"].GetInt64();
+
+ Test::Say(tostr() << "Producer total: "
+ << "txmsgs: " << tot_txmsgs << " vs " << exp_tot_txmsgs
+ << ", "
+ << "txbytes: " << tot_txmsg_bytes << " vs "
+ << exp_tot_txmsg_bytes << "\n");
+ Test::Say(tostr() << "Consumer total: "
+ << "rxmsgs: " << tot_rxmsgs << " vs " << exp_tot_rxmsgs
+ << ", "
+ << "rxbytes: " << tot_rxmsg_bytes << " vs "
+ << exp_tot_rxmsg_bytes << "\n");
+}
+
+/**
+ * @brief Verify stats JSON structure and individual metric fields.
+ *
+ * To capture as much verifiable data as possible we run a full
+ * producer/consumer end-to-end test and verify that counters
+ * and states are emitted accordingly.
+ *
+ * Requires RapidJSON (for parsing the stats).
+ */
+static void test_stats() {
+ std::string errstr;
+ RdKafka::Conf *conf;
+ myEventCb producer_event(stats_schema_path);
+ myEventCb consumer_event(stats_schema_path);
+
+ std::string topic = Test::mk_topic_name("0053_stats", 1);
+
+ const int partcnt = 2;
+ int msgcnt = (test_quick ? 10 : 100) * partcnt;
+ const int msgsize = 6 * 1024;
+
+ /*
+ * Common config for producer and consumer
+ */
+ Test::conf_init(&conf, NULL, 60);
+ if (conf->set("statistics.interval.ms", "1000", errstr) !=
+ RdKafka::Conf::CONF_OK)
+ Test::Fail(errstr);
+
+
+ /*
+ * Create Producer
+ */
+ if (conf->set("event_cb", &producer_event, errstr) != RdKafka::Conf::CONF_OK)
+ Test::Fail(errstr);
+
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create Producer: " + errstr);
+
+
+ /*
+ * Create Consumer
+ */
+ conf->set("group.id", topic, errstr);
+ conf->set("auto.offset.reset", "earliest", errstr);
+ conf->set("enable.partition.eof", "false", errstr);
+ if (conf->set("event_cb", &consumer_event, errstr) != RdKafka::Conf::CONF_OK)
+ Test::Fail(errstr);
+
+ RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ /*
+   * Set up the consumer assignment (deferred until after producing,
+   * since the topic does not exist yet) and the expected partition
+   * stats for later verification.
+ */
+ std::vector<RdKafka::TopicPartition *> toppars;
+ struct exp_part_stats exp_parts[partcnt] = {};
+
+ for (int32_t part = 0; part < (int32_t)partcnt; part++) {
+ toppars.push_back(RdKafka::TopicPartition::create(
+ topic, part, RdKafka::Topic::OFFSET_BEGINNING));
+ exp_parts[part].topic = topic;
+ exp_parts[part].part = part;
+ exp_parts[part].msgcnt = msgcnt / partcnt;
+ exp_parts[part].msgsize = msgsize;
+ exp_parts[part].totsize = 0;
+ }
+
+ /*
+ * Produce messages
+ */
+ uint64_t testid = test_id_generate();
+
+ char key[256];
+ char *buf = (char *)malloc(msgsize);
+
+ for (int32_t part = 0; part < (int32_t)partcnt; part++) {
+ for (int i = 0; i < msgcnt / partcnt; i++) {
+ test_prepare_msg(testid, part, i, buf, msgsize, key, sizeof(key));
+ RdKafka::ErrorCode err =
+ p->produce(topic, part, RdKafka::Producer::RK_MSG_COPY, buf, msgsize,
+ key, sizeof(key), -1, NULL);
+ if (err)
+ Test::Fail("Produce failed: " + RdKafka::err2str(err));
+ exp_parts[part].totsize += msgsize + sizeof(key);
+ p->poll(0);
+ }
+ }
+
+ free(buf);
+
+ Test::Say("Waiting for final message delivery\n");
+ /* Wait for delivery */
+ p->flush(15 * 1000);
+
+ /*
+ * Start consuming partitions
+ */
+ c->assign(toppars);
+ RdKafka::TopicPartition::destroy(toppars);
+
+ /*
+ * Consume the messages
+ */
+ int recvcnt = 0;
+ Test::Say(tostr() << "Consuming " << msgcnt << " messages\n");
+ while (recvcnt < msgcnt) {
+ RdKafka::Message *msg = c->consume(-1);
+ if (msg->err())
+ Test::Fail("Consume failed: " + msg->errstr());
+
+ int msgid;
+ TestMessageVerify(testid, -1, &msgid, msg);
+ recvcnt++;
+ delete msg;
+ }
+
+ /*
+ * Producer:
+ * Wait for one last stats emit when all messages have been delivered.
+ */
+ int prev_cnt = producer_event.stats_cnt;
+ while (prev_cnt == producer_event.stats_cnt) {
+ Test::Say("Waiting for final producer stats event\n");
+ p->poll(100);
+ }
+
+ /*
+ * Consumer:
+   * Wait for one last stats emit when all messages have been received.
+   * Since previous stats may have been enqueued but not yet served,
+   * skip the first two.
+ */
+ prev_cnt = consumer_event.stats_cnt;
+ while (prev_cnt + 2 >= consumer_event.stats_cnt) {
+ Test::Say(tostr() << "Waiting for final consumer stats event: "
+ << consumer_event.stats_cnt << "\n");
+ c->poll(100);
+ }
+
+
+ verify_e2e_stats(producer_event.last, consumer_event.last, exp_parts,
+ partcnt);
+
+
+ c->close();
+
+ delete p;
+ delete c;
+}
+#endif
+
+extern "C" {
+int main_0053_stats_timing(int argc, char **argv) {
+ test_stats_timing();
+ return 0;
+}
+
+int main_0053_stats(int argc, char **argv) {
+#if WITH_RAPIDJSON
+ test_stats();
+#else
+ Test::Skip("RapidJSON >=1.1.0 not available\n");
+#endif
+ return 0;
+}
+}