diff options
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp | 3170 |
1 files changed, 3170 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp new file mode 100644 index 000000000..430798d7f --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp @@ -0,0 +1,3170 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +extern "C" { +#include "../src/rdkafka_protocol.h" +#include "test.h" +} +#include <iostream> +#include <map> +#include <set> +#include <algorithm> +#include <cstring> +#include <cstdlib> +#include <assert.h> +#include "testcpp.h" +#include <fstream> + +using namespace std; + +/** Topic+Partition helper class */ +class Toppar { + public: + Toppar(const string &topic, int32_t partition) : + topic(topic), partition(partition) { + } + + Toppar(const RdKafka::TopicPartition *tp) : + topic(tp->topic()), partition(tp->partition()) { + } + + friend bool operator==(const Toppar &a, const Toppar &b) { + return a.partition == b.partition && a.topic == b.topic; + } + + friend bool operator<(const Toppar &a, const Toppar &b) { + if (a.partition < b.partition) + return true; + return a.topic < b.topic; + } + + string str() const { + return tostr() << topic << "[" << partition << "]"; + } + + std::string topic; + int32_t partition; +}; + + + +static std::string get_bootstrap_servers() { + RdKafka::Conf *conf; + std::string bootstrap_servers; + Test::conf_init(&conf, NULL, 0); + conf->get("bootstrap.servers", bootstrap_servers); + delete conf; + return bootstrap_servers; +} + + +class DrCb : public RdKafka::DeliveryReportCb { + public: + void dr_cb(RdKafka::Message &msg) { + if (msg.err()) + Test::Fail("Delivery failed: " + RdKafka::err2str(msg.err())); + } +}; + + +/** + * @brief Produce messages to partitions. + * + * The pair is Toppar,msg_cnt_per_partition. + * The Toppar is topic,partition_cnt. 
+ */ +static void produce_msgs(vector<pair<Toppar, int> > partitions) { + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, 0); + + string errstr; + DrCb dr; + conf->set("dr_cb", &dr, errstr); + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create producer: " + errstr); + delete conf; + + for (vector<pair<Toppar, int> >::iterator it = partitions.begin(); + it != partitions.end(); it++) { + for (int part = 0; part < it->first.partition; part++) { + for (int i = 0; i < it->second; i++) { + RdKafka::ErrorCode err = + p->produce(it->first.topic, part, RdKafka::Producer::RK_MSG_COPY, + (void *)"Hello there", 11, NULL, 0, 0, NULL); + TEST_ASSERT(!err, "produce(%s, %d) failed: %s", it->first.topic.c_str(), + part, RdKafka::err2str(err).c_str()); + + p->poll(0); + } + } + } + + p->flush(10000); + + delete p; +} + + + +static RdKafka::KafkaConsumer *make_consumer( + string client_id, + string group_id, + string assignment_strategy, + vector<pair<string, string> > *additional_conf, + RdKafka::RebalanceCb *rebalance_cb, + int timeout_s) { + std::string bootstraps; + std::string errstr; + std::vector<std::pair<std::string, std::string> >::iterator itr; + + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, timeout_s); + Test::conf_set(conf, "client.id", client_id); + Test::conf_set(conf, "group.id", group_id); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "enable.auto.commit", "false"); + Test::conf_set(conf, "partition.assignment.strategy", assignment_strategy); + if (additional_conf != NULL) { + for (itr = (*additional_conf).begin(); itr != (*additional_conf).end(); + itr++) + Test::conf_set(conf, itr->first, itr->second); + } + + if (rebalance_cb) { + if (conf->set("rebalance_cb", rebalance_cb, errstr)) + Test::Fail("Failed to set rebalance_cb: " + errstr); + } + RdKafka::KafkaConsumer *consumer = + RdKafka::KafkaConsumer::create(conf, errstr); + if (!consumer) + Test::Fail("Failed to 
create KafkaConsumer: " + errstr); + delete conf; + + return consumer; +} + +/** + * @returns a CSV string of the vector + */ +static string string_vec_to_str(const vector<string> &v) { + ostringstream ss; + for (vector<string>::const_iterator it = v.begin(); it != v.end(); it++) + ss << (it == v.begin() ? "" : ", ") << *it; + return ss.str(); +} + +void expect_assignment(RdKafka::KafkaConsumer *consumer, size_t count) { + std::vector<RdKafka::TopicPartition *> partitions; + RdKafka::ErrorCode err; + err = consumer->assignment(partitions); + if (err) + Test::Fail(consumer->name() + + " assignment() failed: " + RdKafka::err2str(err)); + if (partitions.size() != count) + Test::Fail(tostr() << "Expecting consumer " << consumer->name() + << " to have " << count + << " assigned partition(s), not: " << partitions.size()); + RdKafka::TopicPartition::destroy(partitions); +} + + +static bool TopicPartition_cmp(const RdKafka::TopicPartition *a, + const RdKafka::TopicPartition *b) { + if (a->topic() < b->topic()) + return true; + else if (a->topic() > b->topic()) + return false; + return a->partition() < b->partition(); +} + + +void expect_assignment(RdKafka::KafkaConsumer *consumer, + vector<RdKafka::TopicPartition *> &expected) { + vector<RdKafka::TopicPartition *> partitions; + RdKafka::ErrorCode err; + err = consumer->assignment(partitions); + if (err) + Test::Fail(consumer->name() + + " assignment() failed: " + RdKafka::err2str(err)); + + if (partitions.size() != expected.size()) + Test::Fail(tostr() << "Expecting consumer " << consumer->name() + << " to have " << expected.size() + << " assigned partition(s), not " << partitions.size()); + + sort(partitions.begin(), partitions.end(), TopicPartition_cmp); + sort(expected.begin(), expected.end(), TopicPartition_cmp); + + int fails = 0; + for (int i = 0; i < (int)partitions.size(); i++) { + if (!TopicPartition_cmp(partitions[i], expected[i])) + continue; + + Test::Say(tostr() << _C_RED << consumer->name() << ": expected 
assignment #" + << i << " " << expected[i]->topic() << " [" + << expected[i]->partition() << "], not " + << partitions[i]->topic() << " [" + << partitions[i]->partition() << "]\n"); + fails++; + } + + if (fails) + Test::Fail(consumer->name() + ": Expected assignment mismatch, see above"); + + RdKafka::TopicPartition::destroy(partitions); +} + + +class DefaultRebalanceCb : public RdKafka::RebalanceCb { + private: + static string part_list_print( + const vector<RdKafka::TopicPartition *> &partitions) { + ostringstream ss; + for (unsigned int i = 0; i < partitions.size(); i++) + ss << (i == 0 ? "" : ", ") << partitions[i]->topic() << " [" + << partitions[i]->partition() << "]"; + return ss.str(); + } + + public: + int assign_call_cnt; + int revoke_call_cnt; + int nonempty_assign_call_cnt; /**< ASSIGN_PARTITIONS with partitions */ + int lost_call_cnt; + int partitions_assigned_net; + bool wait_rebalance; + int64_t ts_last_assign; /**< Timestamp of last rebalance assignment */ + map<Toppar, int> msg_cnt; /**< Number of consumed messages per partition. */ + + ~DefaultRebalanceCb() { + reset_msg_cnt(); + } + + DefaultRebalanceCb() : + assign_call_cnt(0), + revoke_call_cnt(0), + nonempty_assign_call_cnt(0), + lost_call_cnt(0), + partitions_assigned_net(0), + wait_rebalance(false), + ts_last_assign(0) { + } + + + void rebalance_cb(RdKafka::KafkaConsumer *consumer, + RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &partitions) { + wait_rebalance = false; + + std::string protocol = consumer->rebalance_protocol(); + + TEST_ASSERT(protocol == "COOPERATIVE", + "%s: Expected rebalance_protocol \"COOPERATIVE\", not %s", + consumer->name().c_str(), protocol.c_str()); + + const char *lost_str = consumer->assignment_lost() ? 
" (LOST)" : ""; + Test::Say(tostr() << _C_YEL "RebalanceCb " << protocol << ": " + << consumer->name() << " " << RdKafka::err2str(err) + << lost_str << ": " << part_list_print(partitions) + << "\n"); + + if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { + if (consumer->assignment_lost()) + Test::Fail("unexpected lost assignment during ASSIGN rebalance"); + RdKafka::Error *error = consumer->incremental_assign(partitions); + if (error) + Test::Fail(tostr() << "consumer->incremental_assign() failed: " + << error->str()); + if (partitions.size() > 0) + nonempty_assign_call_cnt++; + assign_call_cnt += 1; + partitions_assigned_net += (int)partitions.size(); + ts_last_assign = test_clock(); + + } else { + if (consumer->assignment_lost()) + lost_call_cnt += 1; + RdKafka::Error *error = consumer->incremental_unassign(partitions); + if (error) + Test::Fail(tostr() << "consumer->incremental_unassign() failed: " + << error->str()); + if (partitions.size() == 0) + Test::Fail("revoked partitions size should never be 0"); + revoke_call_cnt += 1; + partitions_assigned_net -= (int)partitions.size(); + } + + /* Reset message counters for the given partitions. 
*/ + Test::Say(consumer->name() + ": resetting message counters:\n"); + reset_msg_cnt(partitions); + } + + bool poll_once(RdKafka::KafkaConsumer *c, int timeout_ms) { + RdKafka::Message *msg = c->consume(timeout_ms); + bool ret = msg->err() != RdKafka::ERR__TIMED_OUT; + if (!msg->err()) + msg_cnt[Toppar(msg->topic_name(), msg->partition())]++; + delete msg; + return ret; + } + + void reset_msg_cnt() { + msg_cnt.clear(); + } + + void reset_msg_cnt(Toppar &tp) { + int msgcnt = get_msg_cnt(tp); + Test::Say(tostr() << " RESET " << tp.topic << " [" << tp.partition << "]" + << " with " << msgcnt << " messages\n"); + if (!msg_cnt.erase(tp) && msgcnt) + Test::Fail("erase failed!"); + } + + void reset_msg_cnt(const vector<RdKafka::TopicPartition *> &partitions) { + for (unsigned int i = 0; i < partitions.size(); i++) { + Toppar tp(partitions[i]->topic(), partitions[i]->partition()); + reset_msg_cnt(tp); + } + } + + int get_msg_cnt(const Toppar &tp) { + map<Toppar, int>::iterator it = msg_cnt.find(tp); + if (it == msg_cnt.end()) + return 0; + return it->second; + } +}; + + + +/** + * @brief Verify that the consumer's assignment is a subset of the + * subscribed topics. + * + * @param allow_mismatch Allow assignment of not subscribed topics. + * This can happen when the subscription is updated + * but a rebalance callback hasn't been seen yet. + * @param all_assignments Accumulated assignments for all consumers. + * If an assigned partition already exists it means + * the partition is assigned to multiple consumers and + * the test will fail. + * @param exp_msg_cnt Expected message count per assigned partition, or -1 + * if not to check. + * + * @returns the number of assigned partitions, or fails if the + * assignment is empty or there is an assignment for + * topic that is not subscribed. 
+ */ +static int verify_consumer_assignment( + RdKafka::KafkaConsumer *consumer, + DefaultRebalanceCb &rebalance_cb, + const vector<string> &topics, + bool allow_empty, + bool allow_mismatch, + map<Toppar, RdKafka::KafkaConsumer *> *all_assignments, + int exp_msg_cnt) { + vector<RdKafka::TopicPartition *> partitions; + RdKafka::ErrorCode err; + int fails = 0; + int count; + ostringstream ss; + + err = consumer->assignment(partitions); + TEST_ASSERT(!err, "Failed to get assignment for consumer %s: %s", + consumer->name().c_str(), RdKafka::err2str(err).c_str()); + + count = (int)partitions.size(); + + for (vector<RdKafka::TopicPartition *>::iterator it = partitions.begin(); + it != partitions.end(); it++) { + RdKafka::TopicPartition *p = *it; + + if (find(topics.begin(), topics.end(), p->topic()) == topics.end()) { + Test::Say(tostr() << (allow_mismatch ? _C_YEL "Warning (allowed)" + : _C_RED "Error") + << ": " << consumer->name() << " is assigned " + << p->topic() << " [" << p->partition() << "] which is " + << "not in the list of subscribed topics: " + << string_vec_to_str(topics) << "\n"); + if (!allow_mismatch) + fails++; + } + + Toppar tp(p); + pair<map<Toppar, RdKafka::KafkaConsumer *>::iterator, bool> ret; + ret = all_assignments->insert( + pair<Toppar, RdKafka::KafkaConsumer *>(tp, consumer)); + if (!ret.second) { + Test::Say(tostr() << _C_RED << "Error: " << consumer->name() + << " is assigned " << p->topic() << " [" + << p->partition() + << "] which is " + "already assigned to consumer " + << ret.first->second->name() << "\n"); + fails++; + } + + + int msg_cnt = rebalance_cb.get_msg_cnt(tp); + + if (exp_msg_cnt != -1 && msg_cnt != exp_msg_cnt) { + Test::Say(tostr() << _C_RED << "Error: " << consumer->name() + << " expected " << exp_msg_cnt << " messages on " + << p->topic() << " [" << p->partition() << "], not " + << msg_cnt << "\n"); + fails++; + } + + ss << (it == partitions.begin() ? 
"" : ", ") << p->topic() << " [" + << p->partition() << "] (" << msg_cnt << "msgs)"; + } + + RdKafka::TopicPartition::destroy(partitions); + + Test::Say(tostr() << "Consumer " << consumer->name() << " assignment (" + << count << "): " << ss.str() << "\n"); + + if (count == 0 && !allow_empty) + Test::Fail("Consumer " + consumer->name() + + " has unexpected empty assignment"); + + if (fails) + Test::Fail( + tostr() << "Consumer " + consumer->name() + << " assignment verification failed (see previous error)"); + + return count; +} + + + +/* -------- a_assign_tests + * + * check behavior incremental assign / unassign outside the context of a + * rebalance. + */ + + +/** Incremental assign, then assign(NULL). + */ +static void assign_test_1(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2) { + RdKafka::ErrorCode err; + RdKafka::Error *error; + + Test::Say("Incremental assign, then assign(NULL)\n"); + + if ((error = consumer->incremental_assign(toppars1))) + Test::Fail(tostr() << "Incremental assign failed: " << error->str()); + Test::check_assignment(consumer, 1, &toppars1[0]->topic()); + + if ((err = consumer->unassign())) + Test::Fail("Unassign failed: " + RdKafka::err2str(err)); + Test::check_assignment(consumer, 0, NULL); +} + + +/** Assign, then incremental unassign. 
+ */ +static void assign_test_2(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2) { + RdKafka::ErrorCode err; + RdKafka::Error *error; + + Test::Say("Assign, then incremental unassign\n"); + + if ((err = consumer->assign(toppars1))) + Test::Fail("Assign failed: " + RdKafka::err2str(err)); + Test::check_assignment(consumer, 1, &toppars1[0]->topic()); + + if ((error = consumer->incremental_unassign(toppars1))) + Test::Fail("Incremental unassign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); +} + + +/** Incremental assign, then incremental unassign. + */ +static void assign_test_3(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2) { + RdKafka::Error *error; + + Test::Say("Incremental assign, then incremental unassign\n"); + + if ((error = consumer->incremental_assign(toppars1))) + Test::Fail("Incremental assign failed: " + error->str()); + Test::check_assignment(consumer, 1, &toppars1[0]->topic()); + + if ((error = consumer->incremental_unassign(toppars1))) + Test::Fail("Incremental unassign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); +} + + +/** Multi-topic incremental assign and unassign + message consumption. 
+ */ +static void assign_test_4(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2) { + RdKafka::Error *error; + + Test::Say( + "Multi-topic incremental assign and unassign + message consumption\n"); + + if ((error = consumer->incremental_assign(toppars1))) + Test::Fail("Incremental assign failed: " + error->str()); + Test::check_assignment(consumer, 1, &toppars1[0]->topic()); + + RdKafka::Message *m = consumer->consume(5000); + if (m->err() != RdKafka::ERR_NO_ERROR) + Test::Fail("Expecting a consumed message."); + if (m->len() != 100) + Test::Fail(tostr() << "Expecting msg len to be 100, not: " + << m->len()); /* implies read from topic 1. */ + delete m; + + if ((error = consumer->incremental_unassign(toppars1))) + Test::Fail("Incremental unassign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); + + m = consumer->consume(100); + if (m->err() != RdKafka::ERR__TIMED_OUT) + Test::Fail("Not expecting a consumed message."); + delete m; + + if ((error = consumer->incremental_assign(toppars2))) + Test::Fail("Incremental assign failed: " + error->str()); + Test::check_assignment(consumer, 1, &toppars2[0]->topic()); + + m = consumer->consume(5000); + if (m->err() != RdKafka::ERR_NO_ERROR) + Test::Fail("Expecting a consumed message."); + if (m->len() != 200) + Test::Fail(tostr() << "Expecting msg len to be 200, not: " + << m->len()); /* implies read from topic 2. 
*/ + delete m; + + if ((error = consumer->incremental_assign(toppars1))) + Test::Fail("Incremental assign failed: " + error->str()); + if (Test::assignment_partition_count(consumer, NULL) != 2) + Test::Fail(tostr() << "Expecting current assignment to have size 2, not: " + << Test::assignment_partition_count(consumer, NULL)); + + m = consumer->consume(5000); + if (m->err() != RdKafka::ERR_NO_ERROR) + Test::Fail("Expecting a consumed message."); + delete m; + + if ((error = consumer->incremental_unassign(toppars2))) + Test::Fail("Incremental unassign failed: " + error->str()); + if ((error = consumer->incremental_unassign(toppars1))) + Test::Fail("Incremental unassign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); +} + + +/** Incremental assign and unassign of empty collection. + */ +static void assign_test_5(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2) { + RdKafka::Error *error; + std::vector<RdKafka::TopicPartition *> toppars3; + + Test::Say("Incremental assign and unassign of empty collection\n"); + + if ((error = consumer->incremental_assign(toppars3))) + Test::Fail("Incremental assign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); + + if ((error = consumer->incremental_unassign(toppars3))) + Test::Fail("Incremental unassign failed: " + error->str()); + Test::check_assignment(consumer, 0, NULL); +} + + + +static void run_test( + const std::string &t1, + const std::string &t2, + void (*test)(RdKafka::KafkaConsumer *consumer, + std::vector<RdKafka::TopicPartition *> toppars1, + std::vector<RdKafka::TopicPartition *> toppars2)) { + std::vector<RdKafka::TopicPartition *> toppars1; + toppars1.push_back(RdKafka::TopicPartition::create(t1, 0)); + std::vector<RdKafka::TopicPartition *> toppars2; + toppars2.push_back(RdKafka::TopicPartition::create(t2, 0)); + + RdKafka::KafkaConsumer *consumer = + make_consumer("C_1", t1, 
"cooperative-sticky", NULL, NULL, 10); + + test(consumer, toppars1, toppars2); + + RdKafka::TopicPartition::destroy(toppars1); + RdKafka::TopicPartition::destroy(toppars2); + + consumer->close(); + delete consumer; +} + + +static void a_assign_tests() { + SUB_TEST_QUICK(); + + int msgcnt = 1000; + const int msgsize1 = 100; + const int msgsize2 = 200; + + std::string topic1_str = Test::mk_topic_name("0113-a1", 1); + test_create_topic(NULL, topic1_str.c_str(), 1, 1); + std::string topic2_str = Test::mk_topic_name("0113-a2", 1); + test_create_topic(NULL, topic2_str.c_str(), 1, 1); + + test_produce_msgs_easy_size(topic1_str.c_str(), 0, 0, msgcnt, msgsize1); + test_produce_msgs_easy_size(topic2_str.c_str(), 0, 0, msgcnt, msgsize2); + + run_test(topic1_str, topic2_str, assign_test_1); + run_test(topic1_str, topic2_str, assign_test_2); + run_test(topic1_str, topic2_str, assign_test_3); + run_test(topic1_str, topic2_str, assign_test_4); + run_test(topic1_str, topic2_str, assign_test_5); + + SUB_TEST_PASS(); +} + + + +/** + * @brief Quick Assign 1,2, Assign 2,3, Assign 1,2,3 test to verify + * that the correct OffsetFetch response is used. + * See note in rdkafka_assignment.c for details. + * + * Makes use of the mock cluster to induce latency. 
+ */ +static void a_assign_rapid() { + SUB_TEST_QUICK(); + + std::string group_id = __FUNCTION__; + + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + + mcluster = test_mock_cluster_new(3, &bootstraps); + int32_t coord_id = 1; + rd_kafka_mock_coordinator_set(mcluster, "group", group_id.c_str(), coord_id); + + rd_kafka_mock_topic_create(mcluster, "topic1", 1, 1); + rd_kafka_mock_topic_create(mcluster, "topic2", 1, 1); + rd_kafka_mock_topic_create(mcluster, "topic3", 1, 1); + + /* + * Produce messages to topics + */ + const int msgs_per_partition = 1000; + + RdKafka::Conf *pconf; + Test::conf_init(&pconf, NULL, 10); + Test::conf_set(pconf, "bootstrap.servers", bootstraps); + Test::conf_set(pconf, "security.protocol", "plaintext"); + std::string errstr; + RdKafka::Producer *p = RdKafka::Producer::create(pconf, errstr); + if (!p) + Test::Fail(tostr() << __FUNCTION__ + << ": Failed to create producer: " << errstr); + delete pconf; + + Test::produce_msgs(p, "topic1", 0, msgs_per_partition, 10, + false /*no flush*/); + Test::produce_msgs(p, "topic2", 0, msgs_per_partition, 10, + false /*no flush*/); + Test::produce_msgs(p, "topic3", 0, msgs_per_partition, 10, + false /*no flush*/); + p->flush(10 * 1000); + + delete p; + + vector<RdKafka::TopicPartition *> toppars1; + toppars1.push_back(RdKafka::TopicPartition::create("topic1", 0)); + vector<RdKafka::TopicPartition *> toppars2; + toppars2.push_back(RdKafka::TopicPartition::create("topic2", 0)); + vector<RdKafka::TopicPartition *> toppars3; + toppars3.push_back(RdKafka::TopicPartition::create("topic3", 0)); + + + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, 20); + Test::conf_set(conf, "bootstrap.servers", bootstraps); + Test::conf_set(conf, "security.protocol", "plaintext"); + Test::conf_set(conf, "client.id", __FUNCTION__); + Test::conf_set(conf, "group.id", group_id); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "enable.auto.commit", "false"); + + 
RdKafka::KafkaConsumer *consumer; + consumer = RdKafka::KafkaConsumer::create(conf, errstr); + if (!consumer) + Test::Fail(tostr() << __FUNCTION__ + << ": Failed to create consumer: " << errstr); + delete conf; + + vector<RdKafka::TopicPartition *> toppars; + vector<RdKafka::TopicPartition *> expected; + + map<Toppar, int64_t> pos; /* Expected consume position per partition */ + pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 0; + pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 0; + pos[Toppar(toppars3[0]->topic(), toppars3[0]->partition())] = 0; + + /* To make sure offset commits are fetched in proper assign sequence + * we commit an offset that should not be used in the final consume loop. + * This commit will be overwritten below with another commit. */ + vector<RdKafka::TopicPartition *> offsets; + offsets.push_back(RdKafka::TopicPartition::create( + toppars1[0]->topic(), toppars1[0]->partition(), 11)); + /* This partition should start at this position even though + * there will be a sub-sequent commit to overwrite it, that should not + * be used since this partition is never unassigned. */ + offsets.push_back(RdKafka::TopicPartition::create( + toppars2[0]->topic(), toppars2[0]->partition(), 22)); + pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 22; + + Test::print_TopicPartitions("pre-commit", offsets); + + RdKafka::ErrorCode err; + err = consumer->commitSync(offsets); + if (err) + Test::Fail(tostr() << __FUNCTION__ << ": pre-commit failed: " + << RdKafka::err2str(err) << "\n"); + + /* Add coordinator delay so that the OffsetFetchRequest originating + * from the coming incremental_assign() will not finish before + * we call incremental_unassign() and incremental_assign() again, resulting + * in a situation where the initial OffsetFetchResponse will contain + * an older offset for a previous assignment of one partition. 
*/ + rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 5000); + + + /* Assign 1,2 == 1,2 */ + toppars.push_back(toppars1[0]); + toppars.push_back(toppars2[0]); + expected.push_back(toppars1[0]); + expected.push_back(toppars2[0]); + Test::incremental_assign(consumer, toppars); + expect_assignment(consumer, expected); + + /* Unassign -1 == 2 */ + toppars.clear(); + toppars.push_back(toppars1[0]); + vector<RdKafka::TopicPartition *>::iterator it = + find(expected.begin(), expected.end(), toppars1[0]); + expected.erase(it); + + Test::incremental_unassign(consumer, toppars); + expect_assignment(consumer, expected); + + + /* Commit offset for the removed partition and the partition that is + * unchanged in the assignment. */ + RdKafka::TopicPartition::destroy(offsets); + offsets.push_back(RdKafka::TopicPartition::create( + toppars1[0]->topic(), toppars1[0]->partition(), 55)); + offsets.push_back(RdKafka::TopicPartition::create( + toppars2[0]->topic(), toppars2[0]->partition(), 33)); /* should not be + * used. 
*/ + pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 55; + Test::print_TopicPartitions("commit", offsets); + + err = consumer->commitAsync(offsets); + if (err) + Test::Fail(tostr() << __FUNCTION__ + << ": commit failed: " << RdKafka::err2str(err) << "\n"); + + /* Assign +3 == 2,3 */ + toppars.clear(); + toppars.push_back(toppars3[0]); + expected.push_back(toppars3[0]); + Test::incremental_assign(consumer, toppars); + expect_assignment(consumer, expected); + + /* Now remove the latency */ + Test::Say(_C_MAG "Clearing rtt\n"); + rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 0); + + /* Assign +1 == 1,2,3 */ + toppars.clear(); + toppars.push_back(toppars1[0]); + expected.push_back(toppars1[0]); + Test::incremental_assign(consumer, toppars); + expect_assignment(consumer, expected); + + /* + * Verify consumed messages + */ + int wait_end = (int)expected.size(); + while (wait_end > 0) { + RdKafka::Message *msg = consumer->consume(10 * 1000); + if (msg->err() == RdKafka::ERR__TIMED_OUT) + Test::Fail(tostr() << __FUNCTION__ + << ": Consume timed out waiting " + "for " + << wait_end << " more partitions"); + + Toppar tp = Toppar(msg->topic_name(), msg->partition()); + int64_t *exp_pos = &pos[tp]; + + Test::Say(3, tostr() << __FUNCTION__ << ": Received " << tp.topic << " [" + << tp.partition << "] at offset " << msg->offset() + << " (expected offset " << *exp_pos << ")\n"); + + if (*exp_pos != msg->offset()) + Test::Fail(tostr() << __FUNCTION__ << ": expected message offset " + << *exp_pos << " for " << msg->topic_name() << " [" + << msg->partition() << "], not " << msg->offset() + << "\n"); + (*exp_pos)++; + if (*exp_pos == msgs_per_partition) { + TEST_ASSERT(wait_end > 0, ""); + wait_end--; + } else if (msg->offset() > msgs_per_partition) + Test::Fail(tostr() << __FUNCTION__ << ": unexpected message with " + << "offset " << msg->offset() << " on " << tp.topic + << " [" << tp.partition << "]\n"); + + delete msg; + } + + 
RdKafka::TopicPartition::destroy(offsets); + RdKafka::TopicPartition::destroy(toppars1); + RdKafka::TopicPartition::destroy(toppars2); + RdKafka::TopicPartition::destroy(toppars3); + + delete consumer; + + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + +/* Check behavior when: + * 1. single topic with 2 partitions. + * 2. consumer 1 (with rebalance_cb) subscribes to it. + * 3. consumer 2 (with rebalance_cb) subscribes to it. + * 4. close. + */ + +static void b_subscribe_with_cb_test(rd_bool_t close_consumer) { + SUB_TEST(); + + std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + test_create_topic(NULL, topic_name.c_str(), 2, 1); + + DefaultRebalanceCb rebalance_cb1; + RdKafka::KafkaConsumer *c1 = make_consumer( + "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 25); + DefaultRebalanceCb rebalance_cb2; + RdKafka::KafkaConsumer *c2 = make_consumer( + "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 25); + test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000); + + Test::subscribe(c1, topic_name); + + bool c2_subscribed = false; + while (true) { + Test::poll_once(c1, 500); + Test::poll_once(c2, 500); + + /* Start c2 after c1 has received initial assignment */ + if (!c2_subscribed && rebalance_cb1.assign_call_cnt > 0) { + Test::subscribe(c2, topic_name); + c2_subscribed = true; + } + + /* Failure case: test will time out. */ + if (rebalance_cb1.assign_call_cnt == 3 && + rebalance_cb2.assign_call_cnt == 2) { + break; + } + } + + /* Sequence of events: + * + * 1. c1 joins group. + * 2. c1 gets assigned 2 partitions. + * - there isn't a follow-on rebalance because there aren't any revoked + * partitions. + * 3. c2 joins group. + * 4. 
This results in a rebalance with one partition being revoked from c1, + * and no partitions assigned to either c1 or c2 (however the rebalance + * callback will be called in each case with an empty set). + * 5. c1 then re-joins the group since it had a partition revoked. + * 6. c2 is now assigned a single partition, and c1's incremental assignment + * is empty. + * 7. Since there were no revoked partitions, no further rebalance is + * triggered. + */ + + /* The rebalance cb is always called on assign, even if empty. */ + if (rebalance_cb1.assign_call_cnt != 3) + Test::Fail(tostr() << "Expecting 3 assign calls on consumer 1, not " + << rebalance_cb1.assign_call_cnt); + if (rebalance_cb2.assign_call_cnt != 2) + Test::Fail(tostr() << "Expecting 2 assign calls on consumer 2, not: " + << rebalance_cb2.assign_call_cnt); + + /* The rebalance cb is not called on and empty revoke (unless partitions lost, + * which is not the case here) */ + if (rebalance_cb1.revoke_call_cnt != 1) + Test::Fail(tostr() << "Expecting 1 revoke call on consumer 1, not: " + << rebalance_cb1.revoke_call_cnt); + if (rebalance_cb2.revoke_call_cnt != 0) + Test::Fail(tostr() << "Expecting 0 revoke calls on consumer 2, not: " + << rebalance_cb2.revoke_call_cnt); + + /* Final state */ + + /* Expect both consumers to have 1 assigned partition (via net calculation in + * rebalance_cb) */ + if (rebalance_cb1.partitions_assigned_net != 1) + Test::Fail(tostr() + << "Expecting consumer 1 to have net 1 assigned partition, not: " + << rebalance_cb1.partitions_assigned_net); + if (rebalance_cb2.partitions_assigned_net != 1) + Test::Fail(tostr() + << "Expecting consumer 2 to have net 1 assigned partition, not: " + << rebalance_cb2.partitions_assigned_net); + + /* Expect both consumers to have 1 assigned partition (via ->assignment() + * query) */ + expect_assignment(c1, 1); + expect_assignment(c2, 1); + + /* Make sure the fetchers are running */ + int msgcnt = 100; + const int msgsize1 = 100; + 
test_produce_msgs_easy_size(topic_name.c_str(), 0, 0, msgcnt, msgsize1); + test_produce_msgs_easy_size(topic_name.c_str(), 0, 1, msgcnt, msgsize1); + + bool consumed_from_c1 = false; + bool consumed_from_c2 = false; + while (true) { + RdKafka::Message *msg1 = c1->consume(100); + RdKafka::Message *msg2 = c2->consume(100); + + if (msg1->err() == RdKafka::ERR_NO_ERROR) + consumed_from_c1 = true; + if (msg1->err() == RdKafka::ERR_NO_ERROR) + consumed_from_c2 = true; + + delete msg1; + delete msg2; + + /* Failure case: test will timeout. */ + if (consumed_from_c1 && consumed_from_c2) + break; + } + + if (!close_consumer) { + delete c1; + delete c2; + return; + } + + c1->close(); + c2->close(); + + /* Closing the consumer should trigger rebalance_cb (revoke): */ + if (rebalance_cb1.revoke_call_cnt != 2) + Test::Fail(tostr() << "Expecting 2 revoke calls on consumer 1, not: " + << rebalance_cb1.revoke_call_cnt); + if (rebalance_cb2.revoke_call_cnt != 1) + Test::Fail(tostr() << "Expecting 1 revoke call on consumer 2, not: " + << rebalance_cb2.revoke_call_cnt); + + /* ..and net assigned partitions should drop to 0 in both cases: */ + if (rebalance_cb1.partitions_assigned_net != 0) + Test::Fail( + tostr() + << "Expecting consumer 1 to have net 0 assigned partitions, not: " + << rebalance_cb1.partitions_assigned_net); + if (rebalance_cb2.partitions_assigned_net != 0) + Test::Fail( + tostr() + << "Expecting consumer 2 to have net 0 assigned partitions, not: " + << rebalance_cb2.partitions_assigned_net); + + /* Nothing in this test should result in lost partitions */ + if (rebalance_cb1.lost_call_cnt > 0) + Test::Fail( + tostr() << "Expecting consumer 1 to have 0 lost partition events, not: " + << rebalance_cb1.lost_call_cnt); + if (rebalance_cb2.lost_call_cnt > 0) + Test::Fail( + tostr() << "Expecting consumer 2 to have 0 lost partition events, not: " + << rebalance_cb2.lost_call_cnt); + + delete c1; + delete c2; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. 
Single topic with 2 partitions. + * 2. Consumer 1 (no rebalance_cb) subscribes to it. + * 3. Consumer 2 (no rebalance_cb) subscribes to it. + * 4. Close. + */ + +static void c_subscribe_no_cb_test(rd_bool_t close_consumer) { + SUB_TEST(); + + std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + test_create_topic(NULL, topic_name.c_str(), 2, 1); + + RdKafka::KafkaConsumer *c1 = + make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 20); + RdKafka::KafkaConsumer *c2 = + make_consumer("C_2", group_name, "cooperative-sticky", NULL, NULL, 20); + test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000); + + Test::subscribe(c1, topic_name); + + bool c2_subscribed = false; + bool done = false; + while (!done) { + Test::poll_once(c1, 500); + Test::poll_once(c2, 500); + + if (Test::assignment_partition_count(c1, NULL) == 2 && !c2_subscribed) { + Test::subscribe(c2, topic_name); + c2_subscribed = true; + } + + if (Test::assignment_partition_count(c1, NULL) == 1 && + Test::assignment_partition_count(c2, NULL) == 1) { + Test::Say("Consumer 1 and 2 are both assigned to single partition.\n"); + done = true; + } + } + + if (close_consumer) { + Test::Say("Closing consumer 1\n"); + c1->close(); + Test::Say("Closing consumer 2\n"); + c2->close(); + } else { + Test::Say("Skipping close() of consumer 1 and 2.\n"); + } + + delete c1; + delete c2; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. Single consumer (no rebalance_cb) subscribes to topic. + * 2. Subscription is changed (topic added). + * 3. Consumer is closed. 
+ */ + +static void d_change_subscription_add_topic(rd_bool_t close_consumer) { + SUB_TEST(); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + std::string topic_name_2 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name_1); + + bool subscribed_to_one_topic = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + if (Test::assignment_partition_count(c, NULL) == 2 && + !subscribed_to_one_topic) { + subscribed_to_one_topic = true; + Test::subscribe(c, topic_name_1, topic_name_2); + } + + if (Test::assignment_partition_count(c, NULL) == 4) { + Test::Say("Consumer is assigned to two topics.\n"); + done = true; + } + } + + if (close_consumer) { + Test::Say("Closing consumer\n"); + c->close(); + } else + Test::Say("Skipping close() of consumer\n"); + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. Single consumer (no rebalance_cb) subscribes to topic. + * 2. Subscription is changed (topic added). + * 3. Consumer is closed. 
+ */ + +static void e_change_subscription_remove_topic(rd_bool_t close_consumer) { + SUB_TEST(); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + std::string topic_name_2 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name_1, topic_name_2); + + bool subscribed_to_two_topics = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + if (Test::assignment_partition_count(c, NULL) == 4 && + !subscribed_to_two_topics) { + subscribed_to_two_topics = true; + Test::subscribe(c, topic_name_1); + } + + if (Test::assignment_partition_count(c, NULL) == 2) { + Test::Say("Consumer is assigned to one topic\n"); + done = true; + } + } + + if (!close_consumer) { + Test::Say("Closing consumer\n"); + c->close(); + } else + Test::Say("Skipping close() of consumer\n"); + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check that use of consumer->assign() and consumer->unassign() is disallowed + * when a COOPERATIVE assignor is in use. + * + * Except when the consumer is closing, where all forms of unassign are + * allowed and treated as a full unassign. 
+ */ + +class FTestRebalanceCb : public RdKafka::RebalanceCb { + public: + bool assigned; + bool closing; + + FTestRebalanceCb() : assigned(false), closing(false) { + } + + void rebalance_cb(RdKafka::KafkaConsumer *consumer, + RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &partitions) { + Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " " + << RdKafka::err2str(err) << (closing ? " (closing)" : "") + << "\n"); + + if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { + RdKafka::ErrorCode err_resp = consumer->assign(partitions); + Test::Say(tostr() << "consumer->assign() response code: " << err_resp + << "\n"); + if (err_resp != RdKafka::ERR__STATE) + Test::Fail(tostr() << "Expected assign to fail with error code: " + << RdKafka::ERR__STATE << "(ERR__STATE)"); + + RdKafka::Error *error = consumer->incremental_assign(partitions); + if (error) + Test::Fail(tostr() << "consumer->incremental_unassign() failed: " + << error->str()); + + assigned = true; + + } else { + RdKafka::ErrorCode err_resp = consumer->unassign(); + Test::Say(tostr() << "consumer->unassign() response code: " << err_resp + << "\n"); + + if (!closing) { + if (err_resp != RdKafka::ERR__STATE) + Test::Fail(tostr() << "Expected assign to fail with error code: " + << RdKafka::ERR__STATE << "(ERR__STATE)"); + + RdKafka::Error *error = consumer->incremental_unassign(partitions); + if (error) + Test::Fail(tostr() << "consumer->incremental_unassign() failed: " + << error->str()); + + } else { + /* During termination (close()) any type of unassign*() is allowed. 
*/ + if (err_resp) + Test::Fail(tostr() << "Expected unassign to succeed during close, " + "but got: " + << RdKafka::ERR__STATE << "(ERR__STATE)"); + } + } + } +}; + + +static void f_assign_call_cooperative() { + SUB_TEST(); + + std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name.c_str(), 1, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + FTestRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb, 15); + test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name); + + while (!rebalance_cb.assigned) + Test::poll_once(c, 500); + + rebalance_cb.closing = true; + c->close(); + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check that use of consumer->incremental_assign() and + * consumer->incremental_unassign() is disallowed when an EAGER assignor is in + * use. + */ +class GTestRebalanceCb : public RdKafka::RebalanceCb { + public: + bool assigned; + bool closing; + + GTestRebalanceCb() : assigned(false), closing(false) { + } + + void rebalance_cb(RdKafka::KafkaConsumer *consumer, + RdKafka::ErrorCode err, + std::vector<RdKafka::TopicPartition *> &partitions) { + Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " " + << RdKafka::err2str(err) << "\n"); + + if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { + RdKafka::Error *error = consumer->incremental_assign(partitions); + Test::Say(tostr() << "consumer->incremental_assign() response: " + << (!error ? 
"NULL" : error->str()) << "\n"); + if (!error) + Test::Fail("Expected consumer->incremental_assign() to fail"); + if (error->code() != RdKafka::ERR__STATE) + Test::Fail(tostr() << "Expected consumer->incremental_assign() to fail " + "with error code " + << RdKafka::ERR__STATE); + delete error; + + RdKafka::ErrorCode err_resp = consumer->assign(partitions); + if (err_resp) + Test::Fail(tostr() << "consumer->assign() failed: " << err_resp); + + assigned = true; + + } else { + RdKafka::Error *error = consumer->incremental_unassign(partitions); + Test::Say(tostr() << "consumer->incremental_unassign() response: " + << (!error ? "NULL" : error->str()) << "\n"); + + if (!closing) { + if (!error) + Test::Fail("Expected consumer->incremental_unassign() to fail"); + if (error->code() != RdKafka::ERR__STATE) + Test::Fail(tostr() << "Expected consumer->incremental_unassign() to " + "fail with error code " + << RdKafka::ERR__STATE); + delete error; + + RdKafka::ErrorCode err_resp = consumer->unassign(); + if (err_resp) + Test::Fail(tostr() << "consumer->unassign() failed: " << err_resp); + + } else { + /* During termination (close()) any type of unassign*() is allowed. 
*/ + if (error) + Test::Fail( + tostr() + << "Expected incremental_unassign to succeed during close, " + "but got: " + << RdKafka::ERR__STATE << "(ERR__STATE)"); + } + } + } +}; + +static void g_incremental_assign_call_eager() { + SUB_TEST(); + + std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name.c_str(), 1, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + GTestRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = make_consumer( + "C_1", group_name, "roundrobin", &additional_conf, &rebalance_cb, 15); + test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name); + + while (!rebalance_cb.assigned) + Test::poll_once(c, 500); + + rebalance_cb.closing = true; + c->close(); + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. Single consumer (rebalance_cb) subscribes to two topics. + * 2. One of the topics is deleted. + * 3. Consumer is closed. 
+ */ + +static void h_delete_topic() { + SUB_TEST(); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + std::string topic_name_2 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_2.c_str(), 1, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + DefaultRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb, 15); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name_1, topic_name_2); + + bool deleted = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + std::vector<RdKafka::TopicPartition *> partitions; + c->assignment(partitions); + + if (partitions.size() == 2 && !deleted) { + if (rebalance_cb.assign_call_cnt != 1) + Test::Fail(tostr() << "Expected 1 assign call, saw " + << rebalance_cb.assign_call_cnt << "\n"); + Test::delete_topic(c, topic_name_2.c_str()); + deleted = true; + } + + if (partitions.size() == 1 && deleted) { + if (partitions[0]->topic() != topic_name_1) + Test::Fail(tostr() << "Expecting subscribed topic to be '" + << topic_name_1 << "' not '" + << partitions[0]->topic() << "'"); + Test::Say(tostr() << "Assignment no longer includes deleted topic '" + << topic_name_2 << "'\n"); + done = true; + } + + RdKafka::TopicPartition::destroy(partitions); + } + + Test::Say("Closing consumer\n"); + c->close(); + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. 
Single consumer (rebalance_cb) subscribes to a single topic. + * 2. That topic is deleted leaving no topics. + * 3. Consumer is closed. + */ + +static void i_delete_topic_2() { + SUB_TEST(); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + DefaultRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb, 15); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name_1); + + bool deleted = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) { + if (rebalance_cb.assign_call_cnt != 1) + Test::Fail(tostr() << "Expected one assign call, saw " + << rebalance_cb.assign_call_cnt << "\n"); + Test::delete_topic(c, topic_name_1.c_str()); + deleted = true; + } + + if (Test::assignment_partition_count(c, NULL) == 0 && deleted) { + Test::Say(tostr() << "Assignment is empty following deletion of topic\n"); + done = true; + } + } + + Test::Say("Closing consumer\n"); + c->close(); + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. single consumer (without rebalance_cb) subscribes to a single topic. + * 2. that topic is deleted leaving no topics. + * 3. consumer is closed. 
+ */ + +static void j_delete_topic_no_rb_callback() { + SUB_TEST(); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + RdKafka::KafkaConsumer *c = make_consumer( + "C_1", group_name, "cooperative-sticky", &additional_conf, NULL, 15); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name_1); + + bool deleted = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) { + Test::delete_topic(c, topic_name_1.c_str()); + deleted = true; + } + + if (Test::assignment_partition_count(c, NULL) == 0 && deleted) { + Test::Say(tostr() << "Assignment is empty following deletion of topic\n"); + done = true; + } + } + + Test::Say("Closing consumer\n"); + c->close(); + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * 1. Single consumer (rebalance_cb) subscribes to a 1 partition topic. + * 2. Number of partitions is increased to 2. + * 3. Consumer is closed. 
+ */ + +static void k_add_partition() { + SUB_TEST(); + + std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); + test_create_topic(NULL, topic_name.c_str(), 1, 1); + + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("topic.metadata.refresh.interval.ms"), std::string("3000"))); + DefaultRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = + make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb, 15); + test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000); + + Test::subscribe(c, topic_name); + + bool subscribed = false; + bool done = false; + while (!done) { + Test::poll_once(c, 500); + + if (Test::assignment_partition_count(c, NULL) == 1 && !subscribed) { + if (rebalance_cb.assign_call_cnt != 1) + Test::Fail(tostr() << "Expected 1 assign call, saw " + << rebalance_cb.assign_call_cnt); + if (rebalance_cb.revoke_call_cnt != 0) + Test::Fail(tostr() << "Expected 0 revoke calls, saw " + << rebalance_cb.revoke_call_cnt); + Test::create_partitions(c, topic_name.c_str(), 2); + subscribed = true; + } + + if (Test::assignment_partition_count(c, NULL) == 2 && subscribed) { + if (rebalance_cb.assign_call_cnt != 2) + Test::Fail(tostr() << "Expected 2 assign calls, saw " + << rebalance_cb.assign_call_cnt); + if (rebalance_cb.revoke_call_cnt != 0) + Test::Fail(tostr() << "Expected 0 revoke calls, saw " + << rebalance_cb.revoke_call_cnt); + done = true; + } + } + + Test::Say("Closing consumer\n"); + c->close(); + delete c; + + if (rebalance_cb.assign_call_cnt != 2) + Test::Fail(tostr() << "Expected 2 assign calls, saw " + << rebalance_cb.assign_call_cnt); + if (rebalance_cb.revoke_call_cnt != 1) + Test::Fail(tostr() << "Expected 1 revoke call, saw " + << rebalance_cb.revoke_call_cnt); + + SUB_TEST_PASS(); +} + + + +/* Check 
behavior when:
 * 1. two consumers (with rebalance_cb's) subscribe to two topics.
 * 2. one of the consumers calls unsubscribe.
 * 3. consumers closed.
 */

