summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp
new file mode 100644
index 000000000..8ef88e7df
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0114-sticky_partitioning.cpp
@@ -0,0 +1,176 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Test sticky.partitioning.linger.ms producer property.
+ *
+ */
+
+#include <iostream>
+#include <fstream>
+#include <iterator>
+#include <string>
+#include "testcpp.h"
+#include "test.h"
+
+/**
+ * @brief Specify sticky.partitioning.linger.ms and check consumed
+ * messages to verify it takes effect.
+ */
+static void do_test_sticky_partitioning(int sticky_delay) {
+ std::string topic = Test::mk_topic_name(__FILE__, 1);
+ Test::create_topic(NULL, topic.c_str(), 3, 1);
+
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 0);
+
+ Test::conf_set(conf, "sticky.partitioning.linger.ms",
+ tostr() << sticky_delay);
+
+ std::string errstr;
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create Producer: " + errstr);
+
+ RdKafka::Consumer *c = RdKafka::Consumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create Consumer: " + errstr);
+ delete conf;
+
+ RdKafka::Topic *t = RdKafka::Topic::create(c, topic, NULL, errstr);
+ if (!t)
+ Test::Fail("Failed to create Topic: " + errstr);
+
+ c->start(t, 0, RdKafka::Topic::OFFSET_BEGINNING);
+ c->start(t, 1, RdKafka::Topic::OFFSET_BEGINNING);
+ c->start(t, 2, RdKafka::Topic::OFFSET_BEGINNING);
+
+ const int msgrate = 100;
+ const int msgsize = 10;
+
+ /* Produce messages */
+ char val[msgsize];
+ memset(val, 'a', msgsize);
+
+ /* produce for for seconds at 100 msgs/sec */
+ for (int s = 0; s < 4; s++) {
+ int64_t end_wait = test_clock() + (1 * 1000000);
+
+ for (int i = 0; i < msgrate; i++) {
+ RdKafka::ErrorCode err = p->produce(topic, RdKafka::Topic::PARTITION_UA,
+ RdKafka::Producer::RK_MSG_COPY, val,
+ msgsize, NULL, 0, -1, NULL);
+ if (err)
+ Test::Fail("Produce failed: " + RdKafka::err2str(err));
+ }
+
+ while (test_clock() < end_wait)
+ p->poll(100);
+ }
+
+ Test::Say(tostr() << "Produced " << 4 * msgrate << " messages\n");
+ p->flush(5 * 1000);
+
+ /* Consume messages */
+ int partition_msgcnt[3] = {0, 0, 0};
+ int num_partitions_active = 0;
+ int i = 0;
+
+ int64_t end_wait = test_clock() + (5 * 1000000);
+ while (test_clock() < end_wait) {
+ RdKafka::Message *msg = c->consume(t, i, 5);
+
+ switch (msg->err()) {
+ case RdKafka::ERR__TIMED_OUT:
+ i++;
+ if (i > 2)
+ i = 0;
+ break;
+
+ case RdKafka::ERR_NO_ERROR:
+ partition_msgcnt[msg->partition()]++;
+ break;
+
+ default:
+ Test::Fail("Consume error: " + msg->errstr());
+ break;
+ }
+
+ delete msg;
+ }
+
+ c->stop(t, 0);
+ c->stop(t, 1);
+ c->stop(t, 2);
+
+ for (int i = 0; i < 3; i++) {
+ /* Partitions must receive 100+ messages to be deemed 'active'. This
+ * is because while topics are being updated, it is possible for some
+ * number of messages to be partitioned to joining partitions before
+ * they become available. This can cause some initial turnover in
+ * selecting a sticky partition. This behavior is acceptable, and is
+ * not important for the purpose of this segment of the test. */
+
+ if (partition_msgcnt[i] > (msgrate - 1))
+ num_partitions_active++;
+ }
+
+ Test::Say("Partition Message Count: \n");
+ for (int i = 0; i < 3; i++) {
+ Test::Say(tostr() << " " << i << ": " << partition_msgcnt[i] << "\n");
+ }
+
+ /* When sticky.partitioning.linger.ms is long (greater than expected
+ * length of run), one partition should be sticky and receive messages. */
+ if (sticky_delay == 5000 && num_partitions_active > 1)
+ Test::Fail(tostr() << "Expected only 1 partition to receive msgs"
+ << " but " << num_partitions_active
+ << " partitions received msgs.");
+
+ /* When sticky.partitioning.linger.ms is short (sufficiently smaller than
+ * length of run), it is extremely likely that all partitions are sticky
+ * at least once and receive messages. */
+ if (sticky_delay == 1000 && num_partitions_active <= 1)
+ Test::Fail(tostr() << "Expected more than one partition to receive msgs"
+ << " but only " << num_partitions_active
+ << " partition received msgs.");
+
+ delete t;
+ delete p;
+ delete c;
+}
+
+extern "C" {
+int main_0114_sticky_partitioning(int argc, char **argv) {
+ /* long delay (5 secs) */
+ do_test_sticky_partitioning(5000);
+ /* short delay (0.001 secs) */
+ do_test_sticky_partitioning(1);
+ return 0;
+}
+}