author     Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 11:19:16 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 12:07:37 +0000
commit     b485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
tree       ae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/java
parent     Adding upstream version 1.44.3. (diff)
download   netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
           netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/java')
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore                      1
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java   97
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile                       12
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java                46
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md                      14
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java   162
-rwxr-xr-x  src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh                   11
7 files changed, 343 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore
new file mode 100644
index 000000000..5241a7220
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore
@@ -0,0 +1 @@
+*.class
\ No newline at end of file
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java
new file mode 100644
index 000000000..de044ae58
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java
@@ -0,0 +1,97 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.common.KafkaException;
+
+import java.lang.Integer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.time.Duration;
+
+
+public class IncrementalRebalanceCli {
+    public static void main (String[] args) throws Exception {
+        String testName = args[0];
+        String brokerList = args[1];
+        String topic1 = args[2];
+        String topic2 = args[3];
+        String group = args[4];
+
+        if (!testName.equals("test1")) {
+            throw new Exception("Unknown command: " + testName);
+        }
+
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group);
+        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "java_incrreb_consumer");
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
+
+        List<String> topics = new ArrayList<>();
+        topics.add(topic1);
+        topics.add(topic2);
+        consumer.subscribe(topics);
+
+        long startTime = System.currentTimeMillis();
+        long timeout_s = 300;
+
+        try {
+            boolean running = true;
+            while (running) {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
+                if (System.currentTimeMillis() - startTime > 1000 * timeout_s) {
+                    // Ensure process exits eventually no matter what happens.
+                    System.out.println("IncrementalRebalanceCli timed out");
+                    running = false;
+                }
+                if (consumer.assignment().size() == 6) {
+                    // librdkafka has unsubscribed from topic #2, exit cleanly.
+                    running = false;
+                }
+            }
+        } finally {
+            consumer.close();
+        }
+
+        System.out.println("Java consumer process exiting");
+    }
+}
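IncrementalRebalanceCli expects five positional arguments: the test name (only "test1" is accepted), the broker list, two topic names and a consumer group. A minimal invocation via run-class.sh could look like the following sketch; the broker address, topic names and group are placeholders, not values used by the test suite:

    $ KAFKA_PATH=/your/kafka ./run-class.sh IncrementalRebalanceCli \
          test1 localhost:9092 rdkafka_topic_1 rdkafka_topic_2 incr_reb_group

The consumer subscribes to both topics with the CooperativeStickyAssignor and exits once its assignment reaches six partitions (the point at which the librdkafka consumer in the test has unsubscribed from the second topic), or after the 300 second safety timeout.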
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile
new file mode 100644
index 000000000..68847075a
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile
@@ -0,0 +1,12 @@
+
+KAFKA_JARS?=$(KAFKA_PATH)/libs
+
+CLASSES=Murmur2Cli.class TransactionProducerCli.class IncrementalRebalanceCli.class
+
+all: $(CLASSES)
+
+%.class: %.java
+	javac -classpath $(KAFKA_JARS)/kafka-clients-*.jar $^
+
+clean:
+	rm -f *.class
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java
new file mode 100644
index 000000000..22444532d
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java
@@ -0,0 +1,46 @@
+
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import org.apache.kafka.common.utils.Utils;
+
+public class Murmur2Cli {
+    public static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+    public static void main (String[] args) throws Exception {
+        for (String key : args) {
+            System.out.println(String.format("%s\t0x%08x", key,
+                                             toPositive(Utils.murmur2(key.getBytes()))));
+        }
+        /* If no args, print hash for empty string */
+        if (args.length == 0)
+            System.out.println(String.format("%s\t0x%08x", "",
+                                             toPositive(Utils.murmur2("".getBytes()))));
+    }
+}
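Murmur2Cli prints toPositive(murmur2(key)) for each key, which is the value Kafka's Java default partitioner reduces modulo the partition count for keyed messages (librdkafka's murmur2_random partitioner is documented as compatible with it). A minimal sketch of that mapping, assuming a hypothetical class name and an illustrative partition count of 4:

    import org.apache.kafka.common.utils.Utils;

    public class Murmur2PartitionSketch {
        // Same sign-bit clearing as Murmur2Cli.toPositive().
        static int toPositive(int number) {
            return number & 0x7fffffff;
        }

        public static void main(String[] args) {
            String key = args.length > 0 ? args[0] : "";
            int numPartitions = 4; // illustrative; a real topic's partition count would be used
            int hash = toPositive(Utils.murmur2(key.getBytes()));
            System.out.println(String.format("%s\t0x%08x -> partition %d",
                                             key, hash, hash % numPartitions));
        }
    }

It builds and runs the same way as the other classes: javac against kafka-clients, then run-class.sh.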
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md
new file mode 100644
index 000000000..a2754c258
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md
@@ -0,0 +1,14 @@
+# Misc Java tools
+
+## Murmur2 CLI
+
+Build:
+
+    $ KAFKA_JARS=/your/kafka/libs make
+
+Run:
+
+    $ KAFKA_JARS=/your/kafka/libs ./run-class.sh Murmur2Cli "a sentence" and a word
+
+If KAFKA_JARS is not set, it defaults to $KAFKA_PATH/libs.
+
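Note that run-class.sh itself checks KAFKA_PATH rather than KAFKA_JARS, since it delegates to $KAFKA_PATH/bin/kafka-run-class.sh, so a run normally sets at least KAFKA_PATH (the path below is a placeholder):

    $ KAFKA_PATH=/your/kafka ./run-class.sh Murmur2Cli "a sentence" and a word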
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java
new file mode 100644
index 000000000..f880c1422
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java
@@ -0,0 +1,162 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+
+import java.lang.Integer;
+import java.util.HashMap;
+import java.util.Properties;
+
+
+public class TransactionProducerCli {
+
+    enum TransactionType {
+        None,
+        BeginAbort,
+        BeginCommit,
+        BeginOpen,
+        ContinueAbort,
+        ContinueCommit,
+        ContinueOpen
+    }
+
+    enum FlushType {
+        DoFlush,
+        DontFlush
+    }
+
+    static Producer<byte[], byte[]> createProducer(String testid, String id, String brokerList, boolean transactional) {
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, transactional ? "transactional-producer-" + id : "producer-" + id);
+        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        if (transactional) {
+            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-" + testid + "-" + id);
+        }
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // ensure batching.
+        Producer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);
+        if (transactional) {
+            producer.initTransactions();
+        }
+        return producer;
+    }
+
+    static void makeTestMessages(
+            Producer<byte[], byte[]> producer,
+            String topic, int partition,
+            int idStart, int count,
+            TransactionType tt,
+            FlushType flush) throws InterruptedException {
+        byte[] payload = { 0x10, 0x20, 0x30, 0x40 };
+        if (tt != TransactionType.None &&
+            tt != TransactionType.ContinueOpen &&
+            tt != TransactionType.ContinueCommit &&
+            tt != TransactionType.ContinueAbort) {
+            producer.beginTransaction();
+        }
+        for (int i = 0; i < count; ++i) {
+            ProducerRecord<byte[], byte[]> r = partition != -1
+                ? new ProducerRecord<byte[],byte[]>(topic, partition, new byte[] { (byte)(i + idStart) }, payload)
+                : new ProducerRecord<byte[], byte[]>(topic, new byte[] { (byte)(i + idStart) }, payload);
+            producer.send(r);
+        }
+        if (flush == FlushType.DoFlush) {
+            producer.flush();
+        }
+        if (tt == TransactionType.BeginAbort || tt == TransactionType.ContinueAbort) {
+            producer.abortTransaction();
+        } else if (tt == TransactionType.BeginCommit || tt == TransactionType.ContinueCommit) {
+            producer.commitTransaction();
+        }
+    }
+
+    static String[] csvSplit(String input) {
+        return input.split("\\s*,\\s*");
+    }
+
+    public static void main (String[] args) throws Exception {
+
+        String bootstrapServers = args[0];
+
+        HashMap<String, Producer<byte[], byte[]>> producers = new HashMap<String, Producer<byte[], byte[]>>();
+
+        String topic = null;
+        String testid = null;
+
+        /* Parse commands */
+        for (int i = 1 ; i < args.length ; i++) {
+            String cmd[] = csvSplit(args[i]);
+
+            System.out.println("TransactionProducerCli.java: command: '" + args[i] + "'");
+
+            if (cmd[0].equals("sleep")) {
+                Thread.sleep(Integer.decode(cmd[1]));
+
+            } else if (cmd[0].equals("exit")) {
+                System.exit(Integer.decode(cmd[1]));
+
+            } else if (cmd[0].equals("topic")) {
+                topic = cmd[1];
+
+            } else if (cmd[0].equals("testid")) {
+                testid = cmd[1];
+
+            } else if (cmd[0].startsWith("producer")) {
+                Producer<byte[], byte[]> producer = producers.get(cmd[0]);
+
+                if (producer == null) {
+                    producer = createProducer(testid, cmd[0], bootstrapServers,
+                                              TransactionType.valueOf(cmd[4]) != TransactionType.None);
+                    producers.put(cmd[0], producer);
+                }
+
+                makeTestMessages(producer,                        /* producer */
+                                 topic,                           /* topic */
+                                 Integer.decode(cmd[1]),          /* partition, or -1 for any */
+                                 Integer.decode(cmd[2]),          /* idStart */
+                                 Integer.decode(cmd[3]),          /* msg count */
+                                 TransactionType.valueOf(cmd[4]), /* TransactionType */
+                                 FlushType.valueOf(cmd[5]));      /* Flush */
+
+            } else {
+                throw new Exception("Unknown command: " + args[i]);
+            }
+        }
+
+        producers.forEach((k,p) -> p.close());
+    }
+}
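TransactionProducerCli takes the bootstrap servers followed by comma-separated commands: topic,<name>, testid,<id>, sleep,<ms>, exit,<code>, and producer<N>,<partition>,<idStart>,<count>,<TransactionType>,<FlushType>. A sketch of an invocation via run-class.sh; the broker, topic and test id are placeholders:

    $ KAFKA_PATH=/your/kafka ./run-class.sh TransactionProducerCli localhost:9092 \
          testid,0001 topic,mytopic producer1,-1,0,5,BeginCommit,DoFlush

This produces five keyed messages in a single committed transaction; BeginAbort aborts instead, the Continue* types reuse the named producer's already-open transaction, and partition -1 leaves partition selection to the producer's default partitioner.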
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh
new file mode 100755
index 000000000..e3e52b1cc
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+#
+
+if [[ -z $KAFKA_PATH ]]; then
+ echo "$0: requires \$KAFKA_PATH to point to the kafka release top directory"
+ exit 1
+fi
+
+JAVA_TESTS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+CLASSPATH=$JAVA_TESTS_DIR $KAFKA_PATH/bin/kafka-run-class.sh "$@"
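run-class.sh prepends this directory to CLASSPATH and hands the class name and arguments to Kafka's own launcher, which adds the jars shipped under $KAFKA_PATH/libs. Assuming only kafka-clients is needed, it is roughly equivalent to the following manual invocation (a sketch; kafka-run-class.sh also sets JVM and logging options, and the tests/java path is a placeholder):

    $ java -cp "/path/to/librdkafka/tests/java:$KAFKA_PATH/libs/*" Murmur2Cli "a sentence" and a word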