static void l_unsubscribe() {
  SUB_TEST();

  std::string topic_name_1 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string topic_name_2 =
      Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
  test_create_topic(NULL, topic_name_2.c_str(), 2, 1);

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 = make_consumer(
      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 30);
  test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
  test_wait_topic_exists(c1->c_ptr(), topic_name_2.c_str(), 10 * 1000);

  Test::subscribe(c1, topic_name_1, topic_name_2);

  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 = make_consumer(
      "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 30);
  Test::subscribe(c2, topic_name_1, topic_name_2);

  bool done         = false;
  bool unsubscribed = false;
  while (!done) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    /* Steady state: 4 partitions split 2/2 between the two consumers.
     * Now unsubscribe c1 from everything. */
    if (Test::assignment_partition_count(c1, NULL) == 2 &&
        Test::assignment_partition_count(c2, NULL) == 2) {
      if (rebalance_cb1.assign_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                    << rebalance_cb1.assign_call_cnt);
      if (rebalance_cb2.assign_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 2's assign_call_cnt to be 1 not: "
                    << rebalance_cb2.assign_call_cnt);
      Test::Say("Unsubscribing consumer 1 from both topics\n");
      c1->unsubscribe();
      unsubscribed = true;
    }

    /* After the unsubscribe rebalance: c1 owns nothing, c2 owns all 4. */
    if (unsubscribed && Test::assignment_partition_count(c1, NULL) == 0 &&
        Test::assignment_partition_count(c2, NULL) == 4) {
      if (rebalance_cb1.assign_call_cnt !=
          1) /* is now unsubscribed, so rebalance_cb will no longer be called.
              */
        Test::Fail(
            tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                    << rebalance_cb1.assign_call_cnt);
      if (rebalance_cb2.assign_call_cnt != 2)
        Test::Fail(
            tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
                    << rebalance_cb2.assign_call_cnt);
      if (rebalance_cb1.revoke_call_cnt != 1)
        Test::Fail(
            tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
                    << rebalance_cb1.revoke_call_cnt);
      if (rebalance_cb2.revoke_call_cnt !=
          0) /* the rebalance_cb should not be called if the revoked partition
                list is empty */
        Test::Fail(
            tostr() << "Expecting consumer 2's revoke_call_cnt to be 0 not: "
                    << rebalance_cb2.revoke_call_cnt);
      Test::Say("Unsubscribe completed");
      done = true;
    }
  }

  Test::Say("Closing consumer 1\n");
  c1->close();
  Test::Say("Closing consumer 2\n");
  c2->close();

  /* there should be no assign rebalance_cb calls on close */
  if (rebalance_cb1.assign_call_cnt != 1)
    Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 2)
    Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
                       << rebalance_cb2.assign_call_cnt);

  if (rebalance_cb1.revoke_call_cnt !=
      1) /* should not be called a second revoke rebalance_cb */
    Test::Fail(tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
                       << rebalance_cb1.revoke_call_cnt);
  if (rebalance_cb2.revoke_call_cnt != 1)
    Test::Fail(tostr() << "Expecting consumer 2's revoke_call_cnt to be 1 not: "
                       << rebalance_cb2.revoke_call_cnt);

  /* A voluntary unsubscribe is not a "lost partitions" event. */
  if (rebalance_cb1.lost_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 1's lost_call_cnt to be 0, not: "
                       << rebalance_cb1.lost_call_cnt);
  if (rebalance_cb2.lost_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 2's lost_call_cnt to be 0, not: "
                       << rebalance_cb2.lost_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 * 1. A consumers (with no rebalance_cb) subscribes to a topic.
 * 2. The consumer calls unsubscribe.
 * 3. Consumers closed.
 */

static void m_unsubscribe_2() {
  SUB_TEST();

  std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
  std::string group_name =
      Test::mk_unique_group_name("0113-cooperative_rebalance");
  test_create_topic(NULL, topic_name.c_str(), 2, 1);

  RdKafka::KafkaConsumer *c =
      make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
  test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);

  Test::subscribe(c, topic_name);

  bool done         = false;
  bool unsubscribed = false;
  while (!done) {
    Test::poll_once(c, 500);

    /* Full assignment received: unsubscribe and wait for it to drain. */
    if (Test::assignment_partition_count(c, NULL) == 2) {
      Test::unsubscribe(c);
      unsubscribed = true;
    }

    if (unsubscribed && Test::assignment_partition_count(c, NULL) == 0) {
      Test::Say("Unsubscribe completed");
      done = true;
    }
  }

  Test::Say("Closing consumer\n");
  c->close();

  delete c;

  SUB_TEST_PASS();
}



