diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/java')
7 files changed, 0 insertions, 343 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore b/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore deleted file mode 100644 index 5241a722..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.class
\ No newline at end of file diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java deleted file mode 100644 index de044ae5..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2020, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -import java.io.IOException; -import java.io.PrintWriter; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; -import org.apache.kafka.common.KafkaException; - -import java.lang.Integer; -import java.util.HashMap; -import java.util.List; -import java.util.ArrayList; -import java.util.Properties; -import java.time.Duration; - - -public class IncrementalRebalanceCli { - public static void main (String[] args) throws Exception { - String testName = args[0]; - String brokerList = args[1]; - String topic1 = args[2]; - String topic2 = args[3]; - String group = args[4]; - - if (!testName.equals("test1")) { - throw new Exception("Unknown command: " + testName); - } - - Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group); - consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "java_incrreb_consumer"); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); - Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig); - - List<String> topics = new ArrayList<>(); - topics.add(topic1); - topics.add(topic2); - consumer.subscribe(topics); - - long startTime = System.currentTimeMillis(); - long timeout_s = 300; - - try { - boolean running = true; - while (running) { - ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000)); - if (System.currentTimeMillis() - startTime > 1000 * timeout_s) { - // Ensure process exits eventually no matter what happens. - System.out.println("IncrementalRebalanceCli timed out"); - running = false; - } - if (consumer.assignment().size() == 6) { - // librdkafka has unsubscribed from topic #2, exit cleanly. - running = false; - } - } - } finally { - consumer.close(); - } - - System.out.println("Java consumer process exiting"); - } -} diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile b/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile deleted file mode 100644 index 68847075..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile +++ /dev/null @@ -1,12 +0,0 @@ - -KAFKA_JARS?=$(KAFKA_PATH)/libs - -CLASSES=Murmur2Cli.class TransactionProducerCli.class IncrementalRebalanceCli.class - -all: $(CLASSES) - -%.class: %.java - javac -classpath $(KAFKA_JARS)/kafka-clients-*.jar $^ - -clean: - rm -f *.class diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java deleted file mode 100644 index 22444532..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java +++ /dev/null @@ -1,46 +0,0 @@ - -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2020, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -import org.apache.kafka.common.utils.Utils; - -public class Murmur2Cli { - public static int toPositive(int number) { - return number & 0x7fffffff; - } - public static void main (String[] args) throws Exception { - for (String key : args) { - System.out.println(String.format("%s\t0x%08x", key, - toPositive(Utils.murmur2(key.getBytes())))); - } - /* If no args, print hash for empty string */ - if (args.length == 0) - System.out.println(String.format("%s\t0x%08x", "", - toPositive(Utils.murmur2("".getBytes())))); - } -} diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md b/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md deleted file mode 100644 index a2754c25..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# Misc Java tools - -## Murmur2 CLI - -Build: - - $ KAFKA_JARS=/your/kafka/libs make - -Run: - - $ KAFKA_JARS=/your/kafka/libs ./run-class.sh Murmur2Cli "a sentence" and a word - -If KAFKA_JARS is not set it will default to $KAFKA_PATH/libs - diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java deleted file mode 100644 index f880c142..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2020, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -import java.io.IOException; -import java.io.PrintWriter; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; - -import java.lang.Integer; -import java.util.HashMap; -import java.util.Properties; - - -public class TransactionProducerCli { - - enum TransactionType { - None, - BeginAbort, - BeginCommit, - BeginOpen, - ContinueAbort, - ContinueCommit, - ContinueOpen - } - - enum FlushType { - DoFlush, - DontFlush - } - - static Producer<byte[], byte[]> createProducer(String testid, String id, String brokerList, boolean transactional) { - Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, transactional ? "transactional-producer-" + id : "producer-" + id); - producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - if (transactional) { - producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-" + testid + "-" + id); - } - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // ensure batching. - Producer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig); - if (transactional) { - producer.initTransactions(); - } - return producer; - } - - static void makeTestMessages( - Producer<byte[], byte[]> producer, - String topic, int partition, - int idStart, int count, - TransactionType tt, - FlushType flush) throws InterruptedException { - byte[] payload = { 0x10, 0x20, 0x30, 0x40 }; - if (tt != TransactionType.None && - tt != TransactionType.ContinueOpen && - tt != TransactionType.ContinueCommit && - tt != TransactionType.ContinueAbort) { - producer.beginTransaction(); - } - for (int i = 0; i <count; ++i) { - ProducerRecord<byte[], byte[]> r = partition != -1 - ? new ProducerRecord<byte[],byte[]>(topic, partition, new byte[] { (byte)(i + idStart) }, payload) - : new ProducerRecord<byte[], byte[]>(topic, new byte[] { (byte)(i + idStart) }, payload); - producer.send(r); - } - if (flush == FlushType.DoFlush) { - producer.flush(); - } - if (tt == TransactionType.BeginAbort || tt == TransactionType.ContinueAbort) { - producer.abortTransaction(); - } else if (tt == TransactionType.BeginCommit || tt == TransactionType.ContinueCommit) { - producer.commitTransaction(); - } - } - - static String[] csvSplit(String input) { - return input.split("\\s*,\\s*"); - } - - public static void main (String[] args) throws Exception { - - String bootstrapServers = args[0]; - - HashMap<String, Producer<byte[], byte[]>> producers = new HashMap<String, Producer<byte[], byte[]>>(); - - String topic = null; - String testid = null; - - /* Parse commands */ - for (int i = 1 ; i < args.length ; i++) { - String cmd[] = csvSplit(args[i]); - - System.out.println("TransactionProducerCli.java: command: '" + args[i] + "'"); - - if (cmd[0].equals("sleep")) { - Thread.sleep(Integer.decode(cmd[1])); - - } else if (cmd[0].equals("exit")) { - System.exit(Integer.decode(cmd[1])); - - } else if (cmd[0].equals("topic")) { - topic = cmd[1]; - - } else if (cmd[0].equals("testid")) { - testid = cmd[1]; - - } else if (cmd[0].startsWith("producer")) { - Producer<byte[], byte[]> producer = producers.get(cmd[0]); - - if (producer == null) { - producer = createProducer(testid, cmd[0], bootstrapServers, - TransactionType.valueOf(cmd[4]) != TransactionType.None); - producers.put(cmd[0], producer); - } - - makeTestMessages(producer, /* producer */ - topic, /* topic */ - Integer.decode(cmd[1]), /* partition, or -1 for any */ - Integer.decode(cmd[2]), /* idStart */ - Integer.decode(cmd[3]), /* msg count */ - TransactionType.valueOf(cmd[4]), /* TransactionType */ - FlushType.valueOf(cmd[5])); /* Flush */ - - } else { - throw new Exception("Unknown command: " + args[i]); - } - } - - producers.forEach((k,p) -> p.close()); - } -} diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh b/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh deleted file mode 100755 index e3e52b1c..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -# - -if [[ -z $KAFKA_PATH ]]; then - echo "$0: requires \$KAFKA_PATH to point to the kafka release top directory" - exit 1 -fi - -JAVA_TESTS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - -CLASSPATH=$JAVA_TESTS_DIR $KAFKA_PATH/bin/kafka-run-class.sh "$@" |