1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2020, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.KafkaException;
import java.lang.Integer;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.time.Duration;
public class IncrementalRebalanceCli {
public static void main (String[] args) throws Exception {
String testName = args[0];
String brokerList = args[1];
String topic1 = args[2];
String topic2 = args[3];
String group = args[4];
if (!testName.equals("test1")) {
throw new Exception("Unknown command: " + testName);
}
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "java_incrreb_consumer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
List<String> topics = new ArrayList<>();
topics.add(topic1);
topics.add(topic2);
consumer.subscribe(topics);
long startTime = System.currentTimeMillis();
long timeout_s = 300;
try {
boolean running = true;
while (running) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
if (System.currentTimeMillis() - startTime > 1000 * timeout_s) {
// Ensure process exits eventually no matter what happens.
System.out.println("IncrementalRebalanceCli timed out");
running = false;
}
if (consumer.assignment().size() == 6) {
// librdkafka has unsubscribed from topic #2, exit cleanly.
running = false;
}
}
} finally {
consumer.close();
}
System.out.println("Java consumer process exiting");
}
}
|