/* Check behavior when:
 * 1. Two consumers (with rebalance_cb) subscribe to a regex (no matching
 *    topics exist)
 * 2. Create two topics.
 * 3. Remove one of the topics.
 * 4. Consumers closed.
 */

static void n_wildcard() {
  SUB_TEST();

  const string topic_base_name = Test::mk_topic_name("0113-n_wildcard", 1);
  const string topic_name_1    = topic_base_name + "_1";
  const string topic_name_2    = topic_base_name + "_2";
  const string topic_regex     = "^" + topic_base_name + "_.";
  const string group_name      = Test::mk_unique_group_name("0113-n_wildcard");

  /* Fast metadata refresh so topic creation/deletion is noticed quickly. */
  std::vector<std::pair<std::string, std::string> > additional_conf;
  additional_conf.push_back(std::pair<std::string, std::string>(
      std::string("topic.metadata.refresh.interval.ms"), std::string("3000")));

  DefaultRebalanceCb rebalance_cb1;
  RdKafka::KafkaConsumer *c1 =
      make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb1, 30);
  Test::subscribe(c1, topic_regex);

  DefaultRebalanceCb rebalance_cb2;
  RdKafka::KafkaConsumer *c2 =
      make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
                    &rebalance_cb2, 30);
  Test::subscribe(c2, topic_regex);

  /* There are no matching topics, so the consumers should not join the group
   * initially */
  Test::poll_once(c1, 500);
  Test::poll_once(c2, 500);

  if (rebalance_cb1.assign_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 0 not: "
                       << rebalance_cb1.assign_call_cnt);
  if (rebalance_cb2.assign_call_cnt != 0)
    Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 0 not: "
                       << rebalance_cb2.assign_call_cnt);

  bool done                    = false;
  bool created_topics          = false;
  bool deleted_topic           = false;
  int last_cb1_assign_call_cnt = 0;
  int last_cb2_assign_call_cnt = 0;
  while (!done) {
    Test::poll_once(c1, 500);
    Test::poll_once(c2, 500);

    /* Phase 1: no assignment yet — create the topics matching the regex. */
    if (Test::assignment_partition_count(c1, NULL) == 0 &&
        Test::assignment_partition_count(c2, NULL) == 0 && !created_topics) {
      Test::Say(
          "Creating two topics with 2 partitions each that match regex\n");
      test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
      test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
      /* The consumers should autonomously discover these topics and start
       * consuming from them. This happens in the background - is not
       * influenced by whether we wait for the topics to be created before
       * continuing the main loop. It is possible that both topics are
       * discovered simultaneously, requiring a single rebalance OR that
       * topic 1 is discovered first (it was created first), a rebalance
       * initiated, then topic 2 discovered, then another rebalance
       * initiated to include it.
       */
      created_topics = true;
    }

    /* Phase 2: both consumers own 2 partitions — delete topic 1. */
    if (Test::assignment_partition_count(c1, NULL) == 2 &&
        Test::assignment_partition_count(c2, NULL) == 2 && !deleted_topic) {
      if (rebalance_cb1.nonempty_assign_call_cnt == 1) {
        /* just one rebalance was required */
        TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 1,
                    "Expecting C_1's nonempty_assign_call_cnt to be 1 not %d ",
                    rebalance_cb1.nonempty_assign_call_cnt);
        TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 1,
                    "Expecting C_2's nonempty_assign_call_cnt to be 1 not %d ",
                    rebalance_cb2.nonempty_assign_call_cnt);
      } else {
        /* two rebalances were required (occurs infrequently) */
        TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 2,
                    "Expecting C_1's nonempty_assign_call_cnt to be 2 not %d ",
                    rebalance_cb1.nonempty_assign_call_cnt);
        TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 2,
                    "Expecting C_2's nonempty_assign_call_cnt to be 2 not %d ",
                    rebalance_cb2.nonempty_assign_call_cnt);
      }

      TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 0,
                  "Expecting C_1's revoke_call_cnt to be 0 not %d ",
                  rebalance_cb1.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 0,
                  "Expecting C_2's revoke_call_cnt to be 0 not %d ",
                  rebalance_cb2.revoke_call_cnt);

      last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
      last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;

      Test::Say("Deleting topic 1\n");
      Test::delete_topic(c1, topic_name_1.c_str());
      deleted_topic = true;
    }

    /* Phase 3: topic deletion is seen as lost partitions (the topic vanished
     * rather than being revoked), leaving 1 partition per consumer. */
    if (Test::assignment_partition_count(c1, NULL) == 1 &&
        Test::assignment_partition_count(c2, NULL) == 1 && deleted_topic) {
      /* accumulated in lost case as well */
      TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 1,
                  "Expecting C_1's revoke_call_cnt to be 1 not %d",
                  rebalance_cb1.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 1,
                  "Expecting C_2's revoke_call_cnt to be 1 not %d",
                  rebalance_cb2.revoke_call_cnt);
      TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
                  "Expecting C_1's lost_call_cnt to be 1 not %d",
                  rebalance_cb1.lost_call_cnt);
      TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
                  "Expecting C_2's lost_call_cnt to be 1 not %d",
                  rebalance_cb2.lost_call_cnt);

      /* Consumers will rejoin group after revoking the lost partitions.
       * this will result in an rebalance_cb assign (empty partitions).
       * it follows the revoke, which has already been confirmed to have
       * happened. */
      Test::Say("Waiting for rebalance_cb assigns\n");
      while (rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt ||
             rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt) {
        Test::poll_once(c1, 500);
        Test::poll_once(c2, 500);
      }

      Test::Say("Consumers are subscribed to one partition each\n");
      done = true;
    }
  }

  Test::Say("Closing consumer 1\n");
  last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
  c1->close();

  /* There should be no assign rebalance_cb calls on close */
  TEST_ASSERT(rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt,
              "Expecting C_1's assign_call_cnt to be %d not %d",
              last_cb1_assign_call_cnt, rebalance_cb1.assign_call_cnt);

  /* Let C_2 catch up on the rebalance and get assigned C_1's partitions. */
  last_cb2_assign_call_cnt = rebalance_cb2.nonempty_assign_call_cnt;
  while (rebalance_cb2.nonempty_assign_call_cnt == last_cb2_assign_call_cnt)
    Test::poll_once(c2, 500);

  Test::Say("Closing consumer 2\n");
  last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
  c2->close();

  /* There should be no assign rebalance_cb calls on close */
  TEST_ASSERT(rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt,
              "Expecting C_2's assign_call_cnt to be %d not %d",
              last_cb2_assign_call_cnt, rebalance_cb2.assign_call_cnt);

  TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 2,
              "Expecting C_1's revoke_call_cnt to be 2 not %d",
              rebalance_cb1.revoke_call_cnt);
  TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 2,
              "Expecting C_2's revoke_call_cnt to be 2 not %d",
              rebalance_cb2.revoke_call_cnt);

  TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
              "Expecting C_1's lost_call_cnt to be 1, not %d",
              rebalance_cb1.lost_call_cnt);
  TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
              "Expecting C_2's lost_call_cnt to be 1, not %d",
              rebalance_cb2.lost_call_cnt);

  delete c1;
  delete c2;

  SUB_TEST_PASS();
}



