From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/java/.gitignore | 1 + .../tests/java/IncrementalRebalanceCli.java | 97 ++++++++++++ .../lib/librdkafka-2.1.0/tests/java/Makefile | 12 ++ .../librdkafka-2.1.0/tests/java/Murmur2Cli.java | 46 ++++++ .../lib/librdkafka-2.1.0/tests/java/README.md | 14 ++ .../tests/java/TransactionProducerCli.java | 162 +++++++++++++++++++++ .../lib/librdkafka-2.1.0/tests/java/run-class.sh | 11 ++ 7 files changed, 343 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java create mode 100755 src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/java') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore new file mode 100644 index 000000000..5241a7220 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore @@ -0,0 +1 @@ +*.class \ No newline at end of file diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java new file mode 100644 index 000000000..de044ae58 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java @@ -0,0 +1,97 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +import java.io.IOException; +import java.io.PrintWriter; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.common.KafkaException; + +import java.lang.Integer; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.time.Duration; + + +public class IncrementalRebalanceCli { + public static void main (String[] args) throws Exception { + String testName = args[0]; + String brokerList = args[1]; + String topic1 = args[2]; + String topic2 = args[3]; + String group = args[4]; + + if (!testName.equals("test1")) { + throw new Exception("Unknown command: " + testName); + } + + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group); + consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "java_incrreb_consumer"); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); + Consumer consumer = new KafkaConsumer<>(consumerConfig); + + List topics = new ArrayList<>(); + topics.add(topic1); + topics.add(topic2); + consumer.subscribe(topics); + + long startTime = System.currentTimeMillis(); + long timeout_s = 300; + + try { + boolean running = true; + while (running) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + if (System.currentTimeMillis() - startTime > 1000 * timeout_s) { + // Ensure process exits eventually no matter what happens. + System.out.println("IncrementalRebalanceCli timed out"); + running = false; + } + if (consumer.assignment().size() == 6) { + // librdkafka has unsubscribed from topic #2, exit cleanly. + running = false; + } + } + } finally { + consumer.close(); + } + + System.out.println("Java consumer process exiting"); + } +} diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile new file mode 100644 index 000000000..68847075a --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile @@ -0,0 +1,12 @@ + +KAFKA_JARS?=$(KAFKA_PATH)/libs + +CLASSES=Murmur2Cli.class TransactionProducerCli.class IncrementalRebalanceCli.class + +all: $(CLASSES) + +%.class: %.java + javac -classpath $(KAFKA_JARS)/kafka-clients-*.jar $^ + +clean: + rm -f *.class diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java new file mode 100644 index 000000000..22444532d --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java @@ -0,0 +1,46 @@ + +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +import org.apache.kafka.common.utils.Utils; + +public class Murmur2Cli { + public static int toPositive(int number) { + return number & 0x7fffffff; + } + public static void main (String[] args) throws Exception { + for (String key : args) { + System.out.println(String.format("%s\t0x%08x", key, + toPositive(Utils.murmur2(key.getBytes())))); + } + /* If no args, print hash for empty string */ + if (args.length == 0) + System.out.println(String.format("%s\t0x%08x", "", + toPositive(Utils.murmur2("".getBytes())))); + } +} diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md new file mode 100644 index 000000000..a2754c258 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md @@ -0,0 +1,14 @@ +# Misc Java tools + +## Murmur2 CLI + +Build: + + $ KAFKA_JARS=/your/kafka/libs make + +Run: + + $ KAFKA_JARS=/your/kafka/libs ./run-class.sh Murmur2Cli "a sentence" and a word + +If KAFKA_JARS is not set it will default to $KAFKA_PATH/libs + diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java new file mode 100644 index 000000000..f880c1422 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java @@ -0,0 +1,162 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +import java.io.IOException; +import java.io.PrintWriter; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; + +import java.lang.Integer; +import java.util.HashMap; +import java.util.Properties; + + +public class TransactionProducerCli { + + enum TransactionType { + None, + BeginAbort, + BeginCommit, + BeginOpen, + ContinueAbort, + ContinueCommit, + ContinueOpen + } + + enum FlushType { + DoFlush, + DontFlush + } + + static Producer createProducer(String testid, String id, String brokerList, boolean transactional) { + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, transactional ? "transactional-producer-" + id : "producer-" + id); + producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + if (transactional) { + producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-" + testid + "-" + id); + } + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // ensure batching. + Producer producer = new KafkaProducer<>(producerConfig); + if (transactional) { + producer.initTransactions(); + } + return producer; + } + + static void makeTestMessages( + Producer producer, + String topic, int partition, + int idStart, int count, + TransactionType tt, + FlushType flush) throws InterruptedException { + byte[] payload = { 0x10, 0x20, 0x30, 0x40 }; + if (tt != TransactionType.None && + tt != TransactionType.ContinueOpen && + tt != TransactionType.ContinueCommit && + tt != TransactionType.ContinueAbort) { + producer.beginTransaction(); + } + for (int i = 0; i r = partition != -1 + ? new ProducerRecord(topic, partition, new byte[] { (byte)(i + idStart) }, payload) + : new ProducerRecord(topic, new byte[] { (byte)(i + idStart) }, payload); + producer.send(r); + } + if (flush == FlushType.DoFlush) { + producer.flush(); + } + if (tt == TransactionType.BeginAbort || tt == TransactionType.ContinueAbort) { + producer.abortTransaction(); + } else if (tt == TransactionType.BeginCommit || tt == TransactionType.ContinueCommit) { + producer.commitTransaction(); + } + } + + static String[] csvSplit(String input) { + return input.split("\\s*,\\s*"); + } + + public static void main (String[] args) throws Exception { + + String bootstrapServers = args[0]; + + HashMap> producers = new HashMap>(); + + String topic = null; + String testid = null; + + /* Parse commands */ + for (int i = 1 ; i < args.length ; i++) { + String cmd[] = csvSplit(args[i]); + + System.out.println("TransactionProducerCli.java: command: '" + args[i] + "'"); + + if (cmd[0].equals("sleep")) { + Thread.sleep(Integer.decode(cmd[1])); + + } else if (cmd[0].equals("exit")) { + System.exit(Integer.decode(cmd[1])); + + } else if (cmd[0].equals("topic")) { + topic = cmd[1]; + + } else if (cmd[0].equals("testid")) { + testid = cmd[1]; + + } else if (cmd[0].startsWith("producer")) { + Producer producer = producers.get(cmd[0]); + + if (producer == null) { + producer = createProducer(testid, cmd[0], bootstrapServers, + TransactionType.valueOf(cmd[4]) != TransactionType.None); + producers.put(cmd[0], producer); + } + + makeTestMessages(producer, /* producer */ + topic, /* topic */ + Integer.decode(cmd[1]), /* partition, or -1 for any */ + Integer.decode(cmd[2]), /* idStart */ + Integer.decode(cmd[3]), /* msg count */ + TransactionType.valueOf(cmd[4]), /* TransactionType */ + FlushType.valueOf(cmd[5])); /* Flush */ + + } else { + throw new Exception("Unknown command: " + args[i]); + } + } + + producers.forEach((k,p) -> p.close()); + } +} diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh new file mode 100755 index 000000000..e3e52b1cc --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# + +if [[ -z $KAFKA_PATH ]]; then + echo "$0: requires \$KAFKA_PATH to point to the kafka release top directory" + exit 1 +fi + +JAVA_TESTS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" + +CLASSPATH=$JAVA_TESTS_DIR $KAFKA_PATH/bin/kafka-run-class.sh "$@" -- cgit v1.2.3