author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-05 11:19:16 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-05 12:07:37 +0000
commit    b485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
tree      ae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
parent    Adding upstream version 1.44.3. (diff)
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp')
-rw-r--r-- src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp | 3170
1 file changed, 3170 insertions(+), 0 deletions(-)
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
new file mode 100644
index 000000000..430798d7f
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
@@ -0,0 +1,3170 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+extern "C" {
+#include "../src/rdkafka_protocol.h"
+#include "test.h"
+}
+#include <iostream>
+#include <map>
+#include <set>
+#include <algorithm>
+#include <cstring>
+#include <cstdlib>
+#include <assert.h>
+#include "testcpp.h"
+#include <fstream>
+
+using namespace std;
+
+/** Topic+Partition helper class */
+class Toppar {
+ public:
+ Toppar(const string &topic, int32_t partition) :
+ topic(topic), partition(partition) {
+ }
+
+ Toppar(const RdKafka::TopicPartition *tp) :
+ topic(tp->topic()), partition(tp->partition()) {
+ }
+
+ friend bool operator==(const Toppar &a, const Toppar &b) {
+ return a.partition == b.partition && a.topic == b.topic;
+ }
+
+  friend bool operator<(const Toppar &a, const Toppar &b) {
+    /* Order by topic first, then partition: a strict weak ordering,
+     * consistent with operator==, as required for std::map keys. */
+    if (a.topic < b.topic)
+      return true;
+    if (a.topic > b.topic)
+      return false;
+    return a.partition < b.partition;
+  }
+
+ string str() const {
+ return tostr() << topic << "[" << partition << "]";
+ }
+
+ std::string topic;
+ int32_t partition;
+};
+
+
+
+static std::string get_bootstrap_servers() {
+ RdKafka::Conf *conf;
+ std::string bootstrap_servers;
+ Test::conf_init(&conf, NULL, 0);
+ conf->get("bootstrap.servers", bootstrap_servers);
+ delete conf;
+ return bootstrap_servers;
+}
+
+
+class DrCb : public RdKafka::DeliveryReportCb {
+ public:
+ void dr_cb(RdKafka::Message &msg) {
+ if (msg.err())
+ Test::Fail("Delivery failed: " + RdKafka::err2str(msg.err()));
+ }
+};
+
+
+/**
+ * @brief Produce messages to partitions.
+ *
+ * Each pair is (Toppar, msg_cnt_per_partition), where the Toppar's
+ * partition field is (re)used as the partition count for the topic.
+ */
+static void produce_msgs(vector<pair<Toppar, int> > partitions) {
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 0);
+
+ string errstr;
+ DrCb dr;
+ conf->set("dr_cb", &dr, errstr);
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create producer: " + errstr);
+ delete conf;
+
+ for (vector<pair<Toppar, int> >::iterator it = partitions.begin();
+ it != partitions.end(); it++) {
+ for (int part = 0; part < it->first.partition; part++) {
+ for (int i = 0; i < it->second; i++) {
+ RdKafka::ErrorCode err =
+ p->produce(it->first.topic, part, RdKafka::Producer::RK_MSG_COPY,
+ (void *)"Hello there", 11, NULL, 0, 0, NULL);
+ TEST_ASSERT(!err, "produce(%s, %d) failed: %s", it->first.topic.c_str(),
+ part, RdKafka::err2str(err).c_str());
+
+ p->poll(0);
+ }
+ }
+ }
+
+ p->flush(10000);
+
+ delete p;
+}
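+
+/* Usage sketch (illustrative only, not called by the tests): produce 100
+ * messages to each of the two partitions of a topic using the
+ * (Toppar(topic, partition_cnt), msg_cnt_per_partition) convention
+ * described above. The topic name is a placeholder. */
+static void example_produce_msgs_usage() {
+  vector<pair<Toppar, int> > partitions;
+  /* The Toppar's partition field carries the partition count (2) here. */
+  partitions.push_back(make_pair(Toppar("example_topic", 2), 100));
+  produce_msgs(partitions);
+}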
+
+
+
+static RdKafka::KafkaConsumer *make_consumer(
+ string client_id,
+ string group_id,
+ string assignment_strategy,
+ vector<pair<string, string> > *additional_conf,
+ RdKafka::RebalanceCb *rebalance_cb,
+ int timeout_s) {
+ std::string bootstraps;
+ std::string errstr;
+ std::vector<std::pair<std::string, std::string> >::iterator itr;
+
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, timeout_s);
+ Test::conf_set(conf, "client.id", client_id);
+ Test::conf_set(conf, "group.id", group_id);
+ Test::conf_set(conf, "auto.offset.reset", "earliest");
+ Test::conf_set(conf, "enable.auto.commit", "false");
+ Test::conf_set(conf, "partition.assignment.strategy", assignment_strategy);
+ if (additional_conf != NULL) {
+ for (itr = (*additional_conf).begin(); itr != (*additional_conf).end();
+ itr++)
+ Test::conf_set(conf, itr->first, itr->second);
+ }
+
+ if (rebalance_cb) {
+ if (conf->set("rebalance_cb", rebalance_cb, errstr))
+ Test::Fail("Failed to set rebalance_cb: " + errstr);
+ }
+ RdKafka::KafkaConsumer *consumer =
+ RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!consumer)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ return consumer;
+}
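+
+/* Usage sketch (illustrative only, not called by the tests): create a
+ * cooperative consumer with one additional configuration property. The
+ * client id and group name below are placeholders. */
+static RdKafka::KafkaConsumer *example_make_consumer() {
+  std::vector<std::pair<std::string, std::string> > additional_conf;
+  additional_conf.push_back(std::pair<std::string, std::string>(
+      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+  return make_consumer("C_EX", "example_group", "cooperative-sticky",
+                       &additional_conf, NULL /*no rebalance_cb*/,
+                       15 /*timeout_s*/);
+}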
+
+/**
+ * @returns a CSV string of the vector
+ */
+static string string_vec_to_str(const vector<string> &v) {
+ ostringstream ss;
+ for (vector<string>::const_iterator it = v.begin(); it != v.end(); it++)
+ ss << (it == v.begin() ? "" : ", ") << *it;
+ return ss.str();
+}
+
+void expect_assignment(RdKafka::KafkaConsumer *consumer, size_t count) {
+ std::vector<RdKafka::TopicPartition *> partitions;
+ RdKafka::ErrorCode err;
+ err = consumer->assignment(partitions);
+ if (err)
+ Test::Fail(consumer->name() +
+ " assignment() failed: " + RdKafka::err2str(err));
+ if (partitions.size() != count)
+ Test::Fail(tostr() << "Expecting consumer " << consumer->name()
+ << " to have " << count
+ << " assigned partition(s), not: " << partitions.size());
+ RdKafka::TopicPartition::destroy(partitions);
+}
+
+
+static bool TopicPartition_cmp(const RdKafka::TopicPartition *a,
+ const RdKafka::TopicPartition *b) {
+ if (a->topic() < b->topic())
+ return true;
+ else if (a->topic() > b->topic())
+ return false;
+ return a->partition() < b->partition();
+}
+
+
+void expect_assignment(RdKafka::KafkaConsumer *consumer,
+ vector<RdKafka::TopicPartition *> &expected) {
+ vector<RdKafka::TopicPartition *> partitions;
+ RdKafka::ErrorCode err;
+ err = consumer->assignment(partitions);
+ if (err)
+ Test::Fail(consumer->name() +
+ " assignment() failed: " + RdKafka::err2str(err));
+
+ if (partitions.size() != expected.size())
+ Test::Fail(tostr() << "Expecting consumer " << consumer->name()
+ << " to have " << expected.size()
+ << " assigned partition(s), not " << partitions.size());
+
+ sort(partitions.begin(), partitions.end(), TopicPartition_cmp);
+ sort(expected.begin(), expected.end(), TopicPartition_cmp);
+
+ int fails = 0;
+ for (int i = 0; i < (int)partitions.size(); i++) {
+    /* Partitions are equal if neither sorts before the other. */
+    if (!TopicPartition_cmp(partitions[i], expected[i]) &&
+        !TopicPartition_cmp(expected[i], partitions[i]))
+      continue;
+
+ Test::Say(tostr() << _C_RED << consumer->name() << ": expected assignment #"
+ << i << " " << expected[i]->topic() << " ["
+ << expected[i]->partition() << "], not "
+ << partitions[i]->topic() << " ["
+ << partitions[i]->partition() << "]\n");
+ fails++;
+ }
+
+ if (fails)
+ Test::Fail(consumer->name() + ": Expected assignment mismatch, see above");
+
+ RdKafka::TopicPartition::destroy(partitions);
+}
+
+
+class DefaultRebalanceCb : public RdKafka::RebalanceCb {
+ private:
+ static string part_list_print(
+ const vector<RdKafka::TopicPartition *> &partitions) {
+ ostringstream ss;
+ for (unsigned int i = 0; i < partitions.size(); i++)
+ ss << (i == 0 ? "" : ", ") << partitions[i]->topic() << " ["
+ << partitions[i]->partition() << "]";
+ return ss.str();
+ }
+
+ public:
+ int assign_call_cnt;
+ int revoke_call_cnt;
+ int nonempty_assign_call_cnt; /**< ASSIGN_PARTITIONS with partitions */
+ int lost_call_cnt;
+ int partitions_assigned_net;
+ bool wait_rebalance;
+ int64_t ts_last_assign; /**< Timestamp of last rebalance assignment */
+ map<Toppar, int> msg_cnt; /**< Number of consumed messages per partition. */
+
+ ~DefaultRebalanceCb() {
+ reset_msg_cnt();
+ }
+
+ DefaultRebalanceCb() :
+ assign_call_cnt(0),
+ revoke_call_cnt(0),
+ nonempty_assign_call_cnt(0),
+ lost_call_cnt(0),
+ partitions_assigned_net(0),
+ wait_rebalance(false),
+ ts_last_assign(0) {
+ }
+
+
+ void rebalance_cb(RdKafka::KafkaConsumer *consumer,
+ RdKafka::ErrorCode err,
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ wait_rebalance = false;
+
+ std::string protocol = consumer->rebalance_protocol();
+
+ TEST_ASSERT(protocol == "COOPERATIVE",
+ "%s: Expected rebalance_protocol \"COOPERATIVE\", not %s",
+ consumer->name().c_str(), protocol.c_str());
+
+ const char *lost_str = consumer->assignment_lost() ? " (LOST)" : "";
+ Test::Say(tostr() << _C_YEL "RebalanceCb " << protocol << ": "
+ << consumer->name() << " " << RdKafka::err2str(err)
+ << lost_str << ": " << part_list_print(partitions)
+ << "\n");
+
+ if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
+ if (consumer->assignment_lost())
+ Test::Fail("unexpected lost assignment during ASSIGN rebalance");
+ RdKafka::Error *error = consumer->incremental_assign(partitions);
+ if (error)
+ Test::Fail(tostr() << "consumer->incremental_assign() failed: "
+ << error->str());
+ if (partitions.size() > 0)
+ nonempty_assign_call_cnt++;
+ assign_call_cnt += 1;
+ partitions_assigned_net += (int)partitions.size();
+ ts_last_assign = test_clock();
+
+ } else {
+ if (consumer->assignment_lost())
+ lost_call_cnt += 1;
+ RdKafka::Error *error = consumer->incremental_unassign(partitions);
+ if (error)
+ Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
+ << error->str());
+ if (partitions.size() == 0)
+ Test::Fail("revoked partitions size should never be 0");
+ revoke_call_cnt += 1;
+ partitions_assigned_net -= (int)partitions.size();
+ }
+
+ /* Reset message counters for the given partitions. */
+ Test::Say(consumer->name() + ": resetting message counters:\n");
+ reset_msg_cnt(partitions);
+ }
+
+ bool poll_once(RdKafka::KafkaConsumer *c, int timeout_ms) {
+ RdKafka::Message *msg = c->consume(timeout_ms);
+ bool ret = msg->err() != RdKafka::ERR__TIMED_OUT;
+ if (!msg->err())
+ msg_cnt[Toppar(msg->topic_name(), msg->partition())]++;
+ delete msg;
+ return ret;
+ }
+
+ void reset_msg_cnt() {
+ msg_cnt.clear();
+ }
+
+ void reset_msg_cnt(Toppar &tp) {
+ int msgcnt = get_msg_cnt(tp);
+ Test::Say(tostr() << " RESET " << tp.topic << " [" << tp.partition << "]"
+ << " with " << msgcnt << " messages\n");
+ if (!msg_cnt.erase(tp) && msgcnt)
+ Test::Fail("erase failed!");
+ }
+
+ void reset_msg_cnt(const vector<RdKafka::TopicPartition *> &partitions) {
+ for (unsigned int i = 0; i < partitions.size(); i++) {
+ Toppar tp(partitions[i]->topic(), partitions[i]->partition());
+ reset_msg_cnt(tp);
+ }
+ }
+
+ int get_msg_cnt(const Toppar &tp) {
+ map<Toppar, int>::iterator it = msg_cnt.find(tp);
+ if (it == msg_cnt.end())
+ return 0;
+ return it->second;
+ }
+};
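+
+/* Minimal cooperative rebalance callback, distilled from DefaultRebalanceCb
+ * above (illustrative sketch, not used by the tests): with the COOPERATIVE
+ * protocol the callback only receives the partitions being added or
+ * removed, so it must respond with incremental_assign() /
+ * incremental_unassign() rather than the eager assign() / unassign(). */
+class MinimalCooperativeRebalanceCb : public RdKafka::RebalanceCb {
+ public:
+  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
+                    RdKafka::ErrorCode err,
+                    std::vector<RdKafka::TopicPartition *> &partitions) {
+    RdKafka::Error *error;
+
+    if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
+      error = consumer->incremental_assign(partitions);
+    else /* RdKafka::ERR__REVOKE_PARTITIONS */
+      error = consumer->incremental_unassign(partitions);
+
+    if (error)
+      Test::Fail("incremental (un)assign failed: " + error->str());
+  }
+};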
+
+
+
+/**
+ * @brief Verify that the consumer's assignment is a subset of the
+ * subscribed topics.
+ *
+ * @param allow_empty Allow an empty assignment without failing.
+ * @param allow_mismatch Allow assignment of topics that are not subscribed
+ *                       to. This can happen when the subscription is updated
+ *                       but a rebalance callback hasn't been seen yet.
+ * @param all_assignments Accumulated assignments for all consumers.
+ * If an assigned partition already exists it means
+ * the partition is assigned to multiple consumers and
+ * the test will fail.
+ * @param exp_msg_cnt Expected message count per assigned partition, or -1
+ * if not to check.
+ *
+ * @returns the number of assigned partitions, or fails if the
+ *          assignment is empty (unless allow_empty is set) or there is
+ *          an assignment for a topic that is not subscribed.
+ */
+static int verify_consumer_assignment(
+ RdKafka::KafkaConsumer *consumer,
+ DefaultRebalanceCb &rebalance_cb,
+ const vector<string> &topics,
+ bool allow_empty,
+ bool allow_mismatch,
+ map<Toppar, RdKafka::KafkaConsumer *> *all_assignments,
+ int exp_msg_cnt) {
+ vector<RdKafka::TopicPartition *> partitions;
+ RdKafka::ErrorCode err;
+ int fails = 0;
+ int count;
+ ostringstream ss;
+
+ err = consumer->assignment(partitions);
+ TEST_ASSERT(!err, "Failed to get assignment for consumer %s: %s",
+ consumer->name().c_str(), RdKafka::err2str(err).c_str());
+
+ count = (int)partitions.size();
+
+ for (vector<RdKafka::TopicPartition *>::iterator it = partitions.begin();
+ it != partitions.end(); it++) {
+ RdKafka::TopicPartition *p = *it;
+
+ if (find(topics.begin(), topics.end(), p->topic()) == topics.end()) {
+ Test::Say(tostr() << (allow_mismatch ? _C_YEL "Warning (allowed)"
+ : _C_RED "Error")
+ << ": " << consumer->name() << " is assigned "
+ << p->topic() << " [" << p->partition() << "] which is "
+ << "not in the list of subscribed topics: "
+ << string_vec_to_str(topics) << "\n");
+ if (!allow_mismatch)
+ fails++;
+ }
+
+ Toppar tp(p);
+ pair<map<Toppar, RdKafka::KafkaConsumer *>::iterator, bool> ret;
+ ret = all_assignments->insert(
+ pair<Toppar, RdKafka::KafkaConsumer *>(tp, consumer));
+ if (!ret.second) {
+ Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
+ << " is assigned " << p->topic() << " ["
+ << p->partition()
+ << "] which is "
+ "already assigned to consumer "
+ << ret.first->second->name() << "\n");
+ fails++;
+ }
+
+
+ int msg_cnt = rebalance_cb.get_msg_cnt(tp);
+
+ if (exp_msg_cnt != -1 && msg_cnt != exp_msg_cnt) {
+ Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
+ << " expected " << exp_msg_cnt << " messages on "
+ << p->topic() << " [" << p->partition() << "], not "
+ << msg_cnt << "\n");
+ fails++;
+ }
+
+ ss << (it == partitions.begin() ? "" : ", ") << p->topic() << " ["
+ << p->partition() << "] (" << msg_cnt << "msgs)";
+ }
+
+ RdKafka::TopicPartition::destroy(partitions);
+
+ Test::Say(tostr() << "Consumer " << consumer->name() << " assignment ("
+ << count << "): " << ss.str() << "\n");
+
+ if (count == 0 && !allow_empty)
+ Test::Fail("Consumer " + consumer->name() +
+ " has unexpected empty assignment");
+
+ if (fails)
+ Test::Fail(
+ tostr() << "Consumer " + consumer->name()
+ << " assignment verification failed (see previous error)");
+
+ return count;
+}
+
+
+
+/* -------- a_assign_tests
+ *
+ * Check the behavior of incremental assign / unassign outside the context
+ * of a rebalance.
+ */
+
+
+/** Incremental assign, then assign(NULL).
+ */
+static void assign_test_1(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2) {
+ RdKafka::ErrorCode err;
+ RdKafka::Error *error;
+
+ Test::Say("Incremental assign, then assign(NULL)\n");
+
+ if ((error = consumer->incremental_assign(toppars1)))
+ Test::Fail(tostr() << "Incremental assign failed: " << error->str());
+ Test::check_assignment(consumer, 1, &toppars1[0]->topic());
+
+ if ((err = consumer->unassign()))
+ Test::Fail("Unassign failed: " + RdKafka::err2str(err));
+ Test::check_assignment(consumer, 0, NULL);
+}
+
+
+/** Assign, then incremental unassign.
+ */
+static void assign_test_2(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2) {
+ RdKafka::ErrorCode err;
+ RdKafka::Error *error;
+
+ Test::Say("Assign, then incremental unassign\n");
+
+ if ((err = consumer->assign(toppars1)))
+ Test::Fail("Assign failed: " + RdKafka::err2str(err));
+ Test::check_assignment(consumer, 1, &toppars1[0]->topic());
+
+ if ((error = consumer->incremental_unassign(toppars1)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+}
+
+
+/** Incremental assign, then incremental unassign.
+ */
+static void assign_test_3(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2) {
+ RdKafka::Error *error;
+
+ Test::Say("Incremental assign, then incremental unassign\n");
+
+ if ((error = consumer->incremental_assign(toppars1)))
+ Test::Fail("Incremental assign failed: " + error->str());
+ Test::check_assignment(consumer, 1, &toppars1[0]->topic());
+
+ if ((error = consumer->incremental_unassign(toppars1)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+}
+
+
+/** Multi-topic incremental assign and unassign + message consumption.
+ */
+static void assign_test_4(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2) {
+ RdKafka::Error *error;
+
+ Test::Say(
+ "Multi-topic incremental assign and unassign + message consumption\n");
+
+ if ((error = consumer->incremental_assign(toppars1)))
+ Test::Fail("Incremental assign failed: " + error->str());
+ Test::check_assignment(consumer, 1, &toppars1[0]->topic());
+
+ RdKafka::Message *m = consumer->consume(5000);
+ if (m->err() != RdKafka::ERR_NO_ERROR)
+ Test::Fail("Expecting a consumed message.");
+ if (m->len() != 100)
+ Test::Fail(tostr() << "Expecting msg len to be 100, not: "
+ << m->len()); /* implies read from topic 1. */
+ delete m;
+
+ if ((error = consumer->incremental_unassign(toppars1)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+
+ m = consumer->consume(100);
+ if (m->err() != RdKafka::ERR__TIMED_OUT)
+ Test::Fail("Not expecting a consumed message.");
+ delete m;
+
+ if ((error = consumer->incremental_assign(toppars2)))
+ Test::Fail("Incremental assign failed: " + error->str());
+ Test::check_assignment(consumer, 1, &toppars2[0]->topic());
+
+ m = consumer->consume(5000);
+ if (m->err() != RdKafka::ERR_NO_ERROR)
+ Test::Fail("Expecting a consumed message.");
+ if (m->len() != 200)
+ Test::Fail(tostr() << "Expecting msg len to be 200, not: "
+ << m->len()); /* implies read from topic 2. */
+ delete m;
+
+ if ((error = consumer->incremental_assign(toppars1)))
+ Test::Fail("Incremental assign failed: " + error->str());
+ if (Test::assignment_partition_count(consumer, NULL) != 2)
+ Test::Fail(tostr() << "Expecting current assignment to have size 2, not: "
+ << Test::assignment_partition_count(consumer, NULL));
+
+ m = consumer->consume(5000);
+ if (m->err() != RdKafka::ERR_NO_ERROR)
+ Test::Fail("Expecting a consumed message.");
+ delete m;
+
+ if ((error = consumer->incremental_unassign(toppars2)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ if ((error = consumer->incremental_unassign(toppars1)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+}
+
+
+/** Incremental assign and unassign of empty collection.
+ */
+static void assign_test_5(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2) {
+ RdKafka::Error *error;
+ std::vector<RdKafka::TopicPartition *> toppars3;
+
+ Test::Say("Incremental assign and unassign of empty collection\n");
+
+ if ((error = consumer->incremental_assign(toppars3)))
+ Test::Fail("Incremental assign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+
+ if ((error = consumer->incremental_unassign(toppars3)))
+ Test::Fail("Incremental unassign failed: " + error->str());
+ Test::check_assignment(consumer, 0, NULL);
+}
+
+
+
+static void run_test(
+ const std::string &t1,
+ const std::string &t2,
+ void (*test)(RdKafka::KafkaConsumer *consumer,
+ std::vector<RdKafka::TopicPartition *> toppars1,
+ std::vector<RdKafka::TopicPartition *> toppars2)) {
+ std::vector<RdKafka::TopicPartition *> toppars1;
+ toppars1.push_back(RdKafka::TopicPartition::create(t1, 0));
+ std::vector<RdKafka::TopicPartition *> toppars2;
+ toppars2.push_back(RdKafka::TopicPartition::create(t2, 0));
+
+ RdKafka::KafkaConsumer *consumer =
+ make_consumer("C_1", t1, "cooperative-sticky", NULL, NULL, 10);
+
+ test(consumer, toppars1, toppars2);
+
+ RdKafka::TopicPartition::destroy(toppars1);
+ RdKafka::TopicPartition::destroy(toppars2);
+
+ consumer->close();
+ delete consumer;
+}
+
+
+static void a_assign_tests() {
+ SUB_TEST_QUICK();
+
+ int msgcnt = 1000;
+ const int msgsize1 = 100;
+ const int msgsize2 = 200;
+
+ std::string topic1_str = Test::mk_topic_name("0113-a1", 1);
+ test_create_topic(NULL, topic1_str.c_str(), 1, 1);
+ std::string topic2_str = Test::mk_topic_name("0113-a2", 1);
+ test_create_topic(NULL, topic2_str.c_str(), 1, 1);
+
+ test_produce_msgs_easy_size(topic1_str.c_str(), 0, 0, msgcnt, msgsize1);
+ test_produce_msgs_easy_size(topic2_str.c_str(), 0, 0, msgcnt, msgsize2);
+
+ run_test(topic1_str, topic2_str, assign_test_1);
+ run_test(topic1_str, topic2_str, assign_test_2);
+ run_test(topic1_str, topic2_str, assign_test_3);
+ run_test(topic1_str, topic2_str, assign_test_4);
+ run_test(topic1_str, topic2_str, assign_test_5);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Quick Assign 1,2, Assign 2,3, Assign 1,2,3 test to verify
+ * that the correct OffsetFetch response is used.
+ * See note in rdkafka_assignment.c for details.
+ *
+ * Makes use of the mock cluster to induce latency.
+ */
+static void a_assign_rapid() {
+ SUB_TEST_QUICK();
+
+ std::string group_id = __FUNCTION__;
+
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *bootstraps;
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+ int32_t coord_id = 1;
+ rd_kafka_mock_coordinator_set(mcluster, "group", group_id.c_str(), coord_id);
+
+ rd_kafka_mock_topic_create(mcluster, "topic1", 1, 1);
+ rd_kafka_mock_topic_create(mcluster, "topic2", 1, 1);
+ rd_kafka_mock_topic_create(mcluster, "topic3", 1, 1);
+
+ /*
+ * Produce messages to topics
+ */
+ const int msgs_per_partition = 1000;
+
+ RdKafka::Conf *pconf;
+ Test::conf_init(&pconf, NULL, 10);
+ Test::conf_set(pconf, "bootstrap.servers", bootstraps);
+ Test::conf_set(pconf, "security.protocol", "plaintext");
+ std::string errstr;
+ RdKafka::Producer *p = RdKafka::Producer::create(pconf, errstr);
+ if (!p)
+ Test::Fail(tostr() << __FUNCTION__
+ << ": Failed to create producer: " << errstr);
+ delete pconf;
+
+ Test::produce_msgs(p, "topic1", 0, msgs_per_partition, 10,
+ false /*no flush*/);
+ Test::produce_msgs(p, "topic2", 0, msgs_per_partition, 10,
+ false /*no flush*/);
+ Test::produce_msgs(p, "topic3", 0, msgs_per_partition, 10,
+ false /*no flush*/);
+ p->flush(10 * 1000);
+
+ delete p;
+
+ vector<RdKafka::TopicPartition *> toppars1;
+ toppars1.push_back(RdKafka::TopicPartition::create("topic1", 0));
+ vector<RdKafka::TopicPartition *> toppars2;
+ toppars2.push_back(RdKafka::TopicPartition::create("topic2", 0));
+ vector<RdKafka::TopicPartition *> toppars3;
+ toppars3.push_back(RdKafka::TopicPartition::create("topic3", 0));
+
+
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 20);
+ Test::conf_set(conf, "bootstrap.servers", bootstraps);
+ Test::conf_set(conf, "security.protocol", "plaintext");
+ Test::conf_set(conf, "client.id", __FUNCTION__);
+ Test::conf_set(conf, "group.id", group_id);
+ Test::conf_set(conf, "auto.offset.reset", "earliest");
+ Test::conf_set(conf, "enable.auto.commit", "false");
+
+ RdKafka::KafkaConsumer *consumer;
+ consumer = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!consumer)
+ Test::Fail(tostr() << __FUNCTION__
+ << ": Failed to create consumer: " << errstr);
+ delete conf;
+
+ vector<RdKafka::TopicPartition *> toppars;
+ vector<RdKafka::TopicPartition *> expected;
+
+ map<Toppar, int64_t> pos; /* Expected consume position per partition */
+ pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 0;
+ pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 0;
+ pos[Toppar(toppars3[0]->topic(), toppars3[0]->partition())] = 0;
+
+ /* To make sure offset commits are fetched in proper assign sequence
+ * we commit an offset that should not be used in the final consume loop.
+ * This commit will be overwritten below with another commit. */
+ vector<RdKafka::TopicPartition *> offsets;
+ offsets.push_back(RdKafka::TopicPartition::create(
+ toppars1[0]->topic(), toppars1[0]->partition(), 11));
+ /* This partition should start at this position even though
+ * there will be a sub-sequent commit to overwrite it, that should not
+ * be used since this partition is never unassigned. */
+ offsets.push_back(RdKafka::TopicPartition::create(
+ toppars2[0]->topic(), toppars2[0]->partition(), 22));
+ pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 22;
+
+ Test::print_TopicPartitions("pre-commit", offsets);
+
+ RdKafka::ErrorCode err;
+ err = consumer->commitSync(offsets);
+ if (err)
+ Test::Fail(tostr() << __FUNCTION__ << ": pre-commit failed: "
+ << RdKafka::err2str(err) << "\n");
+
+ /* Add coordinator delay so that the OffsetFetchRequest originating
+ * from the coming incremental_assign() will not finish before
+ * we call incremental_unassign() and incremental_assign() again, resulting
+ * in a situation where the initial OffsetFetchResponse will contain
+ * an older offset for a previous assignment of one partition. */
+ rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 5000);
+
+
+ /* Assign 1,2 == 1,2 */
+ toppars.push_back(toppars1[0]);
+ toppars.push_back(toppars2[0]);
+ expected.push_back(toppars1[0]);
+ expected.push_back(toppars2[0]);
+ Test::incremental_assign(consumer, toppars);
+ expect_assignment(consumer, expected);
+
+ /* Unassign -1 == 2 */
+ toppars.clear();
+ toppars.push_back(toppars1[0]);
+ vector<RdKafka::TopicPartition *>::iterator it =
+ find(expected.begin(), expected.end(), toppars1[0]);
+ expected.erase(it);
+
+ Test::incremental_unassign(consumer, toppars);
+ expect_assignment(consumer, expected);
+
+
+ /* Commit offset for the removed partition and the partition that is
+ * unchanged in the assignment. */
+ RdKafka::TopicPartition::destroy(offsets);
+ offsets.push_back(RdKafka::TopicPartition::create(
+ toppars1[0]->topic(), toppars1[0]->partition(), 55));
+ offsets.push_back(RdKafka::TopicPartition::create(
+ toppars2[0]->topic(), toppars2[0]->partition(), 33)); /* should not be
+ * used. */
+ pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 55;
+ Test::print_TopicPartitions("commit", offsets);
+
+ err = consumer->commitAsync(offsets);
+ if (err)
+ Test::Fail(tostr() << __FUNCTION__
+ << ": commit failed: " << RdKafka::err2str(err) << "\n");
+
+ /* Assign +3 == 2,3 */
+ toppars.clear();
+ toppars.push_back(toppars3[0]);
+ expected.push_back(toppars3[0]);
+ Test::incremental_assign(consumer, toppars);
+ expect_assignment(consumer, expected);
+
+ /* Now remove the latency */
+ Test::Say(_C_MAG "Clearing rtt\n");
+ rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 0);
+
+ /* Assign +1 == 1,2,3 */
+ toppars.clear();
+ toppars.push_back(toppars1[0]);
+ expected.push_back(toppars1[0]);
+ Test::incremental_assign(consumer, toppars);
+ expect_assignment(consumer, expected);
+
+ /*
+ * Verify consumed messages
+ */
+ int wait_end = (int)expected.size();
+ while (wait_end > 0) {
+ RdKafka::Message *msg = consumer->consume(10 * 1000);
+ if (msg->err() == RdKafka::ERR__TIMED_OUT)
+ Test::Fail(tostr() << __FUNCTION__
+ << ": Consume timed out waiting "
+ "for "
+ << wait_end << " more partitions");
+
+ Toppar tp = Toppar(msg->topic_name(), msg->partition());
+ int64_t *exp_pos = &pos[tp];
+
+ Test::Say(3, tostr() << __FUNCTION__ << ": Received " << tp.topic << " ["
+ << tp.partition << "] at offset " << msg->offset()
+ << " (expected offset " << *exp_pos << ")\n");
+
+ if (*exp_pos != msg->offset())
+ Test::Fail(tostr() << __FUNCTION__ << ": expected message offset "
+ << *exp_pos << " for " << msg->topic_name() << " ["
+ << msg->partition() << "], not " << msg->offset()
+ << "\n");
+ (*exp_pos)++;
+ if (*exp_pos == msgs_per_partition) {
+ TEST_ASSERT(wait_end > 0, "");
+ wait_end--;
+ } else if (msg->offset() > msgs_per_partition)
+ Test::Fail(tostr() << __FUNCTION__ << ": unexpected message with "
+ << "offset " << msg->offset() << " on " << tp.topic
+ << " [" << tp.partition << "]\n");
+
+ delete msg;
+ }
+
+ RdKafka::TopicPartition::destroy(offsets);
+ RdKafka::TopicPartition::destroy(toppars1);
+ RdKafka::TopicPartition::destroy(toppars2);
+ RdKafka::TopicPartition::destroy(toppars3);
+
+ delete consumer;
+
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
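+
+/* Distilled sketch of the mock-cluster latency idiom used in
+ * a_assign_rapid() above (illustrative only, not called by the tests):
+ * create a mock cluster, pin the group coordinator, add RTT to it so that
+ * in-flight requests outlive subsequent assignment changes, then clear the
+ * delay again. The group name is a placeholder. */
+static void example_mock_coordinator_rtt() {
+  rd_kafka_mock_cluster_t *mcluster;
+  const char *bootstraps;
+  int32_t coord_id = 1;
+
+  mcluster = test_mock_cluster_new(3, &bootstraps);
+  rd_kafka_mock_coordinator_set(mcluster, "group", "example_group", coord_id);
+  rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 5000); /* 5s delay */
+  /* ... exercise the consumer while coordinator requests are delayed ... */
+  rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 0); /* clear the delay */
+  test_mock_cluster_destroy(mcluster);
+}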
+
+
+/* Check behavior when:
+ * 1. Single topic with 2 partitions.
+ * 2. Consumer 1 (with rebalance_cb) subscribes to it.
+ * 3. Consumer 2 (with rebalance_cb) subscribes to it.
+ * 4. Close.
+ */
+
+static void b_subscribe_with_cb_test(rd_bool_t close_consumer) {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name.c_str(), 2, 1);
+
+ DefaultRebalanceCb rebalance_cb1;
+ RdKafka::KafkaConsumer *c1 = make_consumer(
+ "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 25);
+ DefaultRebalanceCb rebalance_cb2;
+ RdKafka::KafkaConsumer *c2 = make_consumer(
+ "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 25);
+ test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c1, topic_name);
+
+ bool c2_subscribed = false;
+ while (true) {
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ /* Start c2 after c1 has received initial assignment */
+ if (!c2_subscribed && rebalance_cb1.assign_call_cnt > 0) {
+ Test::subscribe(c2, topic_name);
+ c2_subscribed = true;
+ }
+
+ /* Failure case: test will time out. */
+ if (rebalance_cb1.assign_call_cnt == 3 &&
+ rebalance_cb2.assign_call_cnt == 2) {
+ break;
+ }
+ }
+
+ /* Sequence of events:
+ *
+ * 1. c1 joins group.
+ * 2. c1 gets assigned 2 partitions.
+ * - there isn't a follow-on rebalance because there aren't any revoked
+ * partitions.
+ * 3. c2 joins group.
+ * 4. This results in a rebalance with one partition being revoked from c1,
+ * and no partitions assigned to either c1 or c2 (however the rebalance
+ * callback will be called in each case with an empty set).
+ * 5. c1 then re-joins the group since it had a partition revoked.
+ * 6. c2 is now assigned a single partition, and c1's incremental assignment
+ * is empty.
+ * 7. Since there were no revoked partitions, no further rebalance is
+ * triggered.
+ */
+
+ /* The rebalance cb is always called on assign, even if empty. */
+ if (rebalance_cb1.assign_call_cnt != 3)
+ Test::Fail(tostr() << "Expecting 3 assign calls on consumer 1, not "
+ << rebalance_cb1.assign_call_cnt);
+ if (rebalance_cb2.assign_call_cnt != 2)
+ Test::Fail(tostr() << "Expecting 2 assign calls on consumer 2, not: "
+ << rebalance_cb2.assign_call_cnt);
+
+  /* The rebalance cb is not called on an empty revoke (unless partitions
+   * were lost, which is not the case here). */
+ if (rebalance_cb1.revoke_call_cnt != 1)
+ Test::Fail(tostr() << "Expecting 1 revoke call on consumer 1, not: "
+ << rebalance_cb1.revoke_call_cnt);
+ if (rebalance_cb2.revoke_call_cnt != 0)
+ Test::Fail(tostr() << "Expecting 0 revoke calls on consumer 2, not: "
+ << rebalance_cb2.revoke_call_cnt);
+
+ /* Final state */
+
+ /* Expect both consumers to have 1 assigned partition (via net calculation in
+ * rebalance_cb) */
+ if (rebalance_cb1.partitions_assigned_net != 1)
+ Test::Fail(tostr()
+ << "Expecting consumer 1 to have net 1 assigned partition, not: "
+ << rebalance_cb1.partitions_assigned_net);
+ if (rebalance_cb2.partitions_assigned_net != 1)
+ Test::Fail(tostr()
+ << "Expecting consumer 2 to have net 1 assigned partition, not: "
+ << rebalance_cb2.partitions_assigned_net);
+
+ /* Expect both consumers to have 1 assigned partition (via ->assignment()
+ * query) */
+ expect_assignment(c1, 1);
+ expect_assignment(c2, 1);
+
+ /* Make sure the fetchers are running */
+ int msgcnt = 100;
+ const int msgsize1 = 100;
+ test_produce_msgs_easy_size(topic_name.c_str(), 0, 0, msgcnt, msgsize1);
+ test_produce_msgs_easy_size(topic_name.c_str(), 0, 1, msgcnt, msgsize1);
+
+ bool consumed_from_c1 = false;
+ bool consumed_from_c2 = false;
+ while (true) {
+ RdKafka::Message *msg1 = c1->consume(100);
+ RdKafka::Message *msg2 = c2->consume(100);
+
+ if (msg1->err() == RdKafka::ERR_NO_ERROR)
+ consumed_from_c1 = true;
+    if (msg2->err() == RdKafka::ERR_NO_ERROR)
+      consumed_from_c2 = true;
+
+ delete msg1;
+ delete msg2;
+
+    /* Failure case: test will time out. */
+ if (consumed_from_c1 && consumed_from_c2)
+ break;
+ }
+
+ if (!close_consumer) {
+ delete c1;
+ delete c2;
+ return;
+ }
+
+ c1->close();
+ c2->close();
+
+ /* Closing the consumer should trigger rebalance_cb (revoke): */
+ if (rebalance_cb1.revoke_call_cnt != 2)
+ Test::Fail(tostr() << "Expecting 2 revoke calls on consumer 1, not: "
+ << rebalance_cb1.revoke_call_cnt);
+ if (rebalance_cb2.revoke_call_cnt != 1)
+ Test::Fail(tostr() << "Expecting 1 revoke call on consumer 2, not: "
+ << rebalance_cb2.revoke_call_cnt);
+
+ /* ..and net assigned partitions should drop to 0 in both cases: */
+ if (rebalance_cb1.partitions_assigned_net != 0)
+ Test::Fail(
+ tostr()
+ << "Expecting consumer 1 to have net 0 assigned partitions, not: "
+ << rebalance_cb1.partitions_assigned_net);
+ if (rebalance_cb2.partitions_assigned_net != 0)
+ Test::Fail(
+ tostr()
+ << "Expecting consumer 2 to have net 0 assigned partitions, not: "
+ << rebalance_cb2.partitions_assigned_net);
+
+ /* Nothing in this test should result in lost partitions */
+ if (rebalance_cb1.lost_call_cnt > 0)
+ Test::Fail(
+ tostr() << "Expecting consumer 1 to have 0 lost partition events, not: "
+ << rebalance_cb1.lost_call_cnt);
+ if (rebalance_cb2.lost_call_cnt > 0)
+ Test::Fail(
+ tostr() << "Expecting consumer 2 to have 0 lost partition events, not: "
+ << rebalance_cb2.lost_call_cnt);
+
+ delete c1;
+ delete c2;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single topic with 2 partitions.
+ * 2. Consumer 1 (no rebalance_cb) subscribes to it.
+ * 3. Consumer 2 (no rebalance_cb) subscribes to it.
+ * 4. Close.
+ */
+
+static void c_subscribe_no_cb_test(rd_bool_t close_consumer) {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name.c_str(), 2, 1);
+
+ RdKafka::KafkaConsumer *c1 =
+ make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 20);
+ RdKafka::KafkaConsumer *c2 =
+ make_consumer("C_2", group_name, "cooperative-sticky", NULL, NULL, 20);
+ test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c1, topic_name);
+
+ bool c2_subscribed = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ if (Test::assignment_partition_count(c1, NULL) == 2 && !c2_subscribed) {
+ Test::subscribe(c2, topic_name);
+ c2_subscribed = true;
+ }
+
+ if (Test::assignment_partition_count(c1, NULL) == 1 &&
+ Test::assignment_partition_count(c2, NULL) == 1) {
+ Test::Say("Consumer 1 and 2 are both assigned to single partition.\n");
+ done = true;
+ }
+ }
+
+ if (close_consumer) {
+ Test::Say("Closing consumer 1\n");
+ c1->close();
+ Test::Say("Closing consumer 2\n");
+ c2->close();
+ } else {
+ Test::Say("Skipping close() of consumer 1 and 2.\n");
+ }
+
+ delete c1;
+ delete c2;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (no rebalance_cb) subscribes to a topic.
+ * 2. Subscription is changed (topic added).
+ * 3. Consumer is closed.
+ */
+
+static void d_change_subscription_add_topic(rd_bool_t close_consumer) {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+ std::string topic_name_2 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1);
+
+ bool subscribed_to_one_topic = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 2 &&
+ !subscribed_to_one_topic) {
+ subscribed_to_one_topic = true;
+ Test::subscribe(c, topic_name_1, topic_name_2);
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 4) {
+ Test::Say("Consumer is assigned to two topics.\n");
+ done = true;
+ }
+ }
+
+ if (close_consumer) {
+ Test::Say("Closing consumer\n");
+ c->close();
+ } else
+ Test::Say("Skipping close() of consumer\n");
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (no rebalance_cb) subscribes to two topics.
+ * 2. Subscription is changed (one topic removed).
+ * 3. Consumer is closed.
+ */
+
+static void e_change_subscription_remove_topic(rd_bool_t close_consumer) {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+ std::string topic_name_2 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1, topic_name_2);
+
+ bool subscribed_to_two_topics = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 4 &&
+ !subscribed_to_two_topics) {
+ subscribed_to_two_topics = true;
+ Test::subscribe(c, topic_name_1);
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 2) {
+ Test::Say("Consumer is assigned to one topic\n");
+ done = true;
+ }
+ }
+
+  if (close_consumer) {
+ Test::Say("Closing consumer\n");
+ c->close();
+ } else
+ Test::Say("Skipping close() of consumer\n");
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check that use of consumer->assign() and consumer->unassign() is disallowed
+ * when a COOPERATIVE assignor is in use.
+ *
+ * Except when the consumer is closing, where all forms of unassign are
+ * allowed and treated as a full unassign.
+ */
+
+class FTestRebalanceCb : public RdKafka::RebalanceCb {
+ public:
+ bool assigned;
+ bool closing;
+
+ FTestRebalanceCb() : assigned(false), closing(false) {
+ }
+
+ void rebalance_cb(RdKafka::KafkaConsumer *consumer,
+ RdKafka::ErrorCode err,
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
+ << RdKafka::err2str(err) << (closing ? " (closing)" : "")
+ << "\n");
+
+ if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
+ RdKafka::ErrorCode err_resp = consumer->assign(partitions);
+ Test::Say(tostr() << "consumer->assign() response code: " << err_resp
+ << "\n");
+ if (err_resp != RdKafka::ERR__STATE)
+ Test::Fail(tostr() << "Expected assign to fail with error code: "
+ << RdKafka::ERR__STATE << "(ERR__STATE)");
+
+ RdKafka::Error *error = consumer->incremental_assign(partitions);
+ if (error)
+ Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
+ << error->str());
+
+ assigned = true;
+
+ } else {
+ RdKafka::ErrorCode err_resp = consumer->unassign();
+ Test::Say(tostr() << "consumer->unassign() response code: " << err_resp
+ << "\n");
+
+ if (!closing) {
+ if (err_resp != RdKafka::ERR__STATE)
+ Test::Fail(tostr() << "Expected assign to fail with error code: "
+ << RdKafka::ERR__STATE << "(ERR__STATE)");
+
+ RdKafka::Error *error = consumer->incremental_unassign(partitions);
+ if (error)
+ Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
+ << error->str());
+
+ } else {
+ /* During termination (close()) any type of unassign*() is allowed. */
+        if (err_resp)
+          Test::Fail(tostr() << "Expected unassign to succeed during close, "
+                                "but got: "
+                             << RdKafka::err2str(err_resp));
+ }
+ }
+ }
+};
+
+
+static void f_assign_call_cooperative() {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name.c_str(), 1, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ FTestRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name);
+
+ while (!rebalance_cb.assigned)
+ Test::poll_once(c, 500);
+
+ rebalance_cb.closing = true;
+ c->close();
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check that use of consumer->incremental_assign() and
+ * consumer->incremental_unassign() is disallowed when an EAGER assignor is in
+ * use.
+ */
+class GTestRebalanceCb : public RdKafka::RebalanceCb {
+ public:
+ bool assigned;
+ bool closing;
+
+ GTestRebalanceCb() : assigned(false), closing(false) {
+ }
+
+ void rebalance_cb(RdKafka::KafkaConsumer *consumer,
+ RdKafka::ErrorCode err,
+ std::vector<RdKafka::TopicPartition *> &partitions) {
+ Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
+ << RdKafka::err2str(err) << "\n");
+
+ if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
+ RdKafka::Error *error = consumer->incremental_assign(partitions);
+ Test::Say(tostr() << "consumer->incremental_assign() response: "
+ << (!error ? "NULL" : error->str()) << "\n");
+ if (!error)
+ Test::Fail("Expected consumer->incremental_assign() to fail");
+ if (error->code() != RdKafka::ERR__STATE)
+ Test::Fail(tostr() << "Expected consumer->incremental_assign() to fail "
+ "with error code "
+ << RdKafka::ERR__STATE);
+ delete error;
+
+ RdKafka::ErrorCode err_resp = consumer->assign(partitions);
+ if (err_resp)
+ Test::Fail(tostr() << "consumer->assign() failed: " << err_resp);
+
+ assigned = true;
+
+ } else {
+ RdKafka::Error *error = consumer->incremental_unassign(partitions);
+ Test::Say(tostr() << "consumer->incremental_unassign() response: "
+ << (!error ? "NULL" : error->str()) << "\n");
+
+ if (!closing) {
+ if (!error)
+ Test::Fail("Expected consumer->incremental_unassign() to fail");
+ if (error->code() != RdKafka::ERR__STATE)
+ Test::Fail(tostr() << "Expected consumer->incremental_unassign() to "
+ "fail with error code "
+ << RdKafka::ERR__STATE);
+ delete error;
+
+ RdKafka::ErrorCode err_resp = consumer->unassign();
+ if (err_resp)
+ Test::Fail(tostr() << "consumer->unassign() failed: " << err_resp);
+
+ } else {
+ /* During termination (close()) any type of unassign*() is allowed. */
+        if (error)
+          Test::Fail(
+              tostr()
+              << "Expected incremental_unassign to succeed during close, "
+                 "but got: "
+              << error->str());
+ }
+ }
+ }
+};
+
+static void g_incremental_assign_call_eager() {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name.c_str(), 1, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ GTestRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c = make_consumer(
+ "C_1", group_name, "roundrobin", &additional_conf, &rebalance_cb, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name);
+
+ while (!rebalance_cb.assigned)
+ Test::poll_once(c, 500);
+
+ rebalance_cb.closing = true;
+ c->close();
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (rebalance_cb) subscribes to two topics.
+ * 2. One of the topics is deleted.
+ * 3. Consumer is closed.
+ */
+
+static void h_delete_topic() {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
+ std::string topic_name_2 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 1, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ DefaultRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1, topic_name_2);
+
+ bool deleted = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ std::vector<RdKafka::TopicPartition *> partitions;
+ c->assignment(partitions);
+
+ if (partitions.size() == 2 && !deleted) {
+ if (rebalance_cb.assign_call_cnt != 1)
+ Test::Fail(tostr() << "Expected 1 assign call, saw "
+ << rebalance_cb.assign_call_cnt << "\n");
+ Test::delete_topic(c, topic_name_2.c_str());
+ deleted = true;
+ }
+
+ if (partitions.size() == 1 && deleted) {
+ if (partitions[0]->topic() != topic_name_1)
+ Test::Fail(tostr() << "Expecting subscribed topic to be '"
+ << topic_name_1 << "' not '"
+ << partitions[0]->topic() << "'");
+ Test::Say(tostr() << "Assignment no longer includes deleted topic '"
+ << topic_name_2 << "'\n");
+ done = true;
+ }
+
+ RdKafka::TopicPartition::destroy(partitions);
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (rebalance_cb) subscribes to a single topic.
+ * 2. That topic is deleted leaving no topics.
+ * 3. Consumer is closed.
+ */
+
+static void i_delete_topic_2() {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ DefaultRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1);
+
+ bool deleted = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
+ if (rebalance_cb.assign_call_cnt != 1)
+ Test::Fail(tostr() << "Expected one assign call, saw "
+ << rebalance_cb.assign_call_cnt << "\n");
+ Test::delete_topic(c, topic_name_1.c_str());
+ deleted = true;
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
+ Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (without rebalance_cb) subscribes to a single topic.
+ * 2. That topic is deleted leaving no topics.
+ * 3. Consumer is closed.
+ */
+
+static void j_delete_topic_no_rb_callback() {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ RdKafka::KafkaConsumer *c = make_consumer(
+ "C_1", group_name, "cooperative-sticky", &additional_conf, NULL, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1);
+
+ bool deleted = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
+ Test::delete_topic(c, topic_name_1.c_str());
+ deleted = true;
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
+ Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Single consumer (rebalance_cb) subscribes to a 1 partition topic.
+ * 2. Number of partitions is increased to 2.
+ * 3. Consumer is closed.
+ */
+
+static void k_add_partition() {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ test_create_topic(NULL, topic_name.c_str(), 1, 1);
+
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+ DefaultRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name);
+
+ bool subscribed = false;
+ bool done = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 1 && !subscribed) {
+ if (rebalance_cb.assign_call_cnt != 1)
+ Test::Fail(tostr() << "Expected 1 assign call, saw "
+ << rebalance_cb.assign_call_cnt);
+ if (rebalance_cb.revoke_call_cnt != 0)
+ Test::Fail(tostr() << "Expected 0 revoke calls, saw "
+ << rebalance_cb.revoke_call_cnt);
+ Test::create_partitions(c, topic_name.c_str(), 2);
+ subscribed = true;
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 2 && subscribed) {
+ if (rebalance_cb.assign_call_cnt != 2)
+ Test::Fail(tostr() << "Expected 2 assign calls, saw "
+ << rebalance_cb.assign_call_cnt);
+ if (rebalance_cb.revoke_call_cnt != 0)
+ Test::Fail(tostr() << "Expected 0 revoke calls, saw "
+ << rebalance_cb.revoke_call_cnt);
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+ delete c;
+
+ if (rebalance_cb.assign_call_cnt != 2)
+ Test::Fail(tostr() << "Expected 2 assign calls, saw "
+ << rebalance_cb.assign_call_cnt);
+ if (rebalance_cb.revoke_call_cnt != 1)
+ Test::Fail(tostr() << "Expected 1 revoke call, saw "
+ << rebalance_cb.revoke_call_cnt);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Two consumers (with rebalance_cb's) subscribe to two topics.
+ * 2. One of the consumers calls unsubscribe.
+ * 3. Consumers are closed.
+ */
+
+static void l_unsubscribe() {
+ SUB_TEST();
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string topic_name_2 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
+
+ DefaultRebalanceCb rebalance_cb1;
+ RdKafka::KafkaConsumer *c1 = make_consumer(
+ "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 30);
+ test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c1->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+ Test::subscribe(c1, topic_name_1, topic_name_2);
+
+ DefaultRebalanceCb rebalance_cb2;
+ RdKafka::KafkaConsumer *c2 = make_consumer(
+ "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 30);
+ Test::subscribe(c2, topic_name_1, topic_name_2);
+
+ bool done = false;
+ bool unsubscribed = false;
+ while (!done) {
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ if (Test::assignment_partition_count(c1, NULL) == 2 &&
+ Test::assignment_partition_count(c2, NULL) == 2) {
+ if (rebalance_cb1.assign_call_cnt != 1)
+ Test::Fail(
+ tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
+ << rebalance_cb1.assign_call_cnt);
+ if (rebalance_cb2.assign_call_cnt != 1)
+ Test::Fail(
+ tostr() << "Expecting consumer 2's assign_call_cnt to be 1 not: "
+ << rebalance_cb2.assign_call_cnt);
+ Test::Say("Unsubscribing consumer 1 from both topics\n");
+ c1->unsubscribe();
+ unsubscribed = true;
+ }
+
+ if (unsubscribed && Test::assignment_partition_count(c1, NULL) == 0 &&
+ Test::assignment_partition_count(c2, NULL) == 4) {
+ if (rebalance_cb1.assign_call_cnt !=
+ 1) /* is now unsubscribed, so rebalance_cb will no longer be called.
+ */
+ Test::Fail(
+ tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
+ << rebalance_cb1.assign_call_cnt);
+ if (rebalance_cb2.assign_call_cnt != 2)
+ Test::Fail(
+ tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
+ << rebalance_cb2.assign_call_cnt);
+ if (rebalance_cb1.revoke_call_cnt != 1)
+ Test::Fail(
+ tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
+ << rebalance_cb1.revoke_call_cnt);
+ if (rebalance_cb2.revoke_call_cnt !=
+ 0) /* the rebalance_cb should not be called if the revoked partition
+ list is empty */
+ Test::Fail(
+ tostr() << "Expecting consumer 2's revoke_call_cnt to be 0 not: "
+ << rebalance_cb2.revoke_call_cnt);
+ Test::Say("Unsubscribe completed");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer 1\n");
+ c1->close();
+ Test::Say("Closing consumer 2\n");
+ c2->close();
+
+ /* there should be no assign rebalance_cb calls on close */
+ if (rebalance_cb1.assign_call_cnt != 1)
+ Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
+ << rebalance_cb1.assign_call_cnt);
+ if (rebalance_cb2.assign_call_cnt != 2)
+ Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
+ << rebalance_cb2.assign_call_cnt);
+
+ if (rebalance_cb1.revoke_call_cnt !=
+ 1) /* should not be called a second revoke rebalance_cb */
+ Test::Fail(tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
+ << rebalance_cb1.revoke_call_cnt);
+ if (rebalance_cb2.revoke_call_cnt != 1)
+ Test::Fail(tostr() << "Expecting consumer 2's revoke_call_cnt to be 1 not: "
+ << rebalance_cb2.revoke_call_cnt);
+
+ if (rebalance_cb1.lost_call_cnt != 0)
+ Test::Fail(tostr() << "Expecting consumer 1's lost_call_cnt to be 0, not: "
+ << rebalance_cb1.lost_call_cnt);
+ if (rebalance_cb2.lost_call_cnt != 0)
+ Test::Fail(tostr() << "Expecting consumer 2's lost_call_cnt to be 0, not: "
+ << rebalance_cb2.lost_call_cnt);
+
+ delete c1;
+ delete c2;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. A consumer (with no rebalance_cb) subscribes to a topic.
+ * 2. The consumer calls unsubscribe.
+ * 3. Consumer is closed.
+ */
+
+static void m_unsubscribe_2() {
+ SUB_TEST();
+
+ std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name.c_str(), 2, 1);
+
+ RdKafka::KafkaConsumer *c =
+ make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
+ test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name);
+
+ bool done = false;
+ bool unsubscribed = false;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (Test::assignment_partition_count(c, NULL) == 2) {
+ Test::unsubscribe(c);
+ unsubscribed = true;
+ }
+
+ if (unsubscribed && Test::assignment_partition_count(c, NULL) == 0) {
+ Test::Say("Unsubscribe completed");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Two consumers (with rebalance_cb) subscribe to a regex (no matching
+ *    topics exist).
+ * 2. Create two topics.
+ * 3. Remove one of the topics.
+ * 4. Consumers are closed.
+ */
+
+static void n_wildcard() {
+ SUB_TEST();
+
+ const string topic_base_name = Test::mk_topic_name("0113-n_wildcard", 1);
+ const string topic_name_1 = topic_base_name + "_1";
+ const string topic_name_2 = topic_base_name + "_2";
+ const string topic_regex = "^" + topic_base_name + "_.";
+ const string group_name = Test::mk_unique_group_name("0113-n_wildcard");
+
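+  /* Use a short topic.metadata.refresh.interval.ms so that the regex
+   * subscription quickly discovers the topics created later in this test. */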
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));
+
+ DefaultRebalanceCb rebalance_cb1;
+ RdKafka::KafkaConsumer *c1 =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb1, 30);
+ Test::subscribe(c1, topic_regex);
+
+ DefaultRebalanceCb rebalance_cb2;
+ RdKafka::KafkaConsumer *c2 =
+ make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb2, 30);
+ Test::subscribe(c2, topic_regex);
+
+ /* There are no matching topics, so the consumers should not join the group
+ * initially */
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ if (rebalance_cb1.assign_call_cnt != 0)
+ Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 0 not: "
+ << rebalance_cb1.assign_call_cnt);
+ if (rebalance_cb2.assign_call_cnt != 0)
+ Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 0 not: "
+ << rebalance_cb2.assign_call_cnt);
+
+ bool done = false;
+ bool created_topics = false;
+ bool deleted_topic = false;
+ int last_cb1_assign_call_cnt = 0;
+ int last_cb2_assign_call_cnt = 0;
+ while (!done) {
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ if (Test::assignment_partition_count(c1, NULL) == 0 &&
+ Test::assignment_partition_count(c2, NULL) == 0 && !created_topics) {
+ Test::Say(
+ "Creating two topics with 2 partitions each that match regex\n");
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
+      /* The consumers should autonomously discover these topics and start
+       * consuming from them. This happens in the background and is not
+       * influenced by whether we wait for the topics to be created before
+       * continuing the main loop. Both topics may be discovered
+       * simultaneously, requiring a single rebalance, or topic 1 may be
+       * discovered first (it was created first), triggering one rebalance,
+       * with topic 2 discovered later, triggering a second rebalance to
+       * include it.
+       */
+ created_topics = true;
+ }
+
+ if (Test::assignment_partition_count(c1, NULL) == 2 &&
+ Test::assignment_partition_count(c2, NULL) == 2 && !deleted_topic) {
+ if (rebalance_cb1.nonempty_assign_call_cnt == 1) {
+ /* just one rebalance was required */
+ TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 1,
+ "Expecting C_1's nonempty_assign_call_cnt to be 1 not %d ",
+ rebalance_cb1.nonempty_assign_call_cnt);
+ TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 1,
+ "Expecting C_2's nonempty_assign_call_cnt to be 1 not %d ",
+ rebalance_cb2.nonempty_assign_call_cnt);
+ } else {
+ /* two rebalances were required (occurs infrequently) */
+ TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 2,
+ "Expecting C_1's nonempty_assign_call_cnt to be 2 not %d ",
+ rebalance_cb1.nonempty_assign_call_cnt);
+ TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 2,
+ "Expecting C_2's nonempty_assign_call_cnt to be 2 not %d ",
+ rebalance_cb2.nonempty_assign_call_cnt);
+ }
+
+ TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 0,
+ "Expecting C_1's revoke_call_cnt to be 0 not %d ",
+ rebalance_cb1.revoke_call_cnt);
+ TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 0,
+ "Expecting C_2's revoke_call_cnt to be 0 not %d ",
+ rebalance_cb2.revoke_call_cnt);
+
+ last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
+ last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
+
+ Test::Say("Deleting topic 1\n");
+ Test::delete_topic(c1, topic_name_1.c_str());
+ deleted_topic = true;
+ }
+
+ if (Test::assignment_partition_count(c1, NULL) == 1 &&
+ Test::assignment_partition_count(c2, NULL) == 1 && deleted_topic) {
+      /* revoke_call_cnt is incremented in the lost case as well */
+ TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 1,
+ "Expecting C_1's revoke_call_cnt to be 1 not %d",
+ rebalance_cb1.revoke_call_cnt);
+ TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 1,
+ "Expecting C_2's revoke_call_cnt to be 1 not %d",
+ rebalance_cb2.revoke_call_cnt);
+ TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
+ "Expecting C_1's lost_call_cnt to be 1 not %d",
+ rebalance_cb1.lost_call_cnt);
+ TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
+ "Expecting C_2's lost_call_cnt to be 1 not %d",
+ rebalance_cb2.lost_call_cnt);
+
+      /* Consumers will rejoin the group after revoking the lost partitions.
+       * This results in a rebalance_cb assign (with an empty partition list)
+       * that follows the revoke, which has already been confirmed to have
+       * happened. */
+ Test::Say("Waiting for rebalance_cb assigns\n");
+ while (rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt ||
+ rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt) {
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+ }
+
+ Test::Say("Consumers are subscribed to one partition each\n");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer 1\n");
+ last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
+ c1->close();
+
+ /* There should be no assign rebalance_cb calls on close */
+ TEST_ASSERT(rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt,
+ "Expecting C_1's assign_call_cnt to be %d not %d",
+ last_cb1_assign_call_cnt, rebalance_cb1.assign_call_cnt);
+
+ /* Let C_2 catch up on the rebalance and get assigned C_1's partitions. */
+ last_cb2_assign_call_cnt = rebalance_cb2.nonempty_assign_call_cnt;
+ while (rebalance_cb2.nonempty_assign_call_cnt == last_cb2_assign_call_cnt)
+ Test::poll_once(c2, 500);
+
+ Test::Say("Closing consumer 2\n");
+ last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
+ c2->close();
+
+ /* There should be no assign rebalance_cb calls on close */
+ TEST_ASSERT(rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt,
+ "Expecting C_2's assign_call_cnt to be %d not %d",
+ last_cb2_assign_call_cnt, rebalance_cb2.assign_call_cnt);
+
+ TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 2,
+ "Expecting C_1's revoke_call_cnt to be 2 not %d",
+ rebalance_cb1.revoke_call_cnt);
+ TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 2,
+ "Expecting C_2's revoke_call_cnt to be 2 not %d",
+ rebalance_cb2.revoke_call_cnt);
+
+ TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
+ "Expecting C_1's lost_call_cnt to be 1, not %d",
+ rebalance_cb1.lost_call_cnt);
+ TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
+ "Expecting C_2's lost_call_cnt to be 1, not %d",
+ rebalance_cb2.lost_call_cnt);
+
+ delete c1;
+ delete c2;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * 1. Consumer (librdkafka) subscribes to two topics (2 and 6 partitions).
+ * 2. Consumer (java) subscribes to the same two topics.
+ * 3. Consumer (librdkafka) unsubscribes from the two-partition topic.
+ * 4. Consumer (java) process closes upon detecting the above unsubscribe.
+ * 5. Consumer (librdkafka) is now assigned all 6 partitions of the
+ *    remaining topic.
+ * 6. Close librdkafka consumer.
+ */
+
+static void o_java_interop() {
+ SUB_TEST();
+
+ if (*test_conf_get(NULL, "sasl.mechanism") != '\0')
+ SUB_TEST_SKIP(
+ "Cluster is set up for SASL: we won't bother with that "
+ "for the Java client\n");
+
+ std::string topic_name_1 = Test::mk_topic_name("0113_o_2", 1);
+ std::string topic_name_2 = Test::mk_topic_name("0113_o_6", 1);
+ std::string group_name = Test::mk_unique_group_name("0113_o");
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 6, 1);
+
+ DefaultRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c = make_consumer(
+ "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+ Test::subscribe(c, topic_name_1, topic_name_2);
+
+ bool done = false;
+ bool changed_subscription = false;
+ bool changed_subscription_done = false;
+ int java_pid = 0;
+ while (!done) {
+ Test::poll_once(c, 500);
+
+ if (1) // FIXME: Remove after debugging
+ Test::Say(tostr() << "Assignment partition count: "
+ << Test::assignment_partition_count(c, NULL)
+ << ", changed_sub " << changed_subscription
+ << ", changed_sub_done " << changed_subscription_done
+ << ", assign_call_cnt " << rebalance_cb.assign_call_cnt
+ << "\n");
+ if (Test::assignment_partition_count(c, NULL) == 8 && !java_pid) {
+ Test::Say(_C_GRN "librdkafka consumer assigned to 8 partitions\n");
+ string bootstrapServers = get_bootstrap_servers();
+ const char *argv[1 + 1 + 1 + 1 + 1 + 1];
+ size_t i = 0;
+ argv[i++] = "test1";
+ argv[i++] = bootstrapServers.c_str();
+ argv[i++] = topic_name_1.c_str();
+ argv[i++] = topic_name_2.c_str();
+ argv[i++] = group_name.c_str();
+ argv[i] = NULL;
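+      /* Spawn the Java consumer in the same group; its joining triggers a
+       * cooperative rebalance with this consumer. */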
+ java_pid = test_run_java("IncrementalRebalanceCli", argv);
+ if (java_pid <= 0)
+ Test::Fail(tostr() << "Unexpected pid: " << java_pid);
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 4 && java_pid != 0 &&
+ !changed_subscription) {
+ if (rebalance_cb.assign_call_cnt != 2)
+ Test::Fail(tostr() << "Expecting consumer's assign_call_cnt to be 2, "
+ "not "
+ << rebalance_cb.assign_call_cnt);
+ Test::Say(_C_GRN "Java consumer is now part of the group\n");
+ Test::subscribe(c, topic_name_1);
+ changed_subscription = true;
+ }
+
+ /* Depending on the timing of resubscribe rebalancing and the
+ * Java consumer terminating we might have one or two rebalances,
+ * hence the fuzzy <=5 and >=5 checks. */
+ if (Test::assignment_partition_count(c, NULL) == 2 &&
+ changed_subscription && rebalance_cb.assign_call_cnt <= 5 &&
+ !changed_subscription_done) {
+ /* All topic 1 partitions will be allocated to this consumer whether or
+ * not the Java consumer has unsubscribed yet because the sticky algorithm
+ * attempts to ensure partition counts are even. */
+ Test::Say(_C_GRN "Consumer 1 has unsubscribed from topic 2\n");
+ changed_subscription_done = true;
+ }
+
+ if (Test::assignment_partition_count(c, NULL) == 2 &&
+ changed_subscription && rebalance_cb.assign_call_cnt >= 5 &&
+ changed_subscription_done) {
+ /* When the java consumer closes, this will cause an empty assign
+ * rebalance_cb event, allowing detection of when this has happened. */
+ Test::Say(_C_GRN "Java consumer has left the group\n");
+ done = true;
+ }
+ }
+
+ Test::Say("Closing consumer\n");
+ c->close();
+
+  /* The expected behavior is that IncrementalRebalanceCli exits cleanly;
+   * otherwise test_waitpid() times out. */
+ test_waitpid(java_pid);
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * - Single consumer subscribes to topic.
+ * - Soon after (timing such that rebalance is probably in progress) it
+ * subscribes to a different topic.
+ */
+
+static void s_subscribe_when_rebalancing(int variation) {
+ SUB_TEST("variation %d", variation);
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string topic_name_2 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string topic_name_3 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
+ test_create_topic(NULL, topic_name_2.c_str(), 1, 1);
+ test_create_topic(NULL, topic_name_3.c_str(), 1, 1);
+
+ DefaultRebalanceCb rebalance_cb;
+ RdKafka::KafkaConsumer *c = make_consumer(
+ "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
+ test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+ test_wait_topic_exists(c->c_ptr(), topic_name_3.c_str(), 10 * 1000);
+
+ if (variation == 2 || variation == 4 || variation == 6) {
+ /* Pre-cache metadata for all topics. */
+ class RdKafka::Metadata *metadata;
+ c->metadata(true, NULL, &metadata, 5000);
+ delete metadata;
+ }
+
+ Test::subscribe(c, topic_name_1);
+ Test::wait_for_assignment(c, 1, &topic_name_1);
+
+ Test::subscribe(c, topic_name_2);
+
+ if (variation == 3 || variation == 5)
+ Test::poll_once(c, 500);
+
+ if (variation < 5) {
+ // Very quickly after subscribing to topic 2, subscribe to topic 3.
+ Test::subscribe(c, topic_name_3);
+ Test::wait_for_assignment(c, 1, &topic_name_3);
+ } else {
+ // ..or unsubscribe.
+ Test::unsubscribe(c);
+ Test::wait_for_assignment(c, 0, NULL);
+ }
+
+ delete c;
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * - Two consumers subscribe to a topic.
+ * - Max poll interval is exceeded on the first consumer.
+ */
+
+static void t_max_poll_interval_exceeded(int variation) {
+ SUB_TEST("variation %d", variation);
+
+ std::string topic_name_1 =
+ Test::mk_topic_name("0113-cooperative_rebalance", 1);
+ std::string group_name =
+ Test::mk_unique_group_name("0113-cooperative_rebalance");
+ test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+
+ std::vector<std::pair<std::string, std::string> > additional_conf;
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("session.timeout.ms"), std::string("6000")));
+ additional_conf.push_back(std::pair<std::string, std::string>(
+ std::string("max.poll.interval.ms"), std::string("7000")));
+
+ DefaultRebalanceCb rebalance_cb1;
+ RdKafka::KafkaConsumer *c1 =
+ make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb1, 30);
+ DefaultRebalanceCb rebalance_cb2;
+ RdKafka::KafkaConsumer *c2 =
+ make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
+ &rebalance_cb2, 30);
+
+ test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+ test_wait_topic_exists(c2->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+
+ Test::subscribe(c1, topic_name_1);
+ Test::subscribe(c2, topic_name_1);
+
+ bool done = false;
+ bool both_have_been_assigned = false;
+ while (!done) {
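+    /* Stop polling C_1 once both consumers have their assignment, so that
+     * C_1 exceeds max.poll.interval.ms (7s) and is evicted from the group. */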
+ if (!both_have_been_assigned)
+ Test::poll_once(c1, 500);
+ Test::poll_once(c2, 500);
+
+ if (Test::assignment_partition_count(c1, NULL) == 1 &&
+ Test::assignment_partition_count(c2, NULL) == 1 &&
+ !both_have_been_assigned) {
+ Test::Say(
+ tostr()
+ << "Both consumers are assigned to topic " << topic_name_1
+ << ". WAITING 7 seconds for max.poll.interval.ms to be exceeded\n");
+ both_have_been_assigned = true;
+ }
+
+ if (Test::assignment_partition_count(c2, NULL) == 2 &&
+ both_have_been_assigned) {
+ Test::Say("Consumer 1 is no longer assigned any partitions, done\n");
+ done = true;
+ }
+ }
+
+ if (variation == 1) {
+ if (rebalance_cb1.lost_call_cnt != 0)
+ Test::Fail(
+ tostr() << "Expected consumer 1 lost revoke count to be 0, not: "
+ << rebalance_cb1.lost_call_cnt);
+ Test::poll_once(c1,
+ 500); /* Eat the max poll interval exceeded error message */
+ Test::poll_once(c1,
+ 500); /* Trigger the rebalance_cb with lost partitions */
+ if (rebalance_cb1.lost_call_cnt != 1)
+ Test::Fail(
+ tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
+ << rebalance_cb1.lost_call_cnt);
+ }
+
+ c1->close();
+ c2->close();
+
+ if (rebalance_cb1.lost_call_cnt != 1)
+ Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
+ << rebalance_cb1.lost_call_cnt);
+
+  if (rebalance_cb1.assign_call_cnt != 1)
+    Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: "
+                       << rebalance_cb1.assign_call_cnt);
+  if (rebalance_cb2.assign_call_cnt != 2)
+    Test::Fail(tostr() << "Expected consumer 2 assign count to be 2, not: "
+                       << rebalance_cb2.assign_call_cnt);
+
+  if (rebalance_cb1.revoke_call_cnt != 1)
+    Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: "
+                       << rebalance_cb1.revoke_call_cnt);
+  if (rebalance_cb2.revoke_call_cnt != 1)
+    Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: "
+                       << rebalance_cb2.revoke_call_cnt);
+
+ delete c1;
+ delete c2;
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Poll all consumers until there are no more events or messages
+ * and the timeout has expired.
+ */
+static void poll_all_consumers(RdKafka::KafkaConsumer **consumers,
+ DefaultRebalanceCb *rebalance_cbs,
+ size_t num,
+ int timeout_ms) {
+ int64_t ts_end = test_clock() + (timeout_ms * 1000);
+
+ /* Poll all consumers until no more events are seen,
+ * this makes sure we exhaust the current state events before returning. */
+ bool evented;
+ do {
+ evented = false;
+ for (size_t i = 0; i < num; i++) {
+ int block_ms = min(10, (int)((ts_end - test_clock()) / 1000));
+ while (rebalance_cbs[i].poll_once(consumers[i], max(block_ms, 0)))
+ evented = true;
+ }
+ } while (evented || test_clock() < ts_end);
+}
+
+
+/**
+ * @brief Stress test with 8 consumers subscribing and fetching.
+ *
+ * @param subscription_variation 0..2
+ *
+ * TODO: incorporate committing offsets.
+ */
+
+static void u_multiple_subscription_changes(bool use_rebalance_cb,
+ int subscription_variation) {
+ const int N_CONSUMERS = 8;
+ const int N_TOPICS = 2;
+ const int N_PARTS_PER_TOPIC = N_CONSUMERS * N_TOPICS;
+ const int N_PARTITIONS = N_PARTS_PER_TOPIC * N_TOPICS;
+ const int N_MSGS_PER_PARTITION = 1000;
+
+ SUB_TEST("use_rebalance_cb: %d, subscription_variation: %d",
+ (int)use_rebalance_cb, subscription_variation);
+
+ string topic_name_1 = Test::mk_topic_name("0113u_1", 1);
+ string topic_name_2 = Test::mk_topic_name("0113u_2", 1);
+ string group_name = Test::mk_unique_group_name("0113u");
+
+ test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, 1);
+ test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, 1);
+
+ Test::Say("Creating consumers\n");
+ DefaultRebalanceCb rebalance_cbs[N_CONSUMERS];
+ RdKafka::KafkaConsumer *consumers[N_CONSUMERS];
+
+ for (int i = 0; i < N_CONSUMERS; i++) {
+ std::string name = tostr() << "C_" << i;
+ consumers[i] =
+ make_consumer(name.c_str(), group_name, "cooperative-sticky", NULL,
+ use_rebalance_cb ? &rebalance_cbs[i] : NULL, 120);
+ }
+
+ test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_1.c_str(),
+ 10 * 1000);
+ test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_2.c_str(),
+ 10 * 1000);
+
+
+ /*
+ * Seed all partitions with the same number of messages so we later can
+ * verify that consumption is working.
+ */
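+  /* Note: the Toppar's partition field carries the per-topic partition
+   * count here, not a partition number. */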
+ vector<pair<Toppar, int> > ptopics;
+ ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_1, N_PARTS_PER_TOPIC),
+ N_MSGS_PER_PARTITION));
+ ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_2, N_PARTS_PER_TOPIC),
+ N_MSGS_PER_PARTITION));
+ produce_msgs(ptopics);
+
+
+ /*
+ * Track what topics a consumer should be subscribed to and use this to
+ * verify both its subscription and assignment throughout the test.
+ */
+
+ /* consumer -> currently subscribed topics */
+ map<int, vector<string> > consumer_topics;
+
+ /* topic -> consumers subscribed to topic */
+ map<string, set<int> > topic_consumers;
+
+ /* The subscription alternatives that consumers
+ * alter between in the playbook. */
+ vector<string> SUBSCRIPTION_1;
+ vector<string> SUBSCRIPTION_2;
+
+ SUBSCRIPTION_1.push_back(topic_name_1);
+
+ switch (subscription_variation) {
+ case 0:
+ SUBSCRIPTION_2.push_back(topic_name_1);
+ SUBSCRIPTION_2.push_back(topic_name_2);
+ break;
+
+ case 1:
+ SUBSCRIPTION_2.push_back(topic_name_2);
+ break;
+
+ case 2:
+ /* No subscription */
+ break;
+ }
+
+ sort(SUBSCRIPTION_1.begin(), SUBSCRIPTION_1.end());
+ sort(SUBSCRIPTION_2.begin(), SUBSCRIPTION_2.end());
+
+
+ /*
+ * Define playbook
+ */
+ const struct {
+ int timestamp_ms;
+ int consumer;
+ const vector<string> *topics;
+ } playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
+ {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
+ {4000, 1, &SUBSCRIPTION_1}, {4000, 1, &SUBSCRIPTION_1},
+ {4000, 1, &SUBSCRIPTION_1}, {4000, 2, &SUBSCRIPTION_1},
+ {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
+ {6000, 4, &SUBSCRIPTION_1}, {6000, 5, &SUBSCRIPTION_1},
+ {6000, 6, &SUBSCRIPTION_1}, {6000, 7, &SUBSCRIPTION_2},
+ {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
+ {6000, 1, &SUBSCRIPTION_2}, {6000, 1, &SUBSCRIPTION_1},
+ {6000, 2, &SUBSCRIPTION_2}, {7000, 2, &SUBSCRIPTION_1},
+ {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
+ {8000, 0, &SUBSCRIPTION_2}, {8000, 1, &SUBSCRIPTION_1},
+ {8000, 0, &SUBSCRIPTION_1}, {13000, 2, &SUBSCRIPTION_1},
+ {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
+ {13000, 5, &SUBSCRIPTION_2}, {14000, 6, &SUBSCRIPTION_2},
+ {15000, 7, &SUBSCRIPTION_1}, {15000, 1, &SUBSCRIPTION_1},
+ {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
+ {15000, 6, &SUBSCRIPTION_1}, {INT_MAX, 0, 0}};
+
+ /*
+ * Run the playbook
+ */
+ int cmd_number = 0;
+ uint64_t ts_start = test_clock();
+
+ while (playbook[cmd_number].timestamp_ms != INT_MAX) {
+ TEST_ASSERT(playbook[cmd_number].consumer < N_CONSUMERS);
+
+ Test::Say(tostr() << "Cmd #" << cmd_number << ": wait "
+ << playbook[cmd_number].timestamp_ms << "ms\n");
+
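+    /* Poll all consumers until this command's absolute timestamp (measured
+     * from the playbook start) has been reached. */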
+ poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS,
+ playbook[cmd_number].timestamp_ms -
+ (int)((test_clock() - ts_start) / 1000));
+
+ /* Verify consumer assignments match subscribed topics */
+ map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
+ for (int i = 0; i < N_CONSUMERS; i++)
+ verify_consumer_assignment(
+ consumers[i], rebalance_cbs[i], consumer_topics[i],
+ /* Allow empty assignment */
+ true,
+ /* Allow mismatch between subscribed topics
+ * and actual assignment since we can't
+ * synchronize the last subscription
+ * to the current assignment due to
+ * an unknown number of rebalances required
+ * for the final assignment to settle.
+ * This is instead checked at the end of
+ * this test case. */
+ true, &all_assignments, -1 /* no msgcnt check*/);
+
+ int cid = playbook[cmd_number].consumer;
+ RdKafka::KafkaConsumer *consumer = consumers[playbook[cmd_number].consumer];
+ const vector<string> *topics = playbook[cmd_number].topics;
+
+ /*
+ * Update our view of the consumer's subscribed topics and vice versa.
+ */
+ for (vector<string>::const_iterator it = consumer_topics[cid].begin();
+ it != consumer_topics[cid].end(); it++) {
+ topic_consumers[*it].erase(cid);
+ }
+
+ consumer_topics[cid].clear();
+
+ for (vector<string>::const_iterator it = topics->begin();
+ it != topics->end(); it++) {
+ consumer_topics[cid].push_back(*it);
+ topic_consumers[*it].insert(cid);
+ }
+
+ RdKafka::ErrorCode err;
+
+ /*
+ * Change subscription
+ */
+ if (!topics->empty()) {
+ Test::Say(tostr() << "Consumer: " << consumer->name()
+ << " is subscribing to topics "
+ << string_vec_to_str(*topics) << " after "
+ << ((test_clock() - ts_start) / 1000) << "ms\n");
+ err = consumer->subscribe(*topics);
+ TEST_ASSERT(!err, "Expected subscribe() to succeed, got %s",
+ RdKafka::err2str(err).c_str());
+ } else {
+ Test::Say(tostr() << "Consumer: " << consumer->name()
+ << " is unsubscribing after "
+ << ((test_clock() - ts_start) / 1000) << "ms\n");
+ Test::unsubscribe(consumer);
+ }
+
+ /* Mark this consumer as waiting for rebalance so that
+ * verify_consumer_assignment() allows assigned partitions that
+ * (no longer) match the subscription. */
+ rebalance_cbs[cid].wait_rebalance = true;
+
+
+ /*
+ * Verify subscription matches what we think it should be.
+ */
+ vector<string> subscription;
+ err = consumer->subscription(subscription);
+ TEST_ASSERT(!err, "consumer %s subscription() failed: %s",
+ consumer->name().c_str(), RdKafka::err2str(err).c_str());
+
+ sort(subscription.begin(), subscription.end());
+
+ Test::Say(tostr() << "Consumer " << consumer->name()
+ << " subscription is now "
+ << string_vec_to_str(subscription) << "\n");
+
+ if (subscription != *topics)
+ Test::Fail(tostr() << "Expected consumer " << consumer->name()
+ << " subscription: " << string_vec_to_str(*topics)
+ << " but got: " << string_vec_to_str(subscription));
+
+ cmd_number++;
+ }
+
+
+ /*
+ * Wait for final rebalances and all consumers to settle,
+ * then verify assignments and received message counts.
+ */
+ Test::Say(_C_YEL "Waiting for final assignment state\n");
+ int done_count = 0;
+ /* Allow at least 20 seconds for group to stabilize. */
+ int64_t stabilize_until = test_clock() + (20 * 1000 * 1000); /* 20s */
+
+ while (done_count < 2) {
+ bool stabilized = test_clock() > stabilize_until;
+
+ poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS, 5000);
+
+ /* Verify consumer assignments */
+ int counts[N_CONSUMERS];
+ map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
+ Test::Say(tostr() << "Consumer assignments "
+ << "(subscription_variation " << subscription_variation
+ << ")" << (stabilized ? " (stabilized)" : "")
+ << (use_rebalance_cb ? " (use_rebalance_cb)"
+ : " (no rebalance cb)")
+ << ":\n");
+ for (int i = 0; i < N_CONSUMERS; i++) {
+ bool last_rebalance_stabilized =
+ stabilized &&
+ (!use_rebalance_cb ||
+ /* session.timeout.ms * 2 + 1 */
+ test_clock() > rebalance_cbs[i].ts_last_assign + (13 * 1000 * 1000));
+
+ counts[i] = verify_consumer_assignment(
+ consumers[i], rebalance_cbs[i], consumer_topics[i],
+ /* allow empty */
+ true,
+ /* if we're waiting for a
+ * rebalance it is okay for the
+ * current assignment to contain
+ * topics that this consumer
+ * (no longer) subscribes to. */
+ !last_rebalance_stabilized || !use_rebalance_cb ||
+ rebalance_cbs[i].wait_rebalance,
+        /* do not allow assignments for
+         * topics that are not subscribed to */
+ &all_assignments,
+ /* Verify received message counts
+ * once the assignments have
+ * stabilized.
+ * Requires the rebalance cb.*/
+ done_count > 0 && use_rebalance_cb ? N_MSGS_PER_PARTITION : -1);
+ }
+
+ Test::Say(tostr() << all_assignments.size() << "/" << N_PARTITIONS
+ << " partitions assigned\n");
+
+ bool done = true;
+ for (int i = 0; i < N_CONSUMERS; i++) {
+ /* For each topic the consumer subscribes to it should
+ * be assigned its share of partitions. */
+ int exp_parts = 0;
+ for (vector<string>::const_iterator it = consumer_topics[i].begin();
+ it != consumer_topics[i].end(); it++)
+ exp_parts += N_PARTS_PER_TOPIC / (int)topic_consumers[*it].size();
+
+ Test::Say(tostr() << (counts[i] == exp_parts ? "" : _C_YEL) << "Consumer "
+ << consumers[i]->name() << " has " << counts[i]
+ << " assigned partitions (" << consumer_topics[i].size()
+ << " subscribed topic(s))"
+ << ", expecting " << exp_parts
+ << " assigned partitions\n");
+
+ if (counts[i] != exp_parts)
+ done = false;
+ }
+
+ if (done && stabilized) {
+ done_count++;
+ Test::Say(tostr() << "All assignments verified, done count is "
+ << done_count << "\n");
+ }
+ }
+
+ Test::Say("Disposing consumers\n");
+ for (int i = 0; i < N_CONSUMERS; i++) {
+ TEST_ASSERT(!use_rebalance_cb || !rebalance_cbs[i].wait_rebalance,
+ "Consumer %d still waiting for rebalance", i);
+ if (i & 1)
+ consumers[i]->close();
+ delete consumers[i];
+ }
+
+ SUB_TEST_PASS();
+}
+
+
+
+extern "C" {
+
+static int rebalance_cnt;
+static rd_kafka_resp_err_t rebalance_exp_event;
+static rd_bool_t rebalance_exp_lost;
+
+extern void test_print_partition_list(
+ const rd_kafka_topic_partition_list_t *partitions);
+
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+ rebalance_cnt++;
+ TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
+ rd_kafka_err2name(err), parts->cnt);
+
+ test_print_partition_list(parts);
+
+ TEST_ASSERT(err == rebalance_exp_event ||
+ rebalance_exp_event == RD_KAFKA_RESP_ERR_NO_ERROR,
+ "Expected rebalance event %s, not %s",
+ rd_kafka_err2name(rebalance_exp_event), rd_kafka_err2name(err));
+
+ if (rebalance_exp_lost) {
+ TEST_ASSERT(rd_kafka_assignment_lost(rk), "Expected partitions lost");
+ TEST_SAY("Partitions were lost\n");
+ }
+
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+ test_consumer_incremental_assign("assign", rk, parts);
+ } else {
+ test_consumer_incremental_unassign("unassign", rk, parts);
+ }
+}
+
+/**
+ * @brief Wait for an expected rebalance event, or fail.
+ */
+static void expect_rebalance0(const char *func,
+ int line,
+ const char *what,
+ rd_kafka_t *c,
+ rd_kafka_resp_err_t exp_event,
+ rd_bool_t exp_lost,
+ int timeout_s) {
+ int64_t tmout = test_clock() + (timeout_s * 1000000);
+ int start_cnt = rebalance_cnt;
+
+ TEST_SAY("%s:%d: Waiting for %s (%s) for %ds\n", func, line, what,
+ rd_kafka_err2name(exp_event), timeout_s);
+
+ rebalance_exp_lost = exp_lost;
+ rebalance_exp_event = exp_event;
+
+ while (tmout > test_clock() && rebalance_cnt == start_cnt) {
+ test_consumer_poll_once(c, NULL, 1000);
+ }
+
+ if (rebalance_cnt == start_cnt + 1) {
+ rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rebalance_exp_lost = exp_lost = rd_false;
+ return;
+ }
+
+ TEST_FAIL("%s:%d: Timed out waiting for %s (%s)", func, line, what,
+ rd_kafka_err2name(exp_event));
+}
+
+#define expect_rebalance(WHAT, C, EXP_EVENT, EXP_LOST, TIMEOUT_S) \
+ expect_rebalance0(__FUNCTION__, __LINE__, WHAT, C, EXP_EVENT, EXP_LOST, \
+ TIMEOUT_S)
+
+
+/* Check lost partitions revoke occurs on ILLEGAL_GENERATION heartbeat error.
+ */
+
+static void p_lost_partitions_heartbeat_illegal_generation_test() {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *groupid = "mygroup";
+ const char *topic = "test";
+ rd_kafka_t *c;
+ rd_kafka_conf_t *conf;
+
+ SUB_TEST_QUICK();
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10",
+ "security.protocol", "plaintext", NULL);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "security.protocol", "PLAINTEXT");
+ test_conf_set(conf, "group.id", groupid);
+ test_conf_set(conf, "session.timeout.ms", "5000");
+ test_conf_set(conf, "heartbeat.interval.ms", "1000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+ c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+ test_consumer_subscribe(c, topic);
+
+ expect_rebalance("initial assignment", c,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ rd_false /*don't expect lost*/, 5 + 2);
+
+  /* Fail the next five heartbeats with ILLEGAL_GENERATION: the consumer
+   * should consider its assignment lost and rejoin the group. */
+ rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Heartbeat, 5,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
+ expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ rd_true /*expect lost*/, 10 + 2);
+
+ rd_kafka_mock_clear_request_errors(mcluster, RD_KAFKAP_Heartbeat);
+
+ expect_rebalance("rejoin after lost", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ rd_false /*don't expect lost*/, 10 + 2);
+
+ TEST_SAY("Closing consumer\n");
+ test_consumer_close(c);
+
+ TEST_SAY("Destroying consumer\n");
+ rd_kafka_destroy(c);
+
+ TEST_SAY("Destroying mock cluster\n");
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check lost partitions revoke occurs on ILLEGAL_GENERATION JoinGroup
+ * or SyncGroup error.
+ */
+
+static void q_lost_partitions_illegal_generation_test(
+ rd_bool_t test_joingroup_fail) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *groupid = "mygroup";
+ const char *topic1 = "test1";
+ const char *topic2 = "test2";
+ rd_kafka_t *c;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t err;
+ rd_kafka_topic_partition_list_t *topics;
+
+ SUB_TEST0(!test_joingroup_fail /*quick*/, "test_joingroup_fail=%d",
+ test_joingroup_fail);
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+ /* Seed the topic1 with messages */
+ test_produce_msgs_easy_v(topic1, 0, 0, 0, 100, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10",
+ "security.protocol", "plaintext", NULL);
+
+ /* Seed the topic2 with messages */
+ test_produce_msgs_easy_v(topic2, 0, 0, 0, 100, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10",
+ "security.protocol", "plaintext", NULL);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "security.protocol", "PLAINTEXT");
+ test_conf_set(conf, "group.id", groupid);
+ test_conf_set(conf, "session.timeout.ms", "5000");
+ test_conf_set(conf, "heartbeat.interval.ms", "1000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+ c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+ test_consumer_subscribe(c, topic1);
+
+ expect_rebalance("initial assignment", c,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ rd_false /*don't expect lost*/, 5 + 2);
+
+ /* Fail JoinGroups or SyncGroups */
+ rd_kafka_mock_push_request_errors(
+ mcluster, test_joingroup_fail ? RD_KAFKAP_JoinGroup : RD_KAFKAP_SyncGroup,
+ 5, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
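+  /* Change the subscription while JoinGroup/SyncGroup requests are failing;
+   * the resulting rejoin hits ILLEGAL_GENERATION and the current assignment
+   * is expected to be lost. */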
+ topics = rd_kafka_topic_partition_list_new(2);
+ rd_kafka_topic_partition_list_add(topics, topic1, RD_KAFKA_PARTITION_UA);
+ rd_kafka_topic_partition_list_add(topics, topic2, RD_KAFKA_PARTITION_UA);
+ err = rd_kafka_subscribe(c, topics);
+ if (err)
+ TEST_FAIL("%s: Failed to subscribe to topics: %s\n", rd_kafka_name(c),
+ rd_kafka_err2str(err));
+ rd_kafka_topic_partition_list_destroy(topics);
+
+ expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ rd_true /*expect lost*/, 10 + 2);
+
+ rd_kafka_mock_clear_request_errors(mcluster, test_joingroup_fail
+ ? RD_KAFKAP_JoinGroup
+ : RD_KAFKAP_SyncGroup);
+
+ expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 10 + 2);
+
+ TEST_SAY("Closing consumer\n");
+ test_consumer_close(c);
+
+ TEST_SAY("Destroying consumer\n");
+ rd_kafka_destroy(c);
+
+ TEST_SAY("Destroying mock cluster\n");
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/* Check lost partitions revoke occurs on ILLEGAL_GENERATION Commit
+ * error.
+ */
+
+static void r_lost_partitions_commit_illegal_generation_test_local() {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *groupid = "mygroup";
+ const char *topic = "test";
+ const int msgcnt = 100;
+ rd_kafka_t *c;
+ rd_kafka_conf_t *conf;
+
+ SUB_TEST();
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10",
+ "security.protocol", "plaintext", NULL);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "security.protocol", "PLAINTEXT");
+ test_conf_set(conf, "group.id", groupid);
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+ c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+ test_consumer_subscribe(c, topic);
+
+ expect_rebalance("initial assignment", c,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ rd_false /*don't expect lost*/, 5 + 2);
+
+
+ /* Consume some messages so that the commit has something to commit. */
+ test_consumer_poll("consume", c, -1, -1, -1, msgcnt / 2, NULL);
+
+ /* Fail Commit */
+ rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_OffsetCommit, 5,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
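+  /* This synchronous commit will fail with ILLEGAL_GENERATION, which should
+   * mark the assignment as lost and trigger the revoke expected below. */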
+ rd_kafka_commit(c, NULL, rd_false);
+
+ expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ rd_true /*expect lost*/, 10 + 2);
+
+ expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 20 + 2);
+
+ TEST_SAY("Closing consumer\n");
+ test_consumer_close(c);
+
+ TEST_SAY("Destroying consumer\n");
+ rd_kafka_destroy(c);
+
+ TEST_SAY("Destroying mock cluster\n");
+ test_mock_cluster_destroy(mcluster);
+
+  SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Rebalance callback for the v_.. test below.
+ */
+static void v_rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+ bool *auto_commitp = (bool *)opaque;
+
+ TEST_SAY("%s: %s: %d partition(s)%s\n", rd_kafka_name(rk),
+ rd_kafka_err2name(err), parts->cnt,
+ rd_kafka_assignment_lost(rk) ? " - assignment lost" : "");
+
+ test_print_partition_list(parts);
+
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+ test_consumer_incremental_assign("assign", rk, parts);
+ } else {
+ test_consumer_incremental_unassign("unassign", rk, parts);
+
+ if (!*auto_commitp) {
+ rd_kafka_resp_err_t commit_err;
+
+ TEST_SAY("Attempting manual commit after unassign, in 2 seconds..\n");
+ /* Sleep enough to have the generation-id bumped by rejoin. */
+ rd_sleep(2);
+ commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/);
+ TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
+ commit_err == RD_KAFKA_RESP_ERR__DESTROY,
+ "%s: manual commit failed: %s", rd_kafka_name(rk),
+ rd_kafka_err2str(commit_err));
+ }
+ }
+}
+
+/**
+ * @brief Commit callback for the v_.. test.
+ */
+static void v_commit_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque) {
+ TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk),
+ offsets ? offsets->cnt : -1, rd_kafka_err2name(err));
+ TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
+ err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */,
+ "%s offset commit failed: %s", rd_kafka_name(rk),
+ rd_kafka_err2str(err));
+}
+
+
+static void v_commit_during_rebalance(bool with_rebalance_cb,
+ bool auto_commit) {
+ rd_kafka_t *p, *c1, *c2;
+ rd_kafka_conf_t *conf;
+ const char *topic = test_mk_topic_name("0113_v", 1);
+ const int partition_cnt = 6;
+ const int msgcnt_per_partition = 100;
+ const int msgcnt = partition_cnt * msgcnt_per_partition;
+ uint64_t testid;
+ int i;
+
+
+ SUB_TEST("With%s rebalance callback and %s-commit",
+ with_rebalance_cb ? "" : "out", auto_commit ? "auto" : "manual");
+
+ test_conf_init(&conf, NULL, 30);
+ testid = test_id_generate();
+
+ /*
+ * Produce messages to topic
+ */
+ p = test_create_producer();
+
+ test_create_topic(p, topic, partition_cnt, 1);
+
+ for (i = 0; i < partition_cnt; i++) {
+ test_produce_msgs2(p, topic, testid, i, i * msgcnt_per_partition,
+ msgcnt_per_partition, NULL, 0);
+ }
+
+ test_flush(p, -1);
+
+ rd_kafka_destroy(p);
+
+
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false");
+ test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+ rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb);
+ rd_kafka_conf_set_opaque(conf, (void *)&auto_commit);
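+  /* The address of auto_commit is set as the conf opaque so that
+   * v_rebalance_cb() can tell whether a manual commit should be attempted
+   * on unassign. */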
+
+ TEST_SAY("Create and subscribe first consumer\n");
+ c1 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
+ rd_kafka_conf_dup(conf), NULL);
+ TEST_ASSERT(rd_kafka_opaque(c1) == (void *)&auto_commit,
+ "c1 opaque mismatch");
+ test_consumer_subscribe(c1, topic);
+
+ /* Consume some messages so that we know we have an assignment
+ * and something to commit. */
+ test_consumer_poll("C1.PRECONSUME", c1, testid, -1, 0,
+ msgcnt / partition_cnt / 2, NULL);
+
+ TEST_SAY("Create and subscribe second consumer\n");
+ c2 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
+ conf, NULL);
+ TEST_ASSERT(rd_kafka_opaque(c2) == (void *)&auto_commit,
+ "c2 opaque mismatch");
+ test_consumer_subscribe(c2, topic);
+
+ /* Poll both consumers */
+ for (i = 0; i < 10; i++) {
+ test_consumer_poll_once(c1, NULL, 1000);
+ test_consumer_poll_once(c2, NULL, 1000);
+ }
+
+ TEST_SAY("Closing consumers\n");
+ test_consumer_close(c1);
+ test_consumer_close(c2);
+
+ rd_kafka_destroy(c1);
+ rd_kafka_destroy(c2);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Verify that incremental rebalances retain stickiness.
+ */
+static void x_incremental_rebalances(void) {
+#define _NUM_CONS 3
+ rd_kafka_t *c[_NUM_CONS];
+ rd_kafka_conf_t *conf;
+ const char *topic = test_mk_topic_name("0113_x", 1);
+ int i;
+
+ SUB_TEST();
+ test_conf_init(&conf, NULL, 60);
+
+ test_create_topic(NULL, topic, 6, 1);
+
+ test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+ for (i = 0; i < _NUM_CONS; i++) {
+ char clientid[32];
+ rd_snprintf(clientid, sizeof(clientid), "consumer%d", i);
+ test_conf_set(conf, "client.id", clientid);
+
+ c[i] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+ }
+ rd_kafka_conf_destroy(conf);
+
+ /* First consumer joins group */
+ TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
+ test_consumer_subscribe(c[0], topic);
+ test_consumer_wait_assignment(c[0], rd_true /*poll*/);
+ test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
+ topic, 1, topic, 2, topic, 3, topic, 4, topic,
+ 5, NULL);
+
+
+ /* Second consumer joins group */
+ TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
+ test_consumer_subscribe(c[1], topic);
+ test_consumer_wait_assignment(c[1], rd_true /*poll*/);
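+  /* Allow the incremental rebalance to settle before verifying that the
+   * first consumer retained a sticky subset of its original assignment. */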
+ rd_sleep(3);
+ test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
+ topic, 4, topic, 5, NULL);
+ test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 0,
+ topic, 1, topic, 2, NULL);
+
+ /* Third consumer joins group */
+ TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
+ test_consumer_subscribe(c[2], topic);
+ test_consumer_wait_assignment(c[2], rd_true /*poll*/);
+ rd_sleep(3);
+ test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,
+ topic, 5, NULL);
+ test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 1,
+ topic, 2, NULL);
+ test_consumer_verify_assignment(c[2], rd_false /*fail later*/, topic, 3,
+ topic, 0, NULL);
+
+ /* Raise any previously failed verify_assignment calls and fail the test */
+ TEST_LATER_CHECK();
+
+ for (i = 0; i < _NUM_CONS; i++)
+ rd_kafka_destroy(c[i]);
+
+ SUB_TEST_PASS();
+
+#undef _NUM_CONS
+}
+
+/* Local tests not needing a cluster */
+int main_0113_cooperative_rebalance_local(int argc, char **argv) {
+ a_assign_rapid();
+ p_lost_partitions_heartbeat_illegal_generation_test();
+ q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/);
+ q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/);
+ r_lost_partitions_commit_illegal_generation_test_local();
+ return 0;
+}
+
+int main_0113_cooperative_rebalance(int argc, char **argv) {
+ int i;
+
+ a_assign_tests();
+ b_subscribe_with_cb_test(true /*close consumer*/);
+ b_subscribe_with_cb_test(false /*don't close consumer*/);
+ c_subscribe_no_cb_test(true /*close consumer*/);
+
+ if (test_quick) {
+ Test::Say("Skipping tests >= c_ .. due to quick mode\n");
+ return 0;
+ }
+
+ c_subscribe_no_cb_test(false /*don't close consumer*/);
+ d_change_subscription_add_topic(true /*close consumer*/);
+ d_change_subscription_add_topic(false /*don't close consumer*/);
+ e_change_subscription_remove_topic(true /*close consumer*/);
+ e_change_subscription_remove_topic(false /*don't close consumer*/);
+ f_assign_call_cooperative();
+ g_incremental_assign_call_eager();
+ h_delete_topic();
+ i_delete_topic_2();
+ j_delete_topic_no_rb_callback();
+ k_add_partition();
+ l_unsubscribe();
+ m_unsubscribe_2();
+ n_wildcard();
+ o_java_interop();
+ for (i = 1; i <= 6; i++) /* iterate over 6 different test variations */
+ s_subscribe_when_rebalancing(i);
+ for (i = 1; i <= 2; i++)
+ t_max_poll_interval_exceeded(i);
+ /* Run all 2*3 variations of the u_.. test */
+ for (i = 0; i < 3; i++) {
+ u_multiple_subscription_changes(true /*with rebalance_cb*/, i);
+ u_multiple_subscription_changes(false /*without rebalance_cb*/, i);
+ }
+ v_commit_during_rebalance(true /*with rebalance callback*/,
+ true /*auto commit*/);
+ v_commit_during_rebalance(false /*without rebalance callback*/,
+ true /*auto commit*/);
+ v_commit_during_rebalance(true /*with rebalance callback*/,
+ false /*manual commit*/);
+ x_incremental_rebalances();
+
+ return 0;
+}
+}