/* Check behavior when:
 * 1. Consumer (librdkafka) subscribes to two topics (2 and 6 partitions).
 * 2. Consumer (java) subscribes to the same two topics.
 * 3. Consumer (librdkafka) unsubscribes from the two partition topic.
 * 4. Consumer (java) process closes upon detecting the above unsubscribe.
 * 5. Consumer (librdkafka) will now be subscribed to 6 partitions.
 * 6. Close librdkafka consumer.
+ */
+
+static void o_java_interop() {
+  SUB_TEST();
+
+  if (*test_conf_get(NULL, "sasl.mechanism") != '\0')
+    SUB_TEST_SKIP(
+        "Cluster is set up for SASL: we won't bother with that "
+        "for the Java client\n");
+
+  std::string topic_name_1 = Test::mk_topic_name("0113_o_2", 1);
+  std::string topic_name_2 = Test::mk_topic_name("0113_o_6", 1);
+  std::string group_name = Test::mk_unique_group_name("0113_o");
+  test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
+  test_create_topic(NULL, topic_name_2.c_str(), 6, 1);
+
+  DefaultRebalanceCb rebalance_cb;
+  RdKafka::KafkaConsumer *c = make_consumer(
+      "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
+  test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
+  test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
+
+  Test::subscribe(c, topic_name_1, topic_name_2);
+
+  /* Drive the consumer until the Java helper has joined the group, we have
+   * shrunk our subscription to topic 1, and the helper has left again. */
+  bool done = false;
+  bool changed_subscription = false;
+  bool changed_subscription_done = false;
+  int java_pid = 0;
+  while (!done) {
+    Test::poll_once(c, 500);
+
+    if (1)  // FIXME: Remove after debugging
+      Test::Say(tostr() << "Assignment partition count: "
+                        << Test::assignment_partition_count(c, NULL)
+                        << ", changed_sub " << changed_subscription
+                        << ", changed_sub_done " << changed_subscription_done
+                        << ", assign_call_cnt " << rebalance_cb.assign_call_cnt
+                        << "\n");
+    if (Test::assignment_partition_count(c, NULL) == 8 && !java_pid) {
+      Test::Say(_C_GRN "librdkafka consumer assigned to 8 partitions\n");
+      /* Launch the Java IncrementalRebalanceCli helper with a
+       * NULL-terminated argv: helper test name, brokers, the two topics
+       * and the group name. */
+      string bootstrapServers = get_bootstrap_servers();
+      const char *argv[1 + 1 + 1 + 1 + 1 + 1];
+      size_t i = 0;
+      argv[i++] = "test1";
+      argv[i++] = bootstrapServers.c_str();
+      argv[i++] = topic_name_1.c_str();
+      argv[i++] = topic_name_2.c_str();
+      argv[i++] = group_name.c_str();
+      argv[i] = NULL;
+      java_pid = test_run_java("IncrementalRebalanceCli", argv);
+      if (java_pid <= 0)
+        Test::Fail(tostr() << "Unexpected pid: " << java_pid);
+    }
+
+    if (Test::assignment_partition_count(c, NULL) == 4 && java_pid != 0 &&
+        !changed_subscription) {
+      if (rebalance_cb.assign_call_cnt != 2)
+        Test::Fail(tostr() << "Expecting consumer's assign_call_cnt to be 2, "
+                              "not "
+                           << rebalance_cb.assign_call_cnt);
+      Test::Say(_C_GRN "Java consumer is now part of the group\n");
+      Test::subscribe(c, topic_name_1);
+      changed_subscription = true;
+    }
+
+    /* Depending on the timing of resubscribe rebalancing and the
+     * Java consumer terminating we might have one or two rebalances,
+     * hence the fuzzy <=5 and >=5 checks. */
+    if (Test::assignment_partition_count(c, NULL) == 2 &&
+        changed_subscription && rebalance_cb.assign_call_cnt <= 5 &&
+        !changed_subscription_done) {
+      /* All topic 1 partitions will be allocated to this consumer whether or
+       * not the Java consumer has unsubscribed yet because the sticky algorithm
+       * attempts to ensure partition counts are even. */
+      Test::Say(_C_GRN "Consumer 1 has unsubscribed from topic 2\n");
+      changed_subscription_done = true;
+    }
+
+    if (Test::assignment_partition_count(c, NULL) == 2 &&
+        changed_subscription && rebalance_cb.assign_call_cnt >= 5 &&
+        changed_subscription_done) {
+      /* When the java consumer closes, this will cause an empty assign
+       * rebalance_cb event, allowing detection of when this has happened. */
+      Test::Say(_C_GRN "Java consumer has left the group\n");
+      done = true;
+    }
+  }
+
+  Test::Say("Closing consumer\n");
+  c->close();
+
+  /* Expected behavior is IncrementalRebalanceCli will exit cleanly, timeout
+   * otherwise. */
+  test_waitpid(java_pid);
+
+  delete c;
+
+  SUB_TEST_PASS();
+}
+
+
+
+/* Check behavior when:
+ * - Single consumer subscribes to topic.
+ * - Soon after (timing such that rebalance is probably in progress) it
+ *   subscribes to a different topic.
+ */ + +static void s_subscribe_when_rebalancing(int variation) { + SUB_TEST("variation %d", variation); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string topic_name_2 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string topic_name_3 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + test_create_topic(NULL, topic_name_2.c_str(), 1, 1); + test_create_topic(NULL, topic_name_3.c_str(), 1, 1); + + DefaultRebalanceCb rebalance_cb; + RdKafka::KafkaConsumer *c = make_consumer( + "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25); + test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000); + test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000); + test_wait_topic_exists(c->c_ptr(), topic_name_3.c_str(), 10 * 1000); + + if (variation == 2 || variation == 4 || variation == 6) { + /* Pre-cache metadata for all topics. */ + class RdKafka::Metadata *metadata; + c->metadata(true, NULL, &metadata, 5000); + delete metadata; + } + + Test::subscribe(c, topic_name_1); + Test::wait_for_assignment(c, 1, &topic_name_1); + + Test::subscribe(c, topic_name_2); + + if (variation == 3 || variation == 5) + Test::poll_once(c, 500); + + if (variation < 5) { + // Very quickly after subscribing to topic 2, subscribe to topic 3. + Test::subscribe(c, topic_name_3); + Test::wait_for_assignment(c, 1, &topic_name_3); + } else { + // ..or unsubscribe. + Test::unsubscribe(c); + Test::wait_for_assignment(c, 0, NULL); + } + + delete c; + + SUB_TEST_PASS(); +} + + + +/* Check behavior when: + * - Two consumer subscribe to a topic. + * - Max poll interval is exceeded on the first consumer. 
+ */ + +static void t_max_poll_interval_exceeded(int variation) { + SUB_TEST("variation %d", variation); + + std::string topic_name_1 = + Test::mk_topic_name("0113-cooperative_rebalance", 1); + std::string group_name = + Test::mk_unique_group_name("0113-cooperative_rebalance"); + test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + + std::vector<std::pair<std::string, std::string> > additional_conf; + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("session.timeout.ms"), std::string("6000"))); + additional_conf.push_back(std::pair<std::string, std::string>( + std::string("max.poll.interval.ms"), std::string("7000"))); + + DefaultRebalanceCb rebalance_cb1; + RdKafka::KafkaConsumer *c1 = + make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb1, 30); + DefaultRebalanceCb rebalance_cb2; + RdKafka::KafkaConsumer *c2 = + make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf, + &rebalance_cb2, 30); + + test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000); + test_wait_topic_exists(c2->c_ptr(), topic_name_1.c_str(), 10 * 1000); + + Test::subscribe(c1, topic_name_1); + Test::subscribe(c2, topic_name_1); + + bool done = false; + bool both_have_been_assigned = false; + while (!done) { + if (!both_have_been_assigned) + Test::poll_once(c1, 500); + Test::poll_once(c2, 500); + + if (Test::assignment_partition_count(c1, NULL) == 1 && + Test::assignment_partition_count(c2, NULL) == 1 && + !both_have_been_assigned) { + Test::Say( + tostr() + << "Both consumers are assigned to topic " << topic_name_1 + << ". 
WAITING 7 seconds for max.poll.interval.ms to be exceeded\n"); + both_have_been_assigned = true; + } + + if (Test::assignment_partition_count(c2, NULL) == 2 && + both_have_been_assigned) { + Test::Say("Consumer 1 is no longer assigned any partitions, done\n"); + done = true; + } + } + + if (variation == 1) { + if (rebalance_cb1.lost_call_cnt != 0) + Test::Fail( + tostr() << "Expected consumer 1 lost revoke count to be 0, not: " + << rebalance_cb1.lost_call_cnt); + Test::poll_once(c1, + 500); /* Eat the max poll interval exceeded error message */ + Test::poll_once(c1, + 500); /* Trigger the rebalance_cb with lost partitions */ + if (rebalance_cb1.lost_call_cnt != 1) + Test::Fail( + tostr() << "Expected consumer 1 lost revoke count to be 1, not: " + << rebalance_cb1.lost_call_cnt); + } + + c1->close(); + c2->close(); + + if (rebalance_cb1.lost_call_cnt != 1) + Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: " + << rebalance_cb1.lost_call_cnt); + + if (rebalance_cb1.assign_call_cnt != 1) + Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: " + << rebalance_cb1.assign_call_cnt); + if (rebalance_cb2.assign_call_cnt != 2) + Test::Fail(tostr() << "Expected consumer 1 assign count to be 2, not: " + << rebalance_cb1.assign_call_cnt); + + if (rebalance_cb1.revoke_call_cnt != 1) + Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: " + << rebalance_cb1.revoke_call_cnt); + if (rebalance_cb2.revoke_call_cnt != 1) + Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: " + << rebalance_cb1.revoke_call_cnt); + + delete c1; + delete c2; + + SUB_TEST_PASS(); +} + + +/** + * @brief Poll all consumers until there are no more events or messages + * and the timeout has expired. 
+ */
+static void poll_all_consumers(RdKafka::KafkaConsumer **consumers,
+                               DefaultRebalanceCb *rebalance_cbs,
+                               size_t num,
+                               int timeout_ms) {
+  int64_t ts_end = test_clock() + (timeout_ms * 1000);
+
+  /* Poll all consumers until no more events are seen,
+   * this makes sure we exhaust the current state events before returning. */
+  bool evented;
+  do {
+    evented = false;
+    for (size_t i = 0; i < num; i++) {
+      /* Block at most 10ms per poll, and never a negative amount once
+       * ts_end has passed. */
+      int block_ms = min(10, (int)((ts_end - test_clock()) / 1000));
+      while (rebalance_cbs[i].poll_once(consumers[i], max(block_ms, 0)))
+        evented = true;
+    }
+  } while (evented || test_clock() < ts_end);
+}
+
+
+/**
+ * @brief Stress test with 8 consumers subscribing, fetching and committing.
+ *
+ * @param subscription_variation 0..2
+ *
+ * TODO: incorporate committing offsets.
+ */
+
+static void u_multiple_subscription_changes(bool use_rebalance_cb,
+                                            int subscription_variation) {
+  const int N_CONSUMERS = 8;
+  const int N_TOPICS = 2;
+  const int N_PARTS_PER_TOPIC = N_CONSUMERS * N_TOPICS;
+  const int N_PARTITIONS = N_PARTS_PER_TOPIC * N_TOPICS;
+  const int N_MSGS_PER_PARTITION = 1000;
+
+  SUB_TEST("use_rebalance_cb: %d, subscription_variation: %d",
+           (int)use_rebalance_cb, subscription_variation);
+
+  string topic_name_1 = Test::mk_topic_name("0113u_1", 1);
+  string topic_name_2 = Test::mk_topic_name("0113u_2", 1);
+  string group_name = Test::mk_unique_group_name("0113u");
+
+  test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, 1);
+  test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, 1);
+
+  Test::Say("Creating consumers\n");
+  DefaultRebalanceCb rebalance_cbs[N_CONSUMERS];
+  RdKafka::KafkaConsumer *consumers[N_CONSUMERS];
+
+  for (int i = 0; i < N_CONSUMERS; i++) {
+    std::string name = tostr() << "C_" << i;
+    consumers[i] =
+        make_consumer(name.c_str(), group_name, "cooperative-sticky", NULL,
+                      use_rebalance_cb ? &rebalance_cbs[i] : NULL, 120);
+  }
+
+  test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_1.c_str(),
+                         10 * 1000);
+  test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_2.c_str(),
+                         10 * 1000);
+
+
+  /*
+   * Seed all partitions with the same number of messages so we later can
+   * verify that consumption is working.
+   */
+  vector<pair<Toppar, int> > ptopics;
+  ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_1, N_PARTS_PER_TOPIC),
+                                      N_MSGS_PER_PARTITION));
+  ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_2, N_PARTS_PER_TOPIC),
+                                      N_MSGS_PER_PARTITION));
+  produce_msgs(ptopics);
+
+
+  /*
+   * Track what topics a consumer should be subscribed to and use this to
+   * verify both its subscription and assignment throughout the test.
+   */
+
+  /* consumer -> currently subscribed topics */
+  map<int, vector<string> > consumer_topics;
+
+  /* topic -> consumers subscribed to topic */
+  map<string, set<int> > topic_consumers;
+
+  /* The subscription alternatives that consumers
+   * alter between in the playbook. */
+  vector<string> SUBSCRIPTION_1;
+  vector<string> SUBSCRIPTION_2;
+
+  SUBSCRIPTION_1.push_back(topic_name_1);
+
+  switch (subscription_variation) {
+  case 0:
+    SUBSCRIPTION_2.push_back(topic_name_1);
+    SUBSCRIPTION_2.push_back(topic_name_2);
+    break;
+
+  case 1:
+    SUBSCRIPTION_2.push_back(topic_name_2);
+    break;
+
+  case 2:
+    /* No subscription */
+    break;
+  }
+
+  sort(SUBSCRIPTION_1.begin(), SUBSCRIPTION_1.end());
+  sort(SUBSCRIPTION_2.begin(), SUBSCRIPTION_2.end());
+
+
+  /*
+   * Define playbook.
+   * timestamp_ms is an absolute offset from the start of the run, not a
+   * delta from the previous command (see the poll_all_consumers() call
+   * below, which subtracts the elapsed time).
+   */
+  const struct {
+    int timestamp_ms;
+    int consumer;
+    const vector<string> *topics;
+  } playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
+                  {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
+                  {4000, 1, &SUBSCRIPTION_1},  {4000, 1, &SUBSCRIPTION_1},
+                  {4000, 1, &SUBSCRIPTION_1},  {4000, 2, &SUBSCRIPTION_1},
+                  {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
+                  {6000, 4, &SUBSCRIPTION_1},  {6000, 5, &SUBSCRIPTION_1},
+                  {6000, 6, &SUBSCRIPTION_1},  {6000, 7, &SUBSCRIPTION_2},
+                  {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
+                  {6000, 1, &SUBSCRIPTION_2},  {6000, 1, &SUBSCRIPTION_1},
+                  {6000, 2, &SUBSCRIPTION_2},  {7000, 2, &SUBSCRIPTION_1},
+                  {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
+                  {8000, 0, &SUBSCRIPTION_2},  {8000, 1, &SUBSCRIPTION_1},
+                  {8000, 0, &SUBSCRIPTION_1},  {13000, 2, &SUBSCRIPTION_1},
+                  {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
+                  {13000, 5, &SUBSCRIPTION_2}, {14000, 6, &SUBSCRIPTION_2},
+                  {15000, 7, &SUBSCRIPTION_1}, {15000, 1, &SUBSCRIPTION_1},
+                  {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
+                  {15000, 6, &SUBSCRIPTION_1}, {INT_MAX, 0, 0}};
+
+  /*
+   * Run the playbook
+   */
+  int cmd_number = 0;
+  uint64_t ts_start = test_clock();
+
+  while (playbook[cmd_number].timestamp_ms != INT_MAX) {
+    TEST_ASSERT(playbook[cmd_number].consumer < N_CONSUMERS);
+
+    Test::Say(tostr() << "Cmd #" << cmd_number << ": wait "
+                      << playbook[cmd_number].timestamp_ms << "ms\n");
+
+    poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS,
+                       playbook[cmd_number].timestamp_ms -
+                           (int)((test_clock() - ts_start) / 1000));
+
+    /* Verify consumer assignments match subscribed topics */
+    map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
+    for (int i = 0; i < N_CONSUMERS; i++)
+      verify_consumer_assignment(
+          consumers[i], rebalance_cbs[i], consumer_topics[i],
+          /* Allow empty assignment */
+          true,
+          /* Allow mismatch between subscribed topics
+           * and actual assignment since we can't
+           * synchronize the last subscription
+           * to the current assignment due to
+           * an unknown number of rebalances required
+           * for the final assignment to settle.
+           * This is instead checked at the end of
+           * this test case. */
+          true, &all_assignments, -1 /* no msgcnt check*/);
+
+    int cid = playbook[cmd_number].consumer;
+    RdKafka::KafkaConsumer *consumer = consumers[playbook[cmd_number].consumer];
+    const vector<string> *topics = playbook[cmd_number].topics;
+
+    /*
+     * Update our view of the consumer's subscribed topics and vice versa.
+     */
+    for (vector<string>::const_iterator it = consumer_topics[cid].begin();
+         it != consumer_topics[cid].end(); it++) {
+      topic_consumers[*it].erase(cid);
+    }
+
+    consumer_topics[cid].clear();
+
+    for (vector<string>::const_iterator it = topics->begin();
+         it != topics->end(); it++) {
+      consumer_topics[cid].push_back(*it);
+      topic_consumers[*it].insert(cid);
+    }
+
+    RdKafka::ErrorCode err;
+
+    /*
+     * Change subscription
+     */
+    if (!topics->empty()) {
+      Test::Say(tostr() << "Consumer: " << consumer->name()
+                        << " is subscribing to topics "
+                        << string_vec_to_str(*topics) << " after "
+                        << ((test_clock() - ts_start) / 1000) << "ms\n");
+      err = consumer->subscribe(*topics);
+      TEST_ASSERT(!err, "Expected subscribe() to succeed, got %s",
+                  RdKafka::err2str(err).c_str());
+    } else {
+      Test::Say(tostr() << "Consumer: " << consumer->name()
+                        << " is unsubscribing after "
+                        << ((test_clock() - ts_start) / 1000) << "ms\n");
+      Test::unsubscribe(consumer);
+    }
+
+    /* Mark this consumer as waiting for rebalance so that
+     * verify_consumer_assignment() allows assigned partitions that
+     * (no longer) match the subscription. */
+    rebalance_cbs[cid].wait_rebalance = true;
+
+
+    /*
+     * Verify subscription matches what we think it should be.
+     */
+    vector<string> subscription;
+    err = consumer->subscription(subscription);
+    TEST_ASSERT(!err, "consumer %s subscription() failed: %s",
+                consumer->name().c_str(), RdKafka::err2str(err).c_str());
+
+    sort(subscription.begin(), subscription.end());
+
+    Test::Say(tostr() << "Consumer " << consumer->name()
+                      << " subscription is now "
+                      << string_vec_to_str(subscription) << "\n");
+
+    if (subscription != *topics)
+      Test::Fail(tostr() << "Expected consumer " << consumer->name()
+                         << " subscription: " << string_vec_to_str(*topics)
+                         << " but got: " << string_vec_to_str(subscription));
+
+    cmd_number++;
+  }
+
+
+  /*
+   * Wait for final rebalances and all consumers to settle,
+   * then verify assignments and received message counts.
+   */
+  Test::Say(_C_YEL "Waiting for final assignment state\n");
+  int done_count = 0;
+  /* Allow at least 20 seconds for group to stabilize. */
+  int64_t stabilize_until = test_clock() + (20 * 1000 * 1000); /* 20s */
+
+  while (done_count < 2) {
+    bool stabilized = test_clock() > stabilize_until;
+
+    poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS, 5000);
+
+    /* Verify consumer assignments */
+    int counts[N_CONSUMERS];
+    map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
+    Test::Say(tostr() << "Consumer assignments "
+                      << "(subscription_variation " << subscription_variation
+                      << ")" << (stabilized ? " (stabilized)" : "")
+                      << (use_rebalance_cb ? " (use_rebalance_cb)"
+                                           : " (no rebalance cb)")
+                      << ":\n");
+    for (int i = 0; i < N_CONSUMERS; i++) {
+      bool last_rebalance_stabilized =
+          stabilized &&
+          (!use_rebalance_cb ||
+           /* session.timeout.ms * 2 + 1 */
+           test_clock() > rebalance_cbs[i].ts_last_assign + (13 * 1000 * 1000));
+
+      counts[i] = verify_consumer_assignment(
+          consumers[i], rebalance_cbs[i], consumer_topics[i],
+          /* allow empty */
+          true,
+          /* if we're waiting for a
+           * rebalance it is okay for the
+           * current assignment to contain
+           * topics that this consumer
+           * (no longer) subscribes to. */
+          !last_rebalance_stabilized || !use_rebalance_cb ||
+              rebalance_cbs[i].wait_rebalance,
+          /* do not allow assignments for
+           * topics that are not subscribed*/
+          &all_assignments,
+          /* Verify received message counts
+           * once the assignments have
+           * stabilized.
+           * Requires the rebalance cb.*/
+          done_count > 0 && use_rebalance_cb ? N_MSGS_PER_PARTITION : -1);
+    }
+
+    Test::Say(tostr() << all_assignments.size() << "/" << N_PARTITIONS
+                      << " partitions assigned\n");
+
+    bool done = true;
+    for (int i = 0; i < N_CONSUMERS; i++) {
+      /* For each topic the consumer subscribes to it should
+       * be assigned its share of partitions. */
+      int exp_parts = 0;
+      for (vector<string>::const_iterator it = consumer_topics[i].begin();
+           it != consumer_topics[i].end(); it++)
+        exp_parts += N_PARTS_PER_TOPIC / (int)topic_consumers[*it].size();
+
+      Test::Say(tostr() << (counts[i] == exp_parts ? "" : _C_YEL) << "Consumer "
+                        << consumers[i]->name() << " has " << counts[i]
+                        << " assigned partitions (" << consumer_topics[i].size()
+                        << " subscribed topic(s))"
+                        << ", expecting " << exp_parts
+                        << " assigned partitions\n");
+
+      if (counts[i] != exp_parts)
+        done = false;
+    }
+
+    if (done && stabilized) {
+      done_count++;
+      Test::Say(tostr() << "All assignments verified, done count is "
+                        << done_count << "\n");
+    }
+  }
+
+  Test::Say("Disposing consumers\n");
+  for (int i = 0; i < N_CONSUMERS; i++) {
+    TEST_ASSERT(!use_rebalance_cb || !rebalance_cbs[i].wait_rebalance,
+                "Consumer %d still waiting for rebalance", i);
+    /* Close only every other consumer so that both the close() path and
+     * the destroy-without-close path are exercised. */
+    if (i & 1)
+      consumers[i]->close();
+    delete consumers[i];
+  }
+
+  SUB_TEST_PASS();
+}
+
+
+
+extern "C" {
+
+static int rebalance_cnt;
+static rd_kafka_resp_err_t rebalance_exp_event;
+static rd_bool_t rebalance_exp_lost;
+
+extern void test_print_partition_list(
+    const rd_kafka_topic_partition_list_t *partitions);
+
+
+/* Rebalance callback for the mock-cluster tests below: verifies the event
+ * matches rebalance_exp_event (unless that is NO_ERROR, i.e. "any"),
+ * verifies lost-ness against rebalance_exp_lost, then performs the
+ * corresponding incremental (un)assign. */
+static void rebalance_cb(rd_kafka_t *rk,
+                         rd_kafka_resp_err_t err,
+                         rd_kafka_topic_partition_list_t *parts,
+                         void *opaque) {
+  rebalance_cnt++;
+  TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
+           rd_kafka_err2name(err), parts->cnt);
+
+  test_print_partition_list(parts);
+
+  TEST_ASSERT(err == rebalance_exp_event ||
+                  rebalance_exp_event == RD_KAFKA_RESP_ERR_NO_ERROR,
+              "Expected rebalance event %s, not %s",
+              rd_kafka_err2name(rebalance_exp_event), rd_kafka_err2name(err));
+
+  if (rebalance_exp_lost) {
+    TEST_ASSERT(rd_kafka_assignment_lost(rk), "Expected partitions lost");
+    TEST_SAY("Partitions were lost\n");
+  }
+
+  if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+    test_consumer_incremental_assign("assign", rk, parts);
+  } else {
+    test_consumer_incremental_unassign("unassign", rk, parts);
+  }
+}
+
+/**
+ * @brief Wait for an expected rebalance event, or fail.
+ */ +static void expect_rebalance0(const char *func, + int line, + const char *what, + rd_kafka_t *c, + rd_kafka_resp_err_t exp_event, + rd_bool_t exp_lost, + int timeout_s) { + int64_t tmout = test_clock() + (timeout_s * 1000000); + int start_cnt = rebalance_cnt; + + TEST_SAY("%s:%d: Waiting for %s (%s) for %ds\n", func, line, what, + rd_kafka_err2name(exp_event), timeout_s); + + rebalance_exp_lost = exp_lost; + rebalance_exp_event = exp_event; + + while (tmout > test_clock() && rebalance_cnt == start_cnt) { + test_consumer_poll_once(c, NULL, 1000); + } + + if (rebalance_cnt == start_cnt + 1) { + rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR; + rebalance_exp_lost = exp_lost = rd_false; + return; + } + + TEST_FAIL("%s:%d: Timed out waiting for %s (%s)", func, line, what, + rd_kafka_err2name(exp_event)); +} + +#define expect_rebalance(WHAT, C, EXP_EVENT, EXP_LOST, TIMEOUT_S) \ + expect_rebalance0(__FUNCTION__, __LINE__, WHAT, C, EXP_EVENT, EXP_LOST, \ + TIMEOUT_S) + + +/* Check lost partitions revoke occurs on ILLEGAL_GENERATION heartbeat error. 
+ */
+
+static void p_lost_partitions_heartbeat_illegal_generation_test() {
+  const char *bootstraps;
+  rd_kafka_mock_cluster_t *mcluster;
+  const char *groupid = "mygroup";
+  const char *topic = "test";
+  rd_kafka_t *c;
+  rd_kafka_conf_t *conf;
+
+  SUB_TEST_QUICK();
+
+  mcluster = test_mock_cluster_new(3, &bootstraps);
+
+  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+  /* Seed the topic with messages */
+  test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
+                           bootstraps, "batch.num.messages", "10",
+                           "security.protocol", "plaintext", NULL);
+
+  test_conf_init(&conf, NULL, 30);
+  test_conf_set(conf, "bootstrap.servers", bootstraps);
+  test_conf_set(conf, "security.protocol", "PLAINTEXT");
+  test_conf_set(conf, "group.id", groupid);
+  test_conf_set(conf, "session.timeout.ms", "5000");
+  test_conf_set(conf, "heartbeat.interval.ms", "1000");
+  test_conf_set(conf, "auto.offset.reset", "earliest");
+  test_conf_set(conf, "enable.auto.commit", "false");
+  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+  test_consumer_subscribe(c, topic);
+
+  expect_rebalance("initial assignment", c,
+                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 5 + 2);
+
+  /* Fail heartbeats: the next five Heartbeat responses will return
+   * ILLEGAL_GENERATION, which should make the consumer consider its
+   * current assignment lost. */
+  rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Heartbeat, 5,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
+  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+                   rd_true /*expect lost*/, 10 + 2);
+
+  rd_kafka_mock_clear_request_errors(mcluster, RD_KAFKAP_Heartbeat);
+
+  expect_rebalance("rejoin after lost", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 10 + 2);
+
+  TEST_SAY("Closing consumer\n");
+  test_consumer_close(c);
+
+  TEST_SAY("Destroying consumer\n");
+  rd_kafka_destroy(c);
+
+  TEST_SAY("Destroying mock cluster\n");
+  test_mock_cluster_destroy(mcluster);
+
+  SUB_TEST_PASS();
+}
+
+
+
+/* Check lost partitions revoke occurs on ILLEGAL_GENERATION JoinGroup
+ * or SyncGroup error.
+ */
+
+static void q_lost_partitions_illegal_generation_test(
+    rd_bool_t test_joingroup_fail) {
+  const char *bootstraps;
+  rd_kafka_mock_cluster_t *mcluster;
+  const char *groupid = "mygroup";
+  const char *topic1 = "test1";
+  const char *topic2 = "test2";
+  rd_kafka_t *c;
+  rd_kafka_conf_t *conf;
+  rd_kafka_resp_err_t err;
+  rd_kafka_topic_partition_list_t *topics;
+
+  /* Only the SyncGroup variant runs in quick mode. */
+  SUB_TEST0(!test_joingroup_fail /*quick*/, "test_joingroup_fail=%d",
+            test_joingroup_fail);
+
+  mcluster = test_mock_cluster_new(3, &bootstraps);
+
+  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+  /* Seed the topic1 with messages */
+  test_produce_msgs_easy_v(topic1, 0, 0, 0, 100, 10, "bootstrap.servers",
+                           bootstraps, "batch.num.messages", "10",
+                           "security.protocol", "plaintext", NULL);
+
+  /* Seed the topic2 with messages */
+  test_produce_msgs_easy_v(topic2, 0, 0, 0, 100, 10, "bootstrap.servers",
+                           bootstraps, "batch.num.messages", "10",
+                           "security.protocol", "plaintext", NULL);
+
+  test_conf_init(&conf, NULL, 30);
+  test_conf_set(conf, "bootstrap.servers", bootstraps);
+  test_conf_set(conf, "security.protocol", "PLAINTEXT");
+  test_conf_set(conf, "group.id", groupid);
+  test_conf_set(conf, "session.timeout.ms", "5000");
+  test_conf_set(conf, "heartbeat.interval.ms", "1000");
+  test_conf_set(conf, "auto.offset.reset", "earliest");
+  test_conf_set(conf, "enable.auto.commit", "false");
+  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+  test_consumer_subscribe(c, topic1);
+
+  expect_rebalance("initial assignment", c,
+                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 5 + 2);
+
+  /* Fail JoinGroups or SyncGroups */
+  rd_kafka_mock_push_request_errors(
+      mcluster, test_joingroup_fail ? RD_KAFKAP_JoinGroup : RD_KAFKAP_SyncGroup,
+      5, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+      RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
+  /* Trigger a rejoin by widening the subscription to both topics. */
+  topics = rd_kafka_topic_partition_list_new(2);
+  rd_kafka_topic_partition_list_add(topics, topic1, RD_KAFKA_PARTITION_UA);
+  rd_kafka_topic_partition_list_add(topics, topic2, RD_KAFKA_PARTITION_UA);
+  err = rd_kafka_subscribe(c, topics);
+  if (err)
+    TEST_FAIL("%s: Failed to subscribe to topics: %s\n", rd_kafka_name(c),
+              rd_kafka_err2str(err));
+  rd_kafka_topic_partition_list_destroy(topics);
+
+  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+                   rd_true /*expect lost*/, 10 + 2);
+
+  rd_kafka_mock_clear_request_errors(mcluster, test_joingroup_fail
+                                                   ? RD_KAFKAP_JoinGroup
+                                                   : RD_KAFKAP_SyncGroup);
+
+  expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 10 + 2);
+
+  TEST_SAY("Closing consumer\n");
+  test_consumer_close(c);
+
+  TEST_SAY("Destroying consumer\n");
+  rd_kafka_destroy(c);
+
+  TEST_SAY("Destroying mock cluster\n");
+  test_mock_cluster_destroy(mcluster);
+
+  SUB_TEST_PASS();
+}
+
+
+
+/* Check lost partitions revoke occurs on ILLEGAL_GENERATION Commit
+ * error.
+ */
+
+static void r_lost_partitions_commit_illegal_generation_test_local() {
+  const char *bootstraps;
+  rd_kafka_mock_cluster_t *mcluster;
+  const char *groupid = "mygroup";
+  const char *topic = "test";
+  const int msgcnt = 100;
+  rd_kafka_t *c;
+  rd_kafka_conf_t *conf;
+
+  SUB_TEST();
+
+  mcluster = test_mock_cluster_new(3, &bootstraps);
+
+  rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+  /* Seed the topic with messages */
+  test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10, "bootstrap.servers",
+                           bootstraps, "batch.num.messages", "10",
+                           "security.protocol", "plaintext", NULL);
+
+  test_conf_init(&conf, NULL, 30);
+  test_conf_set(conf, "bootstrap.servers", bootstraps);
+  test_conf_set(conf, "security.protocol", "PLAINTEXT");
+  test_conf_set(conf, "group.id", groupid);
+  test_conf_set(conf, "auto.offset.reset", "earliest");
+  test_conf_set(conf, "enable.auto.commit", "false");
+  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+
+  c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+  test_consumer_subscribe(c, topic);
+
+  expect_rebalance("initial assignment", c,
+                   RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 5 + 2);
+
+
+  /* Consume some messages so that the commit has something to commit. */
+  test_consumer_poll("consume", c, -1, -1, -1, msgcnt / 2, NULL);
+
+  /* Fail Commit: the next five OffsetCommit responses will return
+   * ILLEGAL_GENERATION. */
+  rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_OffsetCommit, 5,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+                                    RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION);
+
+  /* Sync commit of the current offsets; the mocked ILLEGAL_GENERATION
+   * response should make the consumer consider its assignment lost. */
+  rd_kafka_commit(c, NULL, rd_false);
+
+  expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+                   rd_true /*expect lost*/, 10 + 2);
+
+  expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+                   rd_false /*don't expect lost*/, 20 + 2);
+
+  TEST_SAY("Closing consumer\n");
+  test_consumer_close(c);
+
+  TEST_SAY("Destroying consumer\n");
+  rd_kafka_destroy(c);
+
+  TEST_SAY("Destroying mock cluster\n");
+  test_mock_cluster_destroy(mcluster);
+
+  /* FIX: was missing, unlike all sibling mock-cluster subtests which mark
+   * themselves as passed. */
+  SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Rebalance callback for the v_.. test below.
+ *
+ * Performs the incremental (un)assign; on unassign with manual commits
+ * enabled it additionally attempts a sync commit after a delay.
+ */
+static void v_rebalance_cb(rd_kafka_t *rk,
+                           rd_kafka_resp_err_t err,
+                           rd_kafka_topic_partition_list_t *parts,
+                           void *opaque) {
+  bool *auto_commitp = (bool *)opaque;
+
+  TEST_SAY("%s: %s: %d partition(s)%s\n", rd_kafka_name(rk),
+           rd_kafka_err2name(err), parts->cnt,
+           rd_kafka_assignment_lost(rk) ? " - assignment lost" : "");
+
+  test_print_partition_list(parts);
+
+  if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+    test_consumer_incremental_assign("assign", rk, parts);
+  } else {
+    test_consumer_incremental_unassign("unassign", rk, parts);
+
+    if (!*auto_commitp) {
+      rd_kafka_resp_err_t commit_err;
+
+      TEST_SAY("Attempting manual commit after unassign, in 2 seconds..\n");
+      /* Sleep enough to have the generation-id bumped by rejoin. */
+      rd_sleep(2);
+      commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/);
+      /* NO_OFFSET (nothing to commit) and DESTROY (consumer closing) are
+       * acceptable outcomes here. */
+      TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
+                      commit_err == RD_KAFKA_RESP_ERR__DESTROY,
+                  "%s: manual commit failed: %s", rd_kafka_name(rk),
+                  rd_kafka_err2str(commit_err));
+    }
+  }
+}
+
+/**
+ * @brief Commit callback for the v_.. test.
+ */
+static void v_commit_cb(rd_kafka_t *rk,
+                        rd_kafka_resp_err_t err,
+                        rd_kafka_topic_partition_list_t *offsets,
+                        void *opaque) {
+  TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk),
+           offsets ? offsets->cnt : -1, rd_kafka_err2name(err));
+  TEST_ASSERT(!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
+                  err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */,
+              "%s offset commit failed: %s", rd_kafka_name(rk),
+              rd_kafka_err2str(err));
+}
+
+
+/**
+ * @brief Verify committing while a cooperative rebalance is in progress.
+ *
+ * @param with_rebalance_cb Install v_rebalance_cb on both consumers.
+ * @param auto_commit       Use automatic instead of manual commits.
+ */
+static void v_commit_during_rebalance(bool with_rebalance_cb,
+                                      bool auto_commit) {
+  rd_kafka_t *p, *c1, *c2;
+  rd_kafka_conf_t *conf;
+  const char *topic = test_mk_topic_name("0113_v", 1);
+  const int partition_cnt = 6;
+  const int msgcnt_per_partition = 100;
+  const int msgcnt = partition_cnt * msgcnt_per_partition;
+  uint64_t testid;
+  int i;
+
+
+  SUB_TEST("With%s rebalance callback and %s-commit",
+           with_rebalance_cb ? "" : "out", auto_commit ? "auto" : "manual");
+
+  test_conf_init(&conf, NULL, 30);
+  testid = test_id_generate();
+
+  /*
+   * Produce messages to topic
+   */
+  p = test_create_producer();
+
+  test_create_topic(p, topic, partition_cnt, 1);
+
+  for (i = 0; i < partition_cnt; i++) {
+    test_produce_msgs2(p, topic, testid, i, i * msgcnt_per_partition,
+                       msgcnt_per_partition, NULL, 0);
+  }
+
+  test_flush(p, -1);
+
+  rd_kafka_destroy(p);
+
+
+  test_conf_set(conf, "auto.offset.reset", "earliest");
+  test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false");
+  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
+  rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb);
+  /* The rebalance cb reads auto_commit through the opaque. */
+  rd_kafka_conf_set_opaque(conf, (void *)&auto_commit);
+
+  TEST_SAY("Create and subscribe first consumer\n");
+  c1 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
+                            rd_kafka_conf_dup(conf), NULL);
+  TEST_ASSERT(rd_kafka_opaque(c1) == (void *)&auto_commit,
+              "c1 opaque mismatch");
+  test_consumer_subscribe(c1, topic);
+
+  /* Consume some messages so that we know we have an assignment
+   * and something to commit. */
+  test_consumer_poll("C1.PRECONSUME", c1, testid, -1, 0,
+                     msgcnt / partition_cnt / 2, NULL);
+
+  TEST_SAY("Create and subscribe second consumer\n");
+  c2 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
+                            conf, NULL);
+  TEST_ASSERT(rd_kafka_opaque(c2) == (void *)&auto_commit,
+              "c2 opaque mismatch");
+  test_consumer_subscribe(c2, topic);
+
+  /* Poll both consumers */
+  for (i = 0; i < 10; i++) {
+    test_consumer_poll_once(c1, NULL, 1000);
+    test_consumer_poll_once(c2, NULL, 1000);
+  }
+
+  TEST_SAY("Closing consumers\n");
+  test_consumer_close(c1);
+  test_consumer_close(c2);
+
+  rd_kafka_destroy(c1);
+  rd_kafka_destroy(c2);
+
+  SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Verify that incremental rebalances retain stickyness.
 */
static void x_incremental_rebalances(void) {
#define _NUM_CONS 3
  rd_kafka_t *c[_NUM_CONS];
  rd_kafka_conf_t *conf;
  const char *topic = test_mk_topic_name("0113_x", 1);
  int i;

  SUB_TEST();
  test_conf_init(&conf, NULL, 60);

  test_create_topic(NULL, topic, 6, 1);

  test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
  /* Create the consumers with distinct client.ids so their assignments
   * can be told apart in logs. */
  for (i = 0; i < _NUM_CONS; i++) {
    char clientid[32];
    rd_snprintf(clientid, sizeof(clientid), "consumer%d", i);
    test_conf_set(conf, "client.id", clientid);

    c[i] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
  }
  rd_kafka_conf_destroy(conf);

  /* First consumer joins group: gets all 6 partitions. */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
  test_consumer_subscribe(c[0], topic);
  test_consumer_wait_assignment(c[0], rd_true /*poll*/);
  test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
                                  topic, 1, topic, 2, topic, 3, topic, 4, topic,
                                  5, NULL);


  /* Second consumer joins group: c0 must keep a subset (3,4,5) of its
   * previous assignment -- that is the stickiness being verified. */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
  test_consumer_subscribe(c[1], topic);
  test_consumer_wait_assignment(c[1], rd_true /*poll*/);
  rd_sleep(3); /* let the incremental rebalance settle */
  test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
                                  topic, 4, topic, 5, NULL);
  test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 0,
                                  topic, 1, topic, 2, NULL);

  /* Third consumer joins group: again each existing member retains a
   * subset of what it already owned. */
  TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
  test_consumer_subscribe(c[2], topic);
  test_consumer_wait_assignment(c[2], rd_true /*poll*/);
  rd_sleep(3); /* let the incremental rebalance settle */
  test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,
                                  topic, 5, NULL);
  test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 1,
                                  topic, 2, NULL);
  test_consumer_verify_assignment(c[2], rd_false /*fail later*/, topic, 3,
                                  topic, 0, NULL);

  /* Raise any previously failed verify_assignment calls and fail the test */
  TEST_LATER_CHECK();

  for (i = 0; i < _NUM_CONS; i++)
    rd_kafka_destroy(c[i]);

  SUB_TEST_PASS();

#undef _NUM_CONS
}

/* Local tests not needing a cluster */
int main_0113_cooperative_rebalance_local(int argc, char **argv) {
  a_assign_rapid();
  p_lost_partitions_heartbeat_illegal_generation_test();
  q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/);
  q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/);
  r_lost_partitions_commit_illegal_generation_test_local();
  return 0;
}

/* Cluster tests: runs the a_..x_ sub-test series in order; everything
 * after the first c_ variant is skipped in quick mode. */
int main_0113_cooperative_rebalance(int argc, char **argv) {
  int i;

  a_assign_tests();
  b_subscribe_with_cb_test(true /*close consumer*/);
  b_subscribe_with_cb_test(false /*don't close consumer*/);
  c_subscribe_no_cb_test(true /*close consumer*/);

  if (test_quick) {
    Test::Say("Skipping tests >= c_ .. due to quick mode\n");
    return 0;
  }

  c_subscribe_no_cb_test(false /*don't close consumer*/);
  d_change_subscription_add_topic(true /*close consumer*/);
  d_change_subscription_add_topic(false /*don't close consumer*/);
  e_change_subscription_remove_topic(true /*close consumer*/);
  e_change_subscription_remove_topic(false /*don't close consumer*/);
  f_assign_call_cooperative();
  g_incremental_assign_call_eager();
  h_delete_topic();
  i_delete_topic_2();
  j_delete_topic_no_rb_callback();
  k_add_partition();
  l_unsubscribe();
  m_unsubscribe_2();
  n_wildcard();
  o_java_interop();
  for (i = 1; i <= 6; i++) /* iterate over 6 different test variations */
    s_subscribe_when_rebalancing(i);
  for (i = 1; i <= 2; i++)
    t_max_poll_interval_exceeded(i);
  /* Run all 2*3 variations of the u_.. test */
  for (i = 0; i < 3; i++) {
    u_multiple_subscription_changes(true /*with rebalance_cb*/, i);
    u_multiple_subscription_changes(false /*without rebalance_cb*/, i);
  }
  v_commit_during_rebalance(true /*with rebalance callback*/,
                            true /*auto commit*/);
  v_commit_during_rebalance(false /*without rebalance callback*/,
                            true /*auto commit*/);
  v_commit_during_rebalance(true /*with rebalance callback*/,
                            false /*manual commit*/);
  x_incremental_rebalances();

  return 0;
}
} /* NOTE(review): closes a block opened before this chunk -- presumably
   * the extern "C" wrapper around the main_.. entry points; confirm. */