summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/java
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/java')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore1
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java97
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile12
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java46
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md14
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java162
-rwxr-xr-xfluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh11
7 files changed, 0 insertions, 343 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore b/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore
deleted file mode 100644
index 5241a722..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-*.class \ No newline at end of file
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java
deleted file mode 100644
index de044ae5..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/IncrementalRebalanceCli.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
-import org.apache.kafka.common.KafkaException;
-
-import java.lang.Integer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.time.Duration;
-
-
-public class IncrementalRebalanceCli {
- public static void main (String[] args) throws Exception {
- String testName = args[0];
- String brokerList = args[1];
- String topic1 = args[2];
- String topic2 = args[3];
- String group = args[4];
-
- if (!testName.equals("test1")) {
- throw new Exception("Unknown command: " + testName);
- }
-
- Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group);
- consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "java_incrreb_consumer");
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
- Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
-
- List<String> topics = new ArrayList<>();
- topics.add(topic1);
- topics.add(topic2);
- consumer.subscribe(topics);
-
- long startTime = System.currentTimeMillis();
- long timeout_s = 300;
-
- try {
- boolean running = true;
- while (running) {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
- if (System.currentTimeMillis() - startTime > 1000 * timeout_s) {
- // Ensure process exits eventually no matter what happens.
- System.out.println("IncrementalRebalanceCli timed out");
- running = false;
- }
- if (consumer.assignment().size() == 6) {
- // librdkafka has unsubscribed from topic #2, exit cleanly.
- running = false;
- }
- }
- } finally {
- consumer.close();
- }
-
- System.out.println("Java consumer process exiting");
- }
-}
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile b/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile
deleted file mode 100644
index 68847075..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Makefile
+++ /dev/null
@@ -1,12 +0,0 @@
-
-KAFKA_JARS?=$(KAFKA_PATH)/libs
-
-CLASSES=Murmur2Cli.class TransactionProducerCli.class IncrementalRebalanceCli.class
-
-all: $(CLASSES)
-
-%.class: %.java
- javac -classpath $(KAFKA_JARS)/kafka-clients-*.jar $^
-
-clean:
- rm -f *.class
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java
deleted file mode 100644
index 22444532..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/Murmur2Cli.java
+++ /dev/null
@@ -1,46 +0,0 @@
-
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-import org.apache.kafka.common.utils.Utils;
-
-public class Murmur2Cli {
- public static int toPositive(int number) {
- return number & 0x7fffffff;
- }
- public static void main (String[] args) throws Exception {
- for (String key : args) {
- System.out.println(String.format("%s\t0x%08x", key,
- toPositive(Utils.murmur2(key.getBytes()))));
- }
- /* If no args, print hash for empty string */
- if (args.length == 0)
- System.out.println(String.format("%s\t0x%08x", "",
- toPositive(Utils.murmur2("".getBytes()))));
- }
-}
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md b/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md
deleted file mode 100644
index a2754c25..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/README.md
+++ /dev/null
@@ -1,14 +0,0 @@
-# Misc Java tools
-
-## Murmur2 CLI
-
-Build:
-
- $ KAFKA_JARS=/your/kafka/libs make
-
-Run:
-
- $ KAFKA_JARS=/your/kafka/libs ./run-class.sh Murmur2Cli "a sentence" and a word
-
-If KAFKA_JARS is not set it will default to $KAFKA_PATH/libs
-
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java b/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java
deleted file mode 100644
index f880c142..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/TransactionProducerCli.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-
-import java.lang.Integer;
-import java.util.HashMap;
-import java.util.Properties;
-
-
-public class TransactionProducerCli {
-
- enum TransactionType {
- None,
- BeginAbort,
- BeginCommit,
- BeginOpen,
- ContinueAbort,
- ContinueCommit,
- ContinueOpen
- }
-
- enum FlushType {
- DoFlush,
- DontFlush
- }
-
- static Producer<byte[], byte[]> createProducer(String testid, String id, String brokerList, boolean transactional) {
- Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, transactional ? "transactional-producer-" + id : "producer-" + id);
- producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- if (transactional) {
- producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-" + testid + "-" + id);
- }
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // ensure batching.
- Producer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);
- if (transactional) {
- producer.initTransactions();
- }
- return producer;
- }
-
- static void makeTestMessages(
- Producer<byte[], byte[]> producer,
- String topic, int partition,
- int idStart, int count,
- TransactionType tt,
- FlushType flush) throws InterruptedException {
- byte[] payload = { 0x10, 0x20, 0x30, 0x40 };
- if (tt != TransactionType.None &&
- tt != TransactionType.ContinueOpen &&
- tt != TransactionType.ContinueCommit &&
- tt != TransactionType.ContinueAbort) {
- producer.beginTransaction();
- }
- for (int i = 0; i <count; ++i) {
- ProducerRecord<byte[], byte[]> r = partition != -1
- ? new ProducerRecord<byte[],byte[]>(topic, partition, new byte[] { (byte)(i + idStart) }, payload)
- : new ProducerRecord<byte[], byte[]>(topic, new byte[] { (byte)(i + idStart) }, payload);
- producer.send(r);
- }
- if (flush == FlushType.DoFlush) {
- producer.flush();
- }
- if (tt == TransactionType.BeginAbort || tt == TransactionType.ContinueAbort) {
- producer.abortTransaction();
- } else if (tt == TransactionType.BeginCommit || tt == TransactionType.ContinueCommit) {
- producer.commitTransaction();
- }
- }
-
- static String[] csvSplit(String input) {
- return input.split("\\s*,\\s*");
- }
-
- public static void main (String[] args) throws Exception {
-
- String bootstrapServers = args[0];
-
- HashMap<String, Producer<byte[], byte[]>> producers = new HashMap<String, Producer<byte[], byte[]>>();
-
- String topic = null;
- String testid = null;
-
- /* Parse commands */
- for (int i = 1 ; i < args.length ; i++) {
- String cmd[] = csvSplit(args[i]);
-
- System.out.println("TransactionProducerCli.java: command: '" + args[i] + "'");
-
- if (cmd[0].equals("sleep")) {
- Thread.sleep(Integer.decode(cmd[1]));
-
- } else if (cmd[0].equals("exit")) {
- System.exit(Integer.decode(cmd[1]));
-
- } else if (cmd[0].equals("topic")) {
- topic = cmd[1];
-
- } else if (cmd[0].equals("testid")) {
- testid = cmd[1];
-
- } else if (cmd[0].startsWith("producer")) {
- Producer<byte[], byte[]> producer = producers.get(cmd[0]);
-
- if (producer == null) {
- producer = createProducer(testid, cmd[0], bootstrapServers,
- TransactionType.valueOf(cmd[4]) != TransactionType.None);
- producers.put(cmd[0], producer);
- }
-
- makeTestMessages(producer, /* producer */
- topic, /* topic */
- Integer.decode(cmd[1]), /* partition, or -1 for any */
- Integer.decode(cmd[2]), /* idStart */
- Integer.decode(cmd[3]), /* msg count */
- TransactionType.valueOf(cmd[4]), /* TransactionType */
- FlushType.valueOf(cmd[5])); /* Flush */
-
- } else {
- throw new Exception("Unknown command: " + args[i]);
- }
- }
-
- producers.forEach((k,p) -> p.close());
- }
-}
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh b/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh
deleted file mode 100755
index e3e52b1c..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/java/run-class.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/bin/bash
-#
-
-if [[ -z $KAFKA_PATH ]]; then
- echo "$0: requires \$KAFKA_PATH to point to the kafka release top directory"
- exit 1
-fi
-
-JAVA_TESTS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-
-CLASSPATH=$JAVA_TESTS_DIR $KAFKA_PATH/bin/kafka-run-class.sh "$@"