path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp')
1 files changed, 0 insertions, 3170 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp b/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
deleted file mode 100644
index 430798d7..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0113-cooperative_rebalance.cpp
+++ /dev/null
@@ -1,3170 +0,0 @@
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- */
-extern "C" {
-#include "../src/rdkafka_protocol.h"
-#include "test.h"
-#include <iostream>
-#include <map>
-#include <set>
-#include <algorithm>
-#include <cstring>
-#include <cstdlib>
-#include <assert.h>
-#include "testcpp.h"
-#include <fstream>
-using namespace std;
-/** Topic+Partition helper class */
-class Toppar {
- public:
- Toppar(const string &topic, int32_t partition) :
- topic(topic), partition(partition) {
- }
- Toppar(const RdKafka::TopicPartition *tp) :
- topic(tp->topic()), partition(tp->partition()) {
- }
- friend bool operator==(const Toppar &a, const Toppar &b) {
- return a.partition == b.partition && a.topic == b.topic;
- }
- friend bool operator<(const Toppar &a, const Toppar &b) {
- if (a.partition < b.partition)
- return true;
- return a.topic < b.topic;
- }
- string str() const {
- return tostr() << topic << "[" << partition << "]";
- }
- std::string topic;
- int32_t partition;
-static std::string get_bootstrap_servers() {
- RdKafka::Conf *conf;
- std::string bootstrap_servers;
- Test::conf_init(&conf, NULL, 0);
- conf->get("bootstrap.servers", bootstrap_servers);
- delete conf;
- return bootstrap_servers;
-class DrCb : public RdKafka::DeliveryReportCb {
- public:
- void dr_cb(RdKafka::Message &msg) {
- if (msg.err())
- Test::Fail("Delivery failed: " + RdKafka::err2str(msg.err()));
- }
- * @brief Produce messages to partitions.
- *
- * The pair is Toppar,msg_cnt_per_partition.
- * The Toppar is topic,partition_cnt.
- */
-static void produce_msgs(vector<pair<Toppar, int> > partitions) {
- RdKafka::Conf *conf;
- Test::conf_init(&conf, NULL, 0);
- string errstr;
- DrCb dr;
- conf->set("dr_cb", &dr, errstr);
- RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
- if (!p)
- Test::Fail("Failed to create producer: " + errstr);
- delete conf;
- for (vector<pair<Toppar, int> >::iterator it = partitions.begin();
- it != partitions.end(); it++) {
- for (int part = 0; part < it->first.partition; part++) {
- for (int i = 0; i < it->second; i++) {
- RdKafka::ErrorCode err =
- p->produce(it->first.topic, part, RdKafka::Producer::RK_MSG_COPY,
- (void *)"Hello there", 11, NULL, 0, 0, NULL);
- TEST_ASSERT(!err, "produce(%s, %d) failed: %s", it->first.topic.c_str(),
- part, RdKafka::err2str(err).c_str());
- p->poll(0);
- }
- }
- }
- p->flush(10000);
- delete p;
-static RdKafka::KafkaConsumer *make_consumer(
- string client_id,
- string group_id,
- string assignment_strategy,
- vector<pair<string, string> > *additional_conf,
- RdKafka::RebalanceCb *rebalance_cb,
- int timeout_s) {
- std::string bootstraps;
- std::string errstr;
- std::vector<std::pair<std::string, std::string> >::iterator itr;
- RdKafka::Conf *conf;
- Test::conf_init(&conf, NULL, timeout_s);
- Test::conf_set(conf, "", client_id);
- Test::conf_set(conf, "", group_id);
- Test::conf_set(conf, "auto.offset.reset", "earliest");
- Test::conf_set(conf, "", "false");
- Test::conf_set(conf, "partition.assignment.strategy", assignment_strategy);
- if (additional_conf != NULL) {
- for (itr = (*additional_conf).begin(); itr != (*additional_conf).end();
- itr++)
- Test::conf_set(conf, itr->first, itr->second);
- }
- if (rebalance_cb) {
- if (conf->set("rebalance_cb", rebalance_cb, errstr))
- Test::Fail("Failed to set rebalance_cb: " + errstr);
- }
- RdKafka::KafkaConsumer *consumer =
- RdKafka::KafkaConsumer::create(conf, errstr);
- if (!consumer)
- Test::Fail("Failed to create KafkaConsumer: " + errstr);
- delete conf;
- return consumer;
- * @returns a CSV string of the vector
- */
-static string string_vec_to_str(const vector<string> &v) {
- ostringstream ss;
- for (vector<string>::const_iterator it = v.begin(); it != v.end(); it++)
- ss << (it == v.begin() ? "" : ", ") << *it;
- return ss.str();
-void expect_assignment(RdKafka::KafkaConsumer *consumer, size_t count) {
- std::vector<RdKafka::TopicPartition *> partitions;
- RdKafka::ErrorCode err;
- err = consumer->assignment(partitions);
- if (err)
- Test::Fail(consumer->name() +
- " assignment() failed: " + RdKafka::err2str(err));
- if (partitions.size() != count)
- Test::Fail(tostr() << "Expecting consumer " << consumer->name()
- << " to have " << count
- << " assigned partition(s), not: " << partitions.size());
- RdKafka::TopicPartition::destroy(partitions);
-static bool TopicPartition_cmp(const RdKafka::TopicPartition *a,
- const RdKafka::TopicPartition *b) {
- if (a->topic() < b->topic())
- return true;
- else if (a->topic() > b->topic())
- return false;
- return a->partition() < b->partition();
-void expect_assignment(RdKafka::KafkaConsumer *consumer,
- vector<RdKafka::TopicPartition *> &expected) {
- vector<RdKafka::TopicPartition *> partitions;
- RdKafka::ErrorCode err;
- err = consumer->assignment(partitions);
- if (err)
- Test::Fail(consumer->name() +
- " assignment() failed: " + RdKafka::err2str(err));
- if (partitions.size() != expected.size())
- Test::Fail(tostr() << "Expecting consumer " << consumer->name()
- << " to have " << expected.size()
- << " assigned partition(s), not " << partitions.size());
- sort(partitions.begin(), partitions.end(), TopicPartition_cmp);
- sort(expected.begin(), expected.end(), TopicPartition_cmp);
- int fails = 0;
- for (int i = 0; i < (int)partitions.size(); i++) {
- if (!TopicPartition_cmp(partitions[i], expected[i]))
- continue;
- Test::Say(tostr() << _C_RED << consumer->name() << ": expected assignment #"
- << i << " " << expected[i]->topic() << " ["
- << expected[i]->partition() << "], not "
- << partitions[i]->topic() << " ["
- << partitions[i]->partition() << "]\n");
- fails++;
- }
- if (fails)
- Test::Fail(consumer->name() + ": Expected assignment mismatch, see above");
- RdKafka::TopicPartition::destroy(partitions);
-class DefaultRebalanceCb : public RdKafka::RebalanceCb {
- private:
- static string part_list_print(
- const vector<RdKafka::TopicPartition *> &partitions) {
- ostringstream ss;
- for (unsigned int i = 0; i < partitions.size(); i++)
- ss << (i == 0 ? "" : ", ") << partitions[i]->topic() << " ["
- << partitions[i]->partition() << "]";
- return ss.str();
- }
- public:
- int assign_call_cnt;
- int revoke_call_cnt;
- int nonempty_assign_call_cnt; /**< ASSIGN_PARTITIONS with partitions */
- int lost_call_cnt;
- int partitions_assigned_net;
- bool wait_rebalance;
- int64_t ts_last_assign; /**< Timestamp of last rebalance assignment */
- map<Toppar, int> msg_cnt; /**< Number of consumed messages per partition. */
- ~DefaultRebalanceCb() {
- reset_msg_cnt();
- }
- DefaultRebalanceCb() :
- assign_call_cnt(0),
- revoke_call_cnt(0),
- nonempty_assign_call_cnt(0),
- lost_call_cnt(0),
- partitions_assigned_net(0),
- wait_rebalance(false),
- ts_last_assign(0) {
- }
- void rebalance_cb(RdKafka::KafkaConsumer *consumer,
- RdKafka::ErrorCode err,
- std::vector<RdKafka::TopicPartition *> &partitions) {
- wait_rebalance = false;
- std::string protocol = consumer->rebalance_protocol();
- "%s: Expected rebalance_protocol \"COOPERATIVE\", not %s",
- consumer->name().c_str(), protocol.c_str());
- const char *lost_str = consumer->assignment_lost() ? " (LOST)" : "";
- Test::Say(tostr() << _C_YEL "RebalanceCb " << protocol << ": "
- << consumer->name() << " " << RdKafka::err2str(err)
- << lost_str << ": " << part_list_print(partitions)
- << "\n");
- if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
- if (consumer->assignment_lost())
- Test::Fail("unexpected lost assignment during ASSIGN rebalance");
- RdKafka::Error *error = consumer->incremental_assign(partitions);
- if (error)
- Test::Fail(tostr() << "consumer->incremental_assign() failed: "
- << error->str());
- if (partitions.size() > 0)
- nonempty_assign_call_cnt++;
- assign_call_cnt += 1;
- partitions_assigned_net += (int)partitions.size();
- ts_last_assign = test_clock();
- } else {
- if (consumer->assignment_lost())
- lost_call_cnt += 1;
- RdKafka::Error *error = consumer->incremental_unassign(partitions);
- if (error)
- Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
- << error->str());
- if (partitions.size() == 0)
- Test::Fail("revoked partitions size should never be 0");
- revoke_call_cnt += 1;
- partitions_assigned_net -= (int)partitions.size();
- }
- /* Reset message counters for the given partitions. */
- Test::Say(consumer->name() + ": resetting message counters:\n");
- reset_msg_cnt(partitions);
- }
- bool poll_once(RdKafka::KafkaConsumer *c, int timeout_ms) {
- RdKafka::Message *msg = c->consume(timeout_ms);
- bool ret = msg->err() != RdKafka::ERR__TIMED_OUT;
- if (!msg->err())
- msg_cnt[Toppar(msg->topic_name(), msg->partition())]++;
- delete msg;
- return ret;
- }
- void reset_msg_cnt() {
- msg_cnt.clear();
- }
- void reset_msg_cnt(Toppar &tp) {
- int msgcnt = get_msg_cnt(tp);
- Test::Say(tostr() << " RESET " << tp.topic << " [" << tp.partition << "]"
- << " with " << msgcnt << " messages\n");
- if (!msg_cnt.erase(tp) && msgcnt)
- Test::Fail("erase failed!");
- }
- void reset_msg_cnt(const vector<RdKafka::TopicPartition *> &partitions) {
- for (unsigned int i = 0; i < partitions.size(); i++) {
- Toppar tp(partitions[i]->topic(), partitions[i]->partition());
- reset_msg_cnt(tp);
- }
- }
- int get_msg_cnt(const Toppar &tp) {
- map<Toppar, int>::iterator it = msg_cnt.find(tp);
- if (it == msg_cnt.end())
- return 0;
- return it->second;
- }
- * @brief Verify that the consumer's assignment is a subset of the
- * subscribed topics.
- *
- * @param allow_mismatch Allow assignment of not subscribed topics.
- * This can happen when the subscription is updated
- * but a rebalance callback hasn't been seen yet.
- * @param all_assignments Accumulated assignments for all consumers.
- * If an assigned partition already exists it means
- * the partition is assigned to multiple consumers and
- * the test will fail.
- * @param exp_msg_cnt Expected message count per assigned partition, or -1
- * if not to check.
- *
- * @returns the number of assigned partitions, or fails if the
- * assignment is empty or there is an assignment for
- * topic that is not subscribed.
- */
-static int verify_consumer_assignment(
- RdKafka::KafkaConsumer *consumer,
- DefaultRebalanceCb &rebalance_cb,
- const vector<string> &topics,
- bool allow_empty,
- bool allow_mismatch,
- map<Toppar, RdKafka::KafkaConsumer *> *all_assignments,
- int exp_msg_cnt) {
- vector<RdKafka::TopicPartition *> partitions;
- RdKafka::ErrorCode err;
- int fails = 0;
- int count;
- ostringstream ss;
- err = consumer->assignment(partitions);
- TEST_ASSERT(!err, "Failed to get assignment for consumer %s: %s",
- consumer->name().c_str(), RdKafka::err2str(err).c_str());
- count = (int)partitions.size();
- for (vector<RdKafka::TopicPartition *>::iterator it = partitions.begin();
- it != partitions.end(); it++) {
- RdKafka::TopicPartition *p = *it;
- if (find(topics.begin(), topics.end(), p->topic()) == topics.end()) {
- Test::Say(tostr() << (allow_mismatch ? _C_YEL "Warning (allowed)"
- : _C_RED "Error")
- << ": " << consumer->name() << " is assigned "
- << p->topic() << " [" << p->partition() << "] which is "
- << "not in the list of subscribed topics: "
- << string_vec_to_str(topics) << "\n");
- if (!allow_mismatch)
- fails++;
- }
- Toppar tp(p);
- pair<map<Toppar, RdKafka::KafkaConsumer *>::iterator, bool> ret;
- ret = all_assignments->insert(
- pair<Toppar, RdKafka::KafkaConsumer *>(tp, consumer));
- if (!ret.second) {
- Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
- << " is assigned " << p->topic() << " ["
- << p->partition()
- << "] which is "
- "already assigned to consumer "
- << ret.first->second->name() << "\n");
- fails++;
- }
- int msg_cnt = rebalance_cb.get_msg_cnt(tp);
- if (exp_msg_cnt != -1 && msg_cnt != exp_msg_cnt) {
- Test::Say(tostr() << _C_RED << "Error: " << consumer->name()
- << " expected " << exp_msg_cnt << " messages on "
- << p->topic() << " [" << p->partition() << "], not "
- << msg_cnt << "\n");
- fails++;
- }
- ss << (it == partitions.begin() ? "" : ", ") << p->topic() << " ["
- << p->partition() << "] (" << msg_cnt << "msgs)";
- }
- RdKafka::TopicPartition::destroy(partitions);
- Test::Say(tostr() << "Consumer " << consumer->name() << " assignment ("
- << count << "): " << ss.str() << "\n");
- if (count == 0 && !allow_empty)
- Test::Fail("Consumer " + consumer->name() +
- " has unexpected empty assignment");
- if (fails)
- Test::Fail(
- tostr() << "Consumer " + consumer->name()
- << " assignment verification failed (see previous error)");
- return count;
-/* -------- a_assign_tests
- *
- * check behavior incremental assign / unassign outside the context of a
- * rebalance.
- */
-/** Incremental assign, then assign(NULL).
- */
-static void assign_test_1(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2) {
- RdKafka::ErrorCode err;
- RdKafka::Error *error;
- Test::Say("Incremental assign, then assign(NULL)\n");
- if ((error = consumer->incremental_assign(toppars1)))
- Test::Fail(tostr() << "Incremental assign failed: " << error->str());
- Test::check_assignment(consumer, 1, &toppars1[0]->topic());
- if ((err = consumer->unassign()))
- Test::Fail("Unassign failed: " + RdKafka::err2str(err));
- Test::check_assignment(consumer, 0, NULL);
-/** Assign, then incremental unassign.
- */
-static void assign_test_2(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2) {
- RdKafka::ErrorCode err;
- RdKafka::Error *error;
- Test::Say("Assign, then incremental unassign\n");
- if ((err = consumer->assign(toppars1)))
- Test::Fail("Assign failed: " + RdKafka::err2str(err));
- Test::check_assignment(consumer, 1, &toppars1[0]->topic());
- if ((error = consumer->incremental_unassign(toppars1)))
- Test::Fail("Incremental unassign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
-/** Incremental assign, then incremental unassign.
- */
-static void assign_test_3(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2) {
- RdKafka::Error *error;
- Test::Say("Incremental assign, then incremental unassign\n");
- if ((error = consumer->incremental_assign(toppars1)))
- Test::Fail("Incremental assign failed: " + error->str());
- Test::check_assignment(consumer, 1, &toppars1[0]->topic());
- if ((error = consumer->incremental_unassign(toppars1)))
- Test::Fail("Incremental unassign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
-/** Multi-topic incremental assign and unassign + message consumption.
- */
-static void assign_test_4(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2) {
- RdKafka::Error *error;
- Test::Say(
- "Multi-topic incremental assign and unassign + message consumption\n");
- if ((error = consumer->incremental_assign(toppars1)))
- Test::Fail("Incremental assign failed: " + error->str());
- Test::check_assignment(consumer, 1, &toppars1[0]->topic());
- RdKafka::Message *m = consumer->consume(5000);
- if (m->err() != RdKafka::ERR_NO_ERROR)
- Test::Fail("Expecting a consumed message.");
- if (m->len() != 100)
- Test::Fail(tostr() << "Expecting msg len to be 100, not: "
- << m->len()); /* implies read from topic 1. */
- delete m;
- if ((error = consumer->incremental_unassign(toppars1)))
- Test::Fail("Incremental unassign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
- m = consumer->consume(100);
- if (m->err() != RdKafka::ERR__TIMED_OUT)
- Test::Fail("Not expecting a consumed message.");
- delete m;
- if ((error = consumer->incremental_assign(toppars2)))
- Test::Fail("Incremental assign failed: " + error->str());
- Test::check_assignment(consumer, 1, &toppars2[0]->topic());
- m = consumer->consume(5000);
- if (m->err() != RdKafka::ERR_NO_ERROR)
- Test::Fail("Expecting a consumed message.");
- if (m->len() != 200)
- Test::Fail(tostr() << "Expecting msg len to be 200, not: "
- << m->len()); /* implies read from topic 2. */
- delete m;
- if ((error = consumer->incremental_assign(toppars1)))
- Test::Fail("Incremental assign failed: " + error->str());
- if (Test::assignment_partition_count(consumer, NULL) != 2)
- Test::Fail(tostr() << "Expecting current assignment to have size 2, not: "
- << Test::assignment_partition_count(consumer, NULL));
- m = consumer->consume(5000);
- if (m->err() != RdKafka::ERR_NO_ERROR)
- Test::Fail("Expecting a consumed message.");
- delete m;
- if ((error = consumer->incremental_unassign(toppars2)))
- Test::Fail("Incremental unassign failed: " + error->str());
- if ((error = consumer->incremental_unassign(toppars1)))
- Test::Fail("Incremental unassign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
-/** Incremental assign and unassign of empty collection.
- */
-static void assign_test_5(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2) {
- RdKafka::Error *error;
- std::vector<RdKafka::TopicPartition *> toppars3;
- Test::Say("Incremental assign and unassign of empty collection\n");
- if ((error = consumer->incremental_assign(toppars3)))
- Test::Fail("Incremental assign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
- if ((error = consumer->incremental_unassign(toppars3)))
- Test::Fail("Incremental unassign failed: " + error->str());
- Test::check_assignment(consumer, 0, NULL);
-static void run_test(
- const std::string &t1,
- const std::string &t2,
- void (*test)(RdKafka::KafkaConsumer *consumer,
- std::vector<RdKafka::TopicPartition *> toppars1,
- std::vector<RdKafka::TopicPartition *> toppars2)) {
- std::vector<RdKafka::TopicPartition *> toppars1;
- toppars1.push_back(RdKafka::TopicPartition::create(t1, 0));
- std::vector<RdKafka::TopicPartition *> toppars2;
- toppars2.push_back(RdKafka::TopicPartition::create(t2, 0));
- RdKafka::KafkaConsumer *consumer =
- make_consumer("C_1", t1, "cooperative-sticky", NULL, NULL, 10);
- test(consumer, toppars1, toppars2);
- RdKafka::TopicPartition::destroy(toppars1);
- RdKafka::TopicPartition::destroy(toppars2);
- consumer->close();
- delete consumer;
-static void a_assign_tests() {
- int msgcnt = 1000;
- const int msgsize1 = 100;
- const int msgsize2 = 200;
- std::string topic1_str = Test::mk_topic_name("0113-a1", 1);
- test_create_topic(NULL, topic1_str.c_str(), 1, 1);
- std::string topic2_str = Test::mk_topic_name("0113-a2", 1);
- test_create_topic(NULL, topic2_str.c_str(), 1, 1);
- test_produce_msgs_easy_size(topic1_str.c_str(), 0, 0, msgcnt, msgsize1);
- test_produce_msgs_easy_size(topic2_str.c_str(), 0, 0, msgcnt, msgsize2);
- run_test(topic1_str, topic2_str, assign_test_1);
- run_test(topic1_str, topic2_str, assign_test_2);
- run_test(topic1_str, topic2_str, assign_test_3);
- run_test(topic1_str, topic2_str, assign_test_4);
- run_test(topic1_str, topic2_str, assign_test_5);
- * @brief Quick Assign 1,2, Assign 2,3, Assign 1,2,3 test to verify
- * that the correct OffsetFetch response is used.
- * See note in rdkafka_assignment.c for details.
- *
- * Makes use of the mock cluster to induce latency.
- */
-static void a_assign_rapid() {
- std::string group_id = __FUNCTION__;
- rd_kafka_mock_cluster_t *mcluster;
- const char *bootstraps;
- mcluster = test_mock_cluster_new(3, &bootstraps);
- int32_t coord_id = 1;
- rd_kafka_mock_coordinator_set(mcluster, "group", group_id.c_str(), coord_id);
- rd_kafka_mock_topic_create(mcluster, "topic1", 1, 1);
- rd_kafka_mock_topic_create(mcluster, "topic2", 1, 1);
- rd_kafka_mock_topic_create(mcluster, "topic3", 1, 1);
- /*
- * Produce messages to topics
- */
- const int msgs_per_partition = 1000;
- RdKafka::Conf *pconf;
- Test::conf_init(&pconf, NULL, 10);
- Test::conf_set(pconf, "bootstrap.servers", bootstraps);
- Test::conf_set(pconf, "security.protocol", "plaintext");
- std::string errstr;
- RdKafka::Producer *p = RdKafka::Producer::create(pconf, errstr);
- if (!p)
- Test::Fail(tostr() << __FUNCTION__
- << ": Failed to create producer: " << errstr);
- delete pconf;
- Test::produce_msgs(p, "topic1", 0, msgs_per_partition, 10,
- false /*no flush*/);
- Test::produce_msgs(p, "topic2", 0, msgs_per_partition, 10,
- false /*no flush*/);
- Test::produce_msgs(p, "topic3", 0, msgs_per_partition, 10,
- false /*no flush*/);
- p->flush(10 * 1000);
- delete p;
- vector<RdKafka::TopicPartition *> toppars1;
- toppars1.push_back(RdKafka::TopicPartition::create("topic1", 0));
- vector<RdKafka::TopicPartition *> toppars2;
- toppars2.push_back(RdKafka::TopicPartition::create("topic2", 0));
- vector<RdKafka::TopicPartition *> toppars3;
- toppars3.push_back(RdKafka::TopicPartition::create("topic3", 0));
- RdKafka::Conf *conf;
- Test::conf_init(&conf, NULL, 20);
- Test::conf_set(conf, "bootstrap.servers", bootstraps);
- Test::conf_set(conf, "security.protocol", "plaintext");
- Test::conf_set(conf, "", __FUNCTION__);
- Test::conf_set(conf, "", group_id);
- Test::conf_set(conf, "auto.offset.reset", "earliest");
- Test::conf_set(conf, "", "false");
- RdKafka::KafkaConsumer *consumer;
- consumer = RdKafka::KafkaConsumer::create(conf, errstr);
- if (!consumer)
- Test::Fail(tostr() << __FUNCTION__
- << ": Failed to create consumer: " << errstr);
- delete conf;
- vector<RdKafka::TopicPartition *> toppars;
- vector<RdKafka::TopicPartition *> expected;
- map<Toppar, int64_t> pos; /* Expected consume position per partition */
- pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 0;
- pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 0;
- pos[Toppar(toppars3[0]->topic(), toppars3[0]->partition())] = 0;
- /* To make sure offset commits are fetched in proper assign sequence
- * we commit an offset that should not be used in the final consume loop.
- * This commit will be overwritten below with another commit. */
- vector<RdKafka::TopicPartition *> offsets;
- offsets.push_back(RdKafka::TopicPartition::create(
- toppars1[0]->topic(), toppars1[0]->partition(), 11));
- /* This partition should start at this position even though
- * there will be a sub-sequent commit to overwrite it, that should not
- * be used since this partition is never unassigned. */
- offsets.push_back(RdKafka::TopicPartition::create(
- toppars2[0]->topic(), toppars2[0]->partition(), 22));
- pos[Toppar(toppars2[0]->topic(), toppars2[0]->partition())] = 22;
- Test::print_TopicPartitions("pre-commit", offsets);
- RdKafka::ErrorCode err;
- err = consumer->commitSync(offsets);
- if (err)
- Test::Fail(tostr() << __FUNCTION__ << ": pre-commit failed: "
- << RdKafka::err2str(err) << "\n");
- /* Add coordinator delay so that the OffsetFetchRequest originating
- * from the coming incremental_assign() will not finish before
- * we call incremental_unassign() and incremental_assign() again, resulting
- * in a situation where the initial OffsetFetchResponse will contain
- * an older offset for a previous assignment of one partition. */
- rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 5000);
- /* Assign 1,2 == 1,2 */
- toppars.push_back(toppars1[0]);
- toppars.push_back(toppars2[0]);
- expected.push_back(toppars1[0]);
- expected.push_back(toppars2[0]);
- Test::incremental_assign(consumer, toppars);
- expect_assignment(consumer, expected);
- /* Unassign -1 == 2 */
- toppars.clear();
- toppars.push_back(toppars1[0]);
- vector<RdKafka::TopicPartition *>::iterator it =
- find(expected.begin(), expected.end(), toppars1[0]);
- expected.erase(it);
- Test::incremental_unassign(consumer, toppars);
- expect_assignment(consumer, expected);
- /* Commit offset for the removed partition and the partition that is
- * unchanged in the assignment. */
- RdKafka::TopicPartition::destroy(offsets);
- offsets.push_back(RdKafka::TopicPartition::create(
- toppars1[0]->topic(), toppars1[0]->partition(), 55));
- offsets.push_back(RdKafka::TopicPartition::create(
- toppars2[0]->topic(), toppars2[0]->partition(), 33)); /* should not be
- * used. */
- pos[Toppar(toppars1[0]->topic(), toppars1[0]->partition())] = 55;
- Test::print_TopicPartitions("commit", offsets);
- err = consumer->commitAsync(offsets);
- if (err)
- Test::Fail(tostr() << __FUNCTION__
- << ": commit failed: " << RdKafka::err2str(err) << "\n");
- /* Assign +3 == 2,3 */
- toppars.clear();
- toppars.push_back(toppars3[0]);
- expected.push_back(toppars3[0]);
- Test::incremental_assign(consumer, toppars);
- expect_assignment(consumer, expected);
- /* Now remove the latency */
- Test::Say(_C_MAG "Clearing rtt\n");
- rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 0);
- /* Assign +1 == 1,2,3 */
- toppars.clear();
- toppars.push_back(toppars1[0]);
- expected.push_back(toppars1[0]);
- Test::incremental_assign(consumer, toppars);
- expect_assignment(consumer, expected);
- /*
- * Verify consumed messages
- */
- int wait_end = (int)expected.size();
- while (wait_end > 0) {
- RdKafka::Message *msg = consumer->consume(10 * 1000);
- if (msg->err() == RdKafka::ERR__TIMED_OUT)
- Test::Fail(tostr() << __FUNCTION__
- << ": Consume timed out waiting "
- "for "
- << wait_end << " more partitions");
- Toppar tp = Toppar(msg->topic_name(), msg->partition());
- int64_t *exp_pos = &pos[tp];
- Test::Say(3, tostr() << __FUNCTION__ << ": Received " << tp.topic << " ["
- << tp.partition << "] at offset " << msg->offset()
- << " (expected offset " << *exp_pos << ")\n");
- if (*exp_pos != msg->offset())
- Test::Fail(tostr() << __FUNCTION__ << ": expected message offset "
- << *exp_pos << " for " << msg->topic_name() << " ["
- << msg->partition() << "], not " << msg->offset()
- << "\n");
- (*exp_pos)++;
- if (*exp_pos == msgs_per_partition) {
- TEST_ASSERT(wait_end > 0, "");
- wait_end--;
- } else if (msg->offset() > msgs_per_partition)
- Test::Fail(tostr() << __FUNCTION__ << ": unexpected message with "
- << "offset " << msg->offset() << " on " << tp.topic
- << " [" << tp.partition << "]\n");
- delete msg;
- }
- RdKafka::TopicPartition::destroy(offsets);
- RdKafka::TopicPartition::destroy(toppars1);
- RdKafka::TopicPartition::destroy(toppars2);
- RdKafka::TopicPartition::destroy(toppars3);
- delete consumer;
- test_mock_cluster_destroy(mcluster);
-/* Check behavior when:
- * 1. single topic with 2 partitions.
- * 2. consumer 1 (with rebalance_cb) subscribes to it.
- * 3. consumer 2 (with rebalance_cb) subscribes to it.
- * 4. close.
- */
-static void b_subscribe_with_cb_test(rd_bool_t close_consumer) {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name.c_str(), 2, 1);
- DefaultRebalanceCb rebalance_cb1;
- RdKafka::KafkaConsumer *c1 = make_consumer(
- "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 25);
- DefaultRebalanceCb rebalance_cb2;
- RdKafka::KafkaConsumer *c2 = make_consumer(
- "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 25);
- test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c1, topic_name);
- bool c2_subscribed = false;
- while (true) {
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- /* Start c2 after c1 has received initial assignment */
- if (!c2_subscribed && rebalance_cb1.assign_call_cnt > 0) {
- Test::subscribe(c2, topic_name);
- c2_subscribed = true;
- }
- /* Failure case: test will time out. */
- if (rebalance_cb1.assign_call_cnt == 3 &&
- rebalance_cb2.assign_call_cnt == 2) {
- break;
- }
- }
- /* Sequence of events:
- *
- * 1. c1 joins group.
- * 2. c1 gets assigned 2 partitions.
- * - there isn't a follow-on rebalance because there aren't any revoked
- * partitions.
- * 3. c2 joins group.
- * 4. This results in a rebalance with one partition being revoked from c1,
- * and no partitions assigned to either c1 or c2 (however the rebalance
- * callback will be called in each case with an empty set).
- * 5. c1 then re-joins the group since it had a partition revoked.
- * 6. c2 is now assigned a single partition, and c1's incremental assignment
- * is empty.
- * 7. Since there were no revoked partitions, no further rebalance is
- * triggered.
- */
- /* The rebalance cb is always called on assign, even if empty. */
- if (rebalance_cb1.assign_call_cnt != 3)
- Test::Fail(tostr() << "Expecting 3 assign calls on consumer 1, not "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expecting 2 assign calls on consumer 2, not: "
- << rebalance_cb2.assign_call_cnt);
- /* The rebalance cb is not called on and empty revoke (unless partitions lost,
- * which is not the case here) */
- if (rebalance_cb1.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expecting 1 revoke call on consumer 1, not: "
- << rebalance_cb1.revoke_call_cnt);
- if (rebalance_cb2.revoke_call_cnt != 0)
- Test::Fail(tostr() << "Expecting 0 revoke calls on consumer 2, not: "
- << rebalance_cb2.revoke_call_cnt);
- /* Final state */
- /* Expect both consumers to have 1 assigned partition (via net calculation in
- * rebalance_cb) */
- if (rebalance_cb1.partitions_assigned_net != 1)
- Test::Fail(tostr()
- << "Expecting consumer 1 to have net 1 assigned partition, not: "
- << rebalance_cb1.partitions_assigned_net);
- if (rebalance_cb2.partitions_assigned_net != 1)
- Test::Fail(tostr()
- << "Expecting consumer 2 to have net 1 assigned partition, not: "
- << rebalance_cb2.partitions_assigned_net);
- /* Expect both consumers to have 1 assigned partition (via ->assignment()
- * query) */
- expect_assignment(c1, 1);
- expect_assignment(c2, 1);
- /* Make sure the fetchers are running */
- int msgcnt = 100;
- const int msgsize1 = 100;
- test_produce_msgs_easy_size(topic_name.c_str(), 0, 0, msgcnt, msgsize1);
- test_produce_msgs_easy_size(topic_name.c_str(), 0, 1, msgcnt, msgsize1);
- bool consumed_from_c1 = false;
- bool consumed_from_c2 = false;
- while (true) {
- RdKafka::Message *msg1 = c1->consume(100);
- RdKafka::Message *msg2 = c2->consume(100);
- if (msg1->err() == RdKafka::ERR_NO_ERROR)
- consumed_from_c1 = true;
- if (msg1->err() == RdKafka::ERR_NO_ERROR)
- consumed_from_c2 = true;
- delete msg1;
- delete msg2;
- /* Failure case: test will timeout. */
- if (consumed_from_c1 && consumed_from_c2)
- break;
- }
- if (!close_consumer) {
- delete c1;
- delete c2;
- return;
- }
- c1->close();
- c2->close();
- /* Closing the consumer should trigger rebalance_cb (revoke): */
- if (rebalance_cb1.revoke_call_cnt != 2)
- Test::Fail(tostr() << "Expecting 2 revoke calls on consumer 1, not: "
- << rebalance_cb1.revoke_call_cnt);
- if (rebalance_cb2.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expecting 1 revoke call on consumer 2, not: "
- << rebalance_cb2.revoke_call_cnt);
- /* ..and net assigned partitions should drop to 0 in both cases: */
- if (rebalance_cb1.partitions_assigned_net != 0)
- Test::Fail(
- tostr()
- << "Expecting consumer 1 to have net 0 assigned partitions, not: "
- << rebalance_cb1.partitions_assigned_net);
- if (rebalance_cb2.partitions_assigned_net != 0)
- Test::Fail(
- tostr()
- << "Expecting consumer 2 to have net 0 assigned partitions, not: "
- << rebalance_cb2.partitions_assigned_net);
- /* Nothing in this test should result in lost partitions */
- if (rebalance_cb1.lost_call_cnt > 0)
- Test::Fail(
- tostr() << "Expecting consumer 1 to have 0 lost partition events, not: "
- << rebalance_cb1.lost_call_cnt);
- if (rebalance_cb2.lost_call_cnt > 0)
- Test::Fail(
- tostr() << "Expecting consumer 2 to have 0 lost partition events, not: "
- << rebalance_cb2.lost_call_cnt);
- delete c1;
- delete c2;
-/* Check behavior when:
- * 1. Single topic with 2 partitions.
- * 2. Consumer 1 (no rebalance_cb) subscribes to it.
- * 3. Consumer 2 (no rebalance_cb) subscribes to it.
- * 4. Close.
- */
-static void c_subscribe_no_cb_test(rd_bool_t close_consumer) {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name.c_str(), 2, 1);
- RdKafka::KafkaConsumer *c1 =
- make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 20);
- RdKafka::KafkaConsumer *c2 =
- make_consumer("C_2", group_name, "cooperative-sticky", NULL, NULL, 20);
- test_wait_topic_exists(c1->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c1, topic_name);
- bool c2_subscribed = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- if (Test::assignment_partition_count(c1, NULL) == 2 && !c2_subscribed) {
- Test::subscribe(c2, topic_name);
- c2_subscribed = true;
- }
- if (Test::assignment_partition_count(c1, NULL) == 1 &&
- Test::assignment_partition_count(c2, NULL) == 1) {
- Test::Say("Consumer 1 and 2 are both assigned to single partition.\n");
- done = true;
- }
- }
- if (close_consumer) {
- Test::Say("Closing consumer 1\n");
- c1->close();
- Test::Say("Closing consumer 2\n");
- c2->close();
- } else {
- Test::Say("Skipping close() of consumer 1 and 2.\n");
- }
- delete c1;
- delete c2;
-/* Check behavior when:
- * 1. Single consumer (no rebalance_cb) subscribes to topic.
- * 2. Subscription is changed (topic added).
- * 3. Consumer is closed.
- */
-static void d_change_subscription_add_topic(rd_bool_t close_consumer) {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- std::string topic_name_2 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1);
- bool subscribed_to_one_topic = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 2 &&
- !subscribed_to_one_topic) {
- subscribed_to_one_topic = true;
- Test::subscribe(c, topic_name_1, topic_name_2);
- }
- if (Test::assignment_partition_count(c, NULL) == 4) {
- Test::Say("Consumer is assigned to two topics.\n");
- done = true;
- }
- }
- if (close_consumer) {
- Test::Say("Closing consumer\n");
- c->close();
- } else
- Test::Say("Skipping close() of consumer\n");
- delete c;
-/* Check behavior when:
- * 1. Single consumer (no rebalance_cb) subscribes to topic.
- * 2. Subscription is changed (topic added).
- * 3. Consumer is closed.
- */
-static void e_change_subscription_remove_topic(rd_bool_t close_consumer) {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- std::string topic_name_2 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1, topic_name_2);
- bool subscribed_to_two_topics = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 4 &&
- !subscribed_to_two_topics) {
- subscribed_to_two_topics = true;
- Test::subscribe(c, topic_name_1);
- }
- if (Test::assignment_partition_count(c, NULL) == 2) {
- Test::Say("Consumer is assigned to one topic\n");
- done = true;
- }
- }
- if (!close_consumer) {
- Test::Say("Closing consumer\n");
- c->close();
- } else
- Test::Say("Skipping close() of consumer\n");
- delete c;
-/* Check that use of consumer->assign() and consumer->unassign() is disallowed
- * when a COOPERATIVE assignor is in use.
- *
- * Except when the consumer is closing, where all forms of unassign are
- * allowed and treated as a full unassign.
- */
-class FTestRebalanceCb : public RdKafka::RebalanceCb {
- public:
- bool assigned;
- bool closing;
- FTestRebalanceCb() : assigned(false), closing(false) {
- }
- void rebalance_cb(RdKafka::KafkaConsumer *consumer,
- RdKafka::ErrorCode err,
- std::vector<RdKafka::TopicPartition *> &partitions) {
- Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
- << RdKafka::err2str(err) << (closing ? " (closing)" : "")
- << "\n");
- if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
- RdKafka::ErrorCode err_resp = consumer->assign(partitions);
- Test::Say(tostr() << "consumer->assign() response code: " << err_resp
- << "\n");
- if (err_resp != RdKafka::ERR__STATE)
- Test::Fail(tostr() << "Expected assign to fail with error code: "
- << RdKafka::ERR__STATE << "(ERR__STATE)");
- RdKafka::Error *error = consumer->incremental_assign(partitions);
- if (error)
- Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
- << error->str());
- assigned = true;
- } else {
- RdKafka::ErrorCode err_resp = consumer->unassign();
- Test::Say(tostr() << "consumer->unassign() response code: " << err_resp
- << "\n");
- if (!closing) {
- if (err_resp != RdKafka::ERR__STATE)
- Test::Fail(tostr() << "Expected assign to fail with error code: "
- << RdKafka::ERR__STATE << "(ERR__STATE)");
- RdKafka::Error *error = consumer->incremental_unassign(partitions);
- if (error)
- Test::Fail(tostr() << "consumer->incremental_unassign() failed: "
- << error->str());
- } else {
- /* During termination (close()) any type of unassign*() is allowed. */
- if (err_resp)
- Test::Fail(tostr() << "Expected unassign to succeed during close, "
- "but got: "
- << RdKafka::ERR__STATE << "(ERR__STATE)");
- }
- }
- }
-static void f_assign_call_cooperative() {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- FTestRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name);
- while (!rebalance_cb.assigned)
- Test::poll_once(c, 500);
- rebalance_cb.closing = true;
- c->close();
- delete c;
-/* Check that use of consumer->incremental_assign() and
- * consumer->incremental_unassign() is disallowed when an EAGER assignor is in
- * use.
- */
-class GTestRebalanceCb : public RdKafka::RebalanceCb {
- public:
- bool assigned;
- bool closing;
- GTestRebalanceCb() : assigned(false), closing(false) {
- }
- void rebalance_cb(RdKafka::KafkaConsumer *consumer,
- RdKafka::ErrorCode err,
- std::vector<RdKafka::TopicPartition *> &partitions) {
- Test::Say(tostr() << "RebalanceCb: " << consumer->name() << " "
- << RdKafka::err2str(err) << "\n");
- if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
- RdKafka::Error *error = consumer->incremental_assign(partitions);
- Test::Say(tostr() << "consumer->incremental_assign() response: "
- << (!error ? "NULL" : error->str()) << "\n");
- if (!error)
- Test::Fail("Expected consumer->incremental_assign() to fail");
- if (error->code() != RdKafka::ERR__STATE)
- Test::Fail(tostr() << "Expected consumer->incremental_assign() to fail "
- "with error code "
- << RdKafka::ERR__STATE);
- delete error;
- RdKafka::ErrorCode err_resp = consumer->assign(partitions);
- if (err_resp)
- Test::Fail(tostr() << "consumer->assign() failed: " << err_resp);
- assigned = true;
- } else {
- RdKafka::Error *error = consumer->incremental_unassign(partitions);
- Test::Say(tostr() << "consumer->incremental_unassign() response: "
- << (!error ? "NULL" : error->str()) << "\n");
- if (!closing) {
- if (!error)
- Test::Fail("Expected consumer->incremental_unassign() to fail");
- if (error->code() != RdKafka::ERR__STATE)
- Test::Fail(tostr() << "Expected consumer->incremental_unassign() to "
- "fail with error code "
- << RdKafka::ERR__STATE);
- delete error;
- RdKafka::ErrorCode err_resp = consumer->unassign();
- if (err_resp)
- Test::Fail(tostr() << "consumer->unassign() failed: " << err_resp);
- } else {
- /* During termination (close()) any type of unassign*() is allowed. */
- if (error)
- Test::Fail(
- tostr()
- << "Expected incremental_unassign to succeed during close, "
- "but got: "
- << RdKafka::ERR__STATE << "(ERR__STATE)");
- }
- }
- }
-static void g_incremental_assign_call_eager() {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- GTestRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c = make_consumer(
- "C_1", group_name, "roundrobin", &additional_conf, &rebalance_cb, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name);
- while (!rebalance_cb.assigned)
- Test::poll_once(c, 500);
- rebalance_cb.closing = true;
- c->close();
- delete c;
-/* Check behavior when:
- * 1. Single consumer (rebalance_cb) subscribes to two topics.
- * 2. One of the topics is deleted.
- * 3. Consumer is closed.
- */
-static void h_delete_topic() {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
- std::string topic_name_2 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_2.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- DefaultRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1, topic_name_2);
- bool deleted = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- std::vector<RdKafka::TopicPartition *> partitions;
- c->assignment(partitions);
- if (partitions.size() == 2 && !deleted) {
- if (rebalance_cb.assign_call_cnt != 1)
- Test::Fail(tostr() << "Expected 1 assign call, saw "
- << rebalance_cb.assign_call_cnt << "\n");
- Test::delete_topic(c, topic_name_2.c_str());
- deleted = true;
- }
- if (partitions.size() == 1 && deleted) {
- if (partitions[0]->topic() != topic_name_1)
- Test::Fail(tostr() << "Expecting subscribed topic to be '"
- << topic_name_1 << "' not '"
- << partitions[0]->topic() << "'");
- Test::Say(tostr() << "Assignment no longer includes deleted topic '"
- << topic_name_2 << "'\n");
- done = true;
- }
- RdKafka::TopicPartition::destroy(partitions);
- }
- Test::Say("Closing consumer\n");
- c->close();
- delete c;
-/* Check behavior when:
- * 1. Single consumer (rebalance_cb) subscribes to a single topic.
- * 2. That topic is deleted leaving no topics.
- * 3. Consumer is closed.
- */
-static void i_delete_topic_2() {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- DefaultRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1);
- bool deleted = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
- if (rebalance_cb.assign_call_cnt != 1)
- Test::Fail(tostr() << "Expected one assign call, saw "
- << rebalance_cb.assign_call_cnt << "\n");
- Test::delete_topic(c, topic_name_1.c_str());
- deleted = true;
- }
- if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
- Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
- done = true;
- }
- }
- Test::Say("Closing consumer\n");
- c->close();
- delete c;
-/* Check behavior when:
- * 1. single consumer (without rebalance_cb) subscribes to a single topic.
- * 2. that topic is deleted leaving no topics.
- * 3. consumer is closed.
- */
-static void j_delete_topic_no_rb_callback() {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- RdKafka::KafkaConsumer *c = make_consumer(
- "C_1", group_name, "cooperative-sticky", &additional_conf, NULL, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1);
- bool deleted = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 1 && !deleted) {
- Test::delete_topic(c, topic_name_1.c_str());
- deleted = true;
- }
- if (Test::assignment_partition_count(c, NULL) == 0 && deleted) {
- Test::Say(tostr() << "Assignment is empty following deletion of topic\n");
- done = true;
- }
- }
- Test::Say("Closing consumer\n");
- c->close();
- delete c;
-/* Check behavior when:
- * 1. Single consumer (rebalance_cb) subscribes to a 1 partition topic.
- * 2. Number of partitions is increased to 2.
- * 3. Consumer is closed.
- */
-static void k_add_partition() {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- test_create_topic(NULL, topic_name.c_str(), 1, 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- DefaultRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name);
- bool subscribed = false;
- bool done = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 1 && !subscribed) {
- if (rebalance_cb.assign_call_cnt != 1)
- Test::Fail(tostr() << "Expected 1 assign call, saw "
- << rebalance_cb.assign_call_cnt);
- if (rebalance_cb.revoke_call_cnt != 0)
- Test::Fail(tostr() << "Expected 0 revoke calls, saw "
- << rebalance_cb.revoke_call_cnt);
- Test::create_partitions(c, topic_name.c_str(), 2);
- subscribed = true;
- }
- if (Test::assignment_partition_count(c, NULL) == 2 && subscribed) {
- if (rebalance_cb.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expected 2 assign calls, saw "
- << rebalance_cb.assign_call_cnt);
- if (rebalance_cb.revoke_call_cnt != 0)
- Test::Fail(tostr() << "Expected 0 revoke calls, saw "
- << rebalance_cb.revoke_call_cnt);
- done = true;
- }
- }
- Test::Say("Closing consumer\n");
- c->close();
- delete c;
- if (rebalance_cb.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expected 2 assign calls, saw "
- << rebalance_cb.assign_call_cnt);
- if (rebalance_cb.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expected 1 revoke call, saw "
- << rebalance_cb.revoke_call_cnt);
-/* Check behavior when:
- * 1. two consumers (with rebalance_cb's) subscribe to two topics.
- * 2. one of the consumers calls unsubscribe.
- * 3. consumers closed.
- */
-static void l_unsubscribe() {
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string topic_name_2 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
- DefaultRebalanceCb rebalance_cb1;
- RdKafka::KafkaConsumer *c1 = make_consumer(
- "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb1, 30);
- test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c1->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- Test::subscribe(c1, topic_name_1, topic_name_2);
- DefaultRebalanceCb rebalance_cb2;
- RdKafka::KafkaConsumer *c2 = make_consumer(
- "C_2", group_name, "cooperative-sticky", NULL, &rebalance_cb2, 30);
- Test::subscribe(c2, topic_name_1, topic_name_2);
- bool done = false;
- bool unsubscribed = false;
- while (!done) {
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- if (Test::assignment_partition_count(c1, NULL) == 2 &&
- Test::assignment_partition_count(c2, NULL) == 2) {
- if (rebalance_cb1.assign_call_cnt != 1)
- Test::Fail(
- tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 1)
- Test::Fail(
- tostr() << "Expecting consumer 2's assign_call_cnt to be 1 not: "
- << rebalance_cb2.assign_call_cnt);
- Test::Say("Unsubscribing consumer 1 from both topics\n");
- c1->unsubscribe();
- unsubscribed = true;
- }
- if (unsubscribed && Test::assignment_partition_count(c1, NULL) == 0 &&
- Test::assignment_partition_count(c2, NULL) == 4) {
- if (rebalance_cb1.assign_call_cnt !=
- 1) /* is now unsubscribed, so rebalance_cb will no longer be called.
- */
- Test::Fail(
- tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 2)
- Test::Fail(
- tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
- << rebalance_cb2.assign_call_cnt);
- if (rebalance_cb1.revoke_call_cnt != 1)
- Test::Fail(
- tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
- << rebalance_cb1.revoke_call_cnt);
- if (rebalance_cb2.revoke_call_cnt !=
- 0) /* the rebalance_cb should not be called if the revoked partition
- list is empty */
- Test::Fail(
- tostr() << "Expecting consumer 2's revoke_call_cnt to be 0 not: "
- << rebalance_cb2.revoke_call_cnt);
- Test::Say("Unsubscribe completed");
- done = true;
- }
- }
- Test::Say("Closing consumer 1\n");
- c1->close();
- Test::Say("Closing consumer 2\n");
- c2->close();
- /* there should be no assign rebalance_cb calls on close */
- if (rebalance_cb1.assign_call_cnt != 1)
- Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 1 not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 2 not: "
- << rebalance_cb2.assign_call_cnt);
- if (rebalance_cb1.revoke_call_cnt !=
- 1) /* should not be called a second revoke rebalance_cb */
- Test::Fail(tostr() << "Expecting consumer 1's revoke_call_cnt to be 1 not: "
- << rebalance_cb1.revoke_call_cnt);
- if (rebalance_cb2.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expecting consumer 2's revoke_call_cnt to be 1 not: "
- << rebalance_cb2.revoke_call_cnt);
- if (rebalance_cb1.lost_call_cnt != 0)
- Test::Fail(tostr() << "Expecting consumer 1's lost_call_cnt to be 0, not: "
- << rebalance_cb1.lost_call_cnt);
- if (rebalance_cb2.lost_call_cnt != 0)
- Test::Fail(tostr() << "Expecting consumer 2's lost_call_cnt to be 0, not: "
- << rebalance_cb2.lost_call_cnt);
- delete c1;
- delete c2;
-/* Check behavior when:
- * 1. A consumers (with no rebalance_cb) subscribes to a topic.
- * 2. The consumer calls unsubscribe.
- * 3. Consumers closed.
- */
-static void m_unsubscribe_2() {
- std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name.c_str(), 2, 1);
- RdKafka::KafkaConsumer *c =
- make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15);
- test_wait_topic_exists(c->c_ptr(), topic_name.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name);
- bool done = false;
- bool unsubscribed = false;
- while (!done) {
- Test::poll_once(c, 500);
- if (Test::assignment_partition_count(c, NULL) == 2) {
- Test::unsubscribe(c);
- unsubscribed = true;
- }
- if (unsubscribed && Test::assignment_partition_count(c, NULL) == 0) {
- Test::Say("Unsubscribe completed");
- done = true;
- }
- }
- Test::Say("Closing consumer\n");
- c->close();
- delete c;
-/* Check behavior when:
- * 1. Two consumers (with rebalance_cb) subscribe to a regex (no matching
- * topics exist)
- * 2. Create two topics.
- * 3. Remove one of the topics.
- * 3. Consumers closed.
- */
-static void n_wildcard() {
- const string topic_base_name = Test::mk_topic_name("0113-n_wildcard", 1);
- const string topic_name_1 = topic_base_name + "_1";
- const string topic_name_2 = topic_base_name + "_2";
- const string topic_regex = "^" + topic_base_name + "_.";
- const string group_name = Test::mk_unique_group_name("0113-n_wildcard");
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("3000")));
- DefaultRebalanceCb rebalance_cb1;
- RdKafka::KafkaConsumer *c1 =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb1, 30);
- Test::subscribe(c1, topic_regex);
- DefaultRebalanceCb rebalance_cb2;
- RdKafka::KafkaConsumer *c2 =
- make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb2, 30);
- Test::subscribe(c2, topic_regex);
- /* There are no matching topics, so the consumers should not join the group
- * initially */
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- if (rebalance_cb1.assign_call_cnt != 0)
- Test::Fail(tostr() << "Expecting consumer 1's assign_call_cnt to be 0 not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 0)
- Test::Fail(tostr() << "Expecting consumer 2's assign_call_cnt to be 0 not: "
- << rebalance_cb2.assign_call_cnt);
- bool done = false;
- bool created_topics = false;
- bool deleted_topic = false;
- int last_cb1_assign_call_cnt = 0;
- int last_cb2_assign_call_cnt = 0;
- while (!done) {
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- if (Test::assignment_partition_count(c1, NULL) == 0 &&
- Test::assignment_partition_count(c2, NULL) == 0 && !created_topics) {
- Test::Say(
- "Creating two topics with 2 partitions each that match regex\n");
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- test_create_topic(NULL, topic_name_2.c_str(), 2, 1);
- /* The consumers should autonomously discover these topics and start
- * consuming from them. This happens in the background - is not
- * influenced by whether we wait for the topics to be created before
- * continuing the main loop. It is possible that both topics are
- * discovered simultaneously, requiring a single rebalance OR that
- * topic 1 is discovered first (it was created first), a rebalance
- * initiated, then topic 2 discovered, then another rebalance
- * initiated to include it.
- */
- created_topics = true;
- }
- if (Test::assignment_partition_count(c1, NULL) == 2 &&
- Test::assignment_partition_count(c2, NULL) == 2 && !deleted_topic) {
- if (rebalance_cb1.nonempty_assign_call_cnt == 1) {
- /* just one rebalance was required */
- TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 1,
- "Expecting C_1's nonempty_assign_call_cnt to be 1 not %d ",
- rebalance_cb1.nonempty_assign_call_cnt);
- TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 1,
- "Expecting C_2's nonempty_assign_call_cnt to be 1 not %d ",
- rebalance_cb2.nonempty_assign_call_cnt);
- } else {
- /* two rebalances were required (occurs infrequently) */
- TEST_ASSERT(rebalance_cb1.nonempty_assign_call_cnt == 2,
- "Expecting C_1's nonempty_assign_call_cnt to be 2 not %d ",
- rebalance_cb1.nonempty_assign_call_cnt);
- TEST_ASSERT(rebalance_cb2.nonempty_assign_call_cnt == 2,
- "Expecting C_2's nonempty_assign_call_cnt to be 2 not %d ",
- rebalance_cb2.nonempty_assign_call_cnt);
- }
- TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 0,
- "Expecting C_1's revoke_call_cnt to be 0 not %d ",
- rebalance_cb1.revoke_call_cnt);
- TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 0,
- "Expecting C_2's revoke_call_cnt to be 0 not %d ",
- rebalance_cb2.revoke_call_cnt);
- last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
- last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
- Test::Say("Deleting topic 1\n");
- Test::delete_topic(c1, topic_name_1.c_str());
- deleted_topic = true;
- }
- if (Test::assignment_partition_count(c1, NULL) == 1 &&
- Test::assignment_partition_count(c2, NULL) == 1 && deleted_topic) {
- /* accumulated in lost case as well */
- TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 1,
- "Expecting C_1's revoke_call_cnt to be 1 not %d",
- rebalance_cb1.revoke_call_cnt);
- TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 1,
- "Expecting C_2's revoke_call_cnt to be 1 not %d",
- rebalance_cb2.revoke_call_cnt);
- TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
- "Expecting C_1's lost_call_cnt to be 1 not %d",
- rebalance_cb1.lost_call_cnt);
- TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
- "Expecting C_2's lost_call_cnt to be 1 not %d",
- rebalance_cb2.lost_call_cnt);
- /* Consumers will rejoin group after revoking the lost partitions.
- * this will result in an rebalance_cb assign (empty partitions).
- * it follows the revoke, which has already been confirmed to have
- * happened. */
- Test::Say("Waiting for rebalance_cb assigns\n");
- while (rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt ||
- rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt) {
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- }
- Test::Say("Consumers are subscribed to one partition each\n");
- done = true;
- }
- }
- Test::Say("Closing consumer 1\n");
- last_cb1_assign_call_cnt = rebalance_cb1.assign_call_cnt;
- c1->close();
- /* There should be no assign rebalance_cb calls on close */
- TEST_ASSERT(rebalance_cb1.assign_call_cnt == last_cb1_assign_call_cnt,
- "Expecting C_1's assign_call_cnt to be %d not %d",
- last_cb1_assign_call_cnt, rebalance_cb1.assign_call_cnt);
- /* Let C_2 catch up on the rebalance and get assigned C_1's partitions. */
- last_cb2_assign_call_cnt = rebalance_cb2.nonempty_assign_call_cnt;
- while (rebalance_cb2.nonempty_assign_call_cnt == last_cb2_assign_call_cnt)
- Test::poll_once(c2, 500);
- Test::Say("Closing consumer 2\n");
- last_cb2_assign_call_cnt = rebalance_cb2.assign_call_cnt;
- c2->close();
- /* There should be no assign rebalance_cb calls on close */
- TEST_ASSERT(rebalance_cb2.assign_call_cnt == last_cb2_assign_call_cnt,
- "Expecting C_2's assign_call_cnt to be %d not %d",
- last_cb2_assign_call_cnt, rebalance_cb2.assign_call_cnt);
- TEST_ASSERT(rebalance_cb1.revoke_call_cnt == 2,
- "Expecting C_1's revoke_call_cnt to be 2 not %d",
- rebalance_cb1.revoke_call_cnt);
- TEST_ASSERT(rebalance_cb2.revoke_call_cnt == 2,
- "Expecting C_2's revoke_call_cnt to be 2 not %d",
- rebalance_cb2.revoke_call_cnt);
- TEST_ASSERT(rebalance_cb1.lost_call_cnt == 1,
- "Expecting C_1's lost_call_cnt to be 1, not %d",
- rebalance_cb1.lost_call_cnt);
- TEST_ASSERT(rebalance_cb2.lost_call_cnt == 1,
- "Expecting C_2's lost_call_cnt to be 1, not %d",
- rebalance_cb2.lost_call_cnt);
- delete c1;
- delete c2;
-/* Check behavior when:
- * 1. Consumer (librdkafka) subscribes to two topics (2 and 6 partitions).
- * 2. Consumer (java) subscribes to the same two topics.
- * 3. Consumer (librdkafka) unsubscribes from the two partition topic.
- * 4. Consumer (java) process closes upon detecting the above unsubscribe.
- * 5. Consumer (librdkafka) will now be subscribed to 6 partitions.
- * 6. Close librdkafka consumer.
- */
-static void o_java_interop() {
- if (*test_conf_get(NULL, "sasl.mechanism") != '\0')
- "Cluster is set up for SASL: we won't bother with that "
- "for the Java client\n");
- std::string topic_name_1 = Test::mk_topic_name("0113_o_2", 1);
- std::string topic_name_2 = Test::mk_topic_name("0113_o_6", 1);
- std::string group_name = Test::mk_unique_group_name("0113_o");
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- test_create_topic(NULL, topic_name_2.c_str(), 6, 1);
- DefaultRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c = make_consumer(
- "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- Test::subscribe(c, topic_name_1, topic_name_2);
- bool done = false;
- bool changed_subscription = false;
- bool changed_subscription_done = false;
- int java_pid = 0;
- while (!done) {
- Test::poll_once(c, 500);
- if (1) // FIXME: Remove after debugging
- Test::Say(tostr() << "Assignment partition count: "
- << Test::assignment_partition_count(c, NULL)
- << ", changed_sub " << changed_subscription
- << ", changed_sub_done " << changed_subscription_done
- << ", assign_call_cnt " << rebalance_cb.assign_call_cnt
- << "\n");
- if (Test::assignment_partition_count(c, NULL) == 8 && !java_pid) {
- Test::Say(_C_GRN "librdkafka consumer assigned to 8 partitions\n");
- string bootstrapServers = get_bootstrap_servers();
- const char *argv[1 + 1 + 1 + 1 + 1 + 1];
- size_t i = 0;
- argv[i++] = "test1";
- argv[i++] = bootstrapServers.c_str();
- argv[i++] = topic_name_1.c_str();
- argv[i++] = topic_name_2.c_str();
- argv[i++] = group_name.c_str();
- argv[i] = NULL;
- java_pid = test_run_java("IncrementalRebalanceCli", argv);
- if (java_pid <= 0)
- Test::Fail(tostr() << "Unexpected pid: " << java_pid);
- }
- if (Test::assignment_partition_count(c, NULL) == 4 && java_pid != 0 &&
- !changed_subscription) {
- if (rebalance_cb.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expecting consumer's assign_call_cnt to be 2, "
- "not "
- << rebalance_cb.assign_call_cnt);
- Test::Say(_C_GRN "Java consumer is now part of the group\n");
- Test::subscribe(c, topic_name_1);
- changed_subscription = true;
- }
- /* Depending on the timing of resubscribe rebalancing and the
- * Java consumer terminating we might have one or two rebalances,
- * hence the fuzzy <=5 and >=5 checks. */
- if (Test::assignment_partition_count(c, NULL) == 2 &&
- changed_subscription && rebalance_cb.assign_call_cnt <= 5 &&
- !changed_subscription_done) {
- /* All topic 1 partitions will be allocated to this consumer whether or
- * not the Java consumer has unsubscribed yet because the sticky algorithm
- * attempts to ensure partition counts are even. */
- Test::Say(_C_GRN "Consumer 1 has unsubscribed from topic 2\n");
- changed_subscription_done = true;
- }
- if (Test::assignment_partition_count(c, NULL) == 2 &&
- changed_subscription && rebalance_cb.assign_call_cnt >= 5 &&
- changed_subscription_done) {
- /* When the java consumer closes, this will cause an empty assign
- * rebalance_cb event, allowing detection of when this has happened. */
- Test::Say(_C_GRN "Java consumer has left the group\n");
- done = true;
- }
- }
- Test::Say("Closing consumer\n");
- c->close();
- /* Expected behavior is IncrementalRebalanceCli will exit cleanly, timeout
- * otherwise. */
- test_waitpid(java_pid);
- delete c;
-/* Check behavior when:
- * - Single consumer subscribes to topic.
- * - Soon after (timing such that rebalance is probably in progress) it
- * subscribes to a different topic.
- */
-static void s_subscribe_when_rebalancing(int variation) {
- SUB_TEST("variation %d", variation);
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string topic_name_2 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string topic_name_3 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name_1.c_str(), 1, 1);
- test_create_topic(NULL, topic_name_2.c_str(), 1, 1);
- test_create_topic(NULL, topic_name_3.c_str(), 1, 1);
- DefaultRebalanceCb rebalance_cb;
- RdKafka::KafkaConsumer *c = make_consumer(
- "C_1", group_name, "cooperative-sticky", NULL, &rebalance_cb, 25);
- test_wait_topic_exists(c->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_2.c_str(), 10 * 1000);
- test_wait_topic_exists(c->c_ptr(), topic_name_3.c_str(), 10 * 1000);
- if (variation == 2 || variation == 4 || variation == 6) {
- /* Pre-cache metadata for all topics. */
- class RdKafka::Metadata *metadata;
- c->metadata(true, NULL, &metadata, 5000);
- delete metadata;
- }
- Test::subscribe(c, topic_name_1);
- Test::wait_for_assignment(c, 1, &topic_name_1);
- Test::subscribe(c, topic_name_2);
- if (variation == 3 || variation == 5)
- Test::poll_once(c, 500);
- if (variation < 5) {
- // Very quickly after subscribing to topic 2, subscribe to topic 3.
- Test::subscribe(c, topic_name_3);
- Test::wait_for_assignment(c, 1, &topic_name_3);
- } else {
- // ..or unsubscribe.
- Test::unsubscribe(c);
- Test::wait_for_assignment(c, 0, NULL);
- }
- delete c;
-/* Check behavior when:
- * - Two consumer subscribe to a topic.
- * - Max poll interval is exceeded on the first consumer.
- */
-static void t_max_poll_interval_exceeded(int variation) {
- SUB_TEST("variation %d", variation);
- std::string topic_name_1 =
- Test::mk_topic_name("0113-cooperative_rebalance", 1);
- std::string group_name =
- Test::mk_unique_group_name("0113-cooperative_rebalance");
- test_create_topic(NULL, topic_name_1.c_str(), 2, 1);
- std::vector<std::pair<std::string, std::string> > additional_conf;
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("6000")));
- additional_conf.push_back(std::pair<std::string, std::string>(
- std::string(""), std::string("7000")));
- DefaultRebalanceCb rebalance_cb1;
- RdKafka::KafkaConsumer *c1 =
- make_consumer("C_1", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb1, 30);
- DefaultRebalanceCb rebalance_cb2;
- RdKafka::KafkaConsumer *c2 =
- make_consumer("C_2", group_name, "cooperative-sticky", &additional_conf,
- &rebalance_cb2, 30);
- test_wait_topic_exists(c1->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- test_wait_topic_exists(c2->c_ptr(), topic_name_1.c_str(), 10 * 1000);
- Test::subscribe(c1, topic_name_1);
- Test::subscribe(c2, topic_name_1);
- bool done = false;
- bool both_have_been_assigned = false;
- while (!done) {
- if (!both_have_been_assigned)
- Test::poll_once(c1, 500);
- Test::poll_once(c2, 500);
- if (Test::assignment_partition_count(c1, NULL) == 1 &&
- Test::assignment_partition_count(c2, NULL) == 1 &&
- !both_have_been_assigned) {
- Test::Say(
- tostr()
- << "Both consumers are assigned to topic " << topic_name_1
- << ". WAITING 7 seconds for to be exceeded\n");
- both_have_been_assigned = true;
- }
- if (Test::assignment_partition_count(c2, NULL) == 2 &&
- both_have_been_assigned) {
- Test::Say("Consumer 1 is no longer assigned any partitions, done\n");
- done = true;
- }
- }
- if (variation == 1) {
- if (rebalance_cb1.lost_call_cnt != 0)
- Test::Fail(
- tostr() << "Expected consumer 1 lost revoke count to be 0, not: "
- << rebalance_cb1.lost_call_cnt);
- Test::poll_once(c1,
- 500); /* Eat the max poll interval exceeded error message */
- Test::poll_once(c1,
- 500); /* Trigger the rebalance_cb with lost partitions */
- if (rebalance_cb1.lost_call_cnt != 1)
- Test::Fail(
- tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
- << rebalance_cb1.lost_call_cnt);
- }
- c1->close();
- c2->close();
- if (rebalance_cb1.lost_call_cnt != 1)
- Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
- << rebalance_cb1.lost_call_cnt);
- if (rebalance_cb1.assign_call_cnt != 1)
- Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb2.assign_call_cnt != 2)
- Test::Fail(tostr() << "Expected consumer 1 assign count to be 2, not: "
- << rebalance_cb1.assign_call_cnt);
- if (rebalance_cb1.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: "
- << rebalance_cb1.revoke_call_cnt);
- if (rebalance_cb2.revoke_call_cnt != 1)
- Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: "
- << rebalance_cb1.revoke_call_cnt);
- delete c1;
- delete c2;
- * @brief Poll all consumers until there are no more events or messages
- * and the timeout has expired.
- */
-static void poll_all_consumers(RdKafka::KafkaConsumer **consumers,
- DefaultRebalanceCb *rebalance_cbs,
- size_t num,
- int timeout_ms) {
- int64_t ts_end = test_clock() + (timeout_ms * 1000);
- /* Poll all consumers until no more events are seen,
- * this makes sure we exhaust the current state events before returning. */
- bool evented;
- do {
- evented = false;
- for (size_t i = 0; i < num; i++) {
- int block_ms = min(10, (int)((ts_end - test_clock()) / 1000));
- while (rebalance_cbs[i].poll_once(consumers[i], max(block_ms, 0)))
- evented = true;
- }
- } while (evented || test_clock() < ts_end);
- * @brief Stress test with 8 consumers subscribing, fetching and committing.
- *
- * @param subscription_variation 0..2
- *
- * TODO: incorporate committing offsets.
- */
-static void u_multiple_subscription_changes(bool use_rebalance_cb,
- int subscription_variation) {
- const int N_CONSUMERS = 8;
- const int N_TOPICS = 2;
- const int N_MSGS_PER_PARTITION = 1000;
- SUB_TEST("use_rebalance_cb: %d, subscription_variation: %d",
- (int)use_rebalance_cb, subscription_variation);
- string topic_name_1 = Test::mk_topic_name("0113u_1", 1);
- string topic_name_2 = Test::mk_topic_name("0113u_2", 1);
- string group_name = Test::mk_unique_group_name("0113u");
- test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, 1);
- test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, 1);
- Test::Say("Creating consumers\n");
- DefaultRebalanceCb rebalance_cbs[N_CONSUMERS];
- RdKafka::KafkaConsumer *consumers[N_CONSUMERS];
- for (int i = 0; i < N_CONSUMERS; i++) {
- std::string name = tostr() << "C_" << i;
- consumers[i] =
- make_consumer(name.c_str(), group_name, "cooperative-sticky", NULL,
- use_rebalance_cb ? &rebalance_cbs[i] : NULL, 120);
- }
- test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_1.c_str(),
- 10 * 1000);
- test_wait_topic_exists(consumers[0]->c_ptr(), topic_name_2.c_str(),
- 10 * 1000);
- /*
- * Seed all partitions with the same number of messages so we later can
- * verify that consumption is working.
- */
- vector<pair<Toppar, int> > ptopics;
- ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_1, N_PARTS_PER_TOPIC),
- ptopics.push_back(pair<Toppar, int>(Toppar(topic_name_2, N_PARTS_PER_TOPIC),
- produce_msgs(ptopics);
- /*
- * Track what topics a consumer should be subscribed to and use this to
- * verify both its subscription and assignment throughout the test.
- */
- /* consumer -> currently subscribed topics */
- map<int, vector<string> > consumer_topics;
- /* topic -> consumers subscribed to topic */
- map<string, set<int> > topic_consumers;
- /* The subscription alternatives that consumers
- * alter between in the playbook. */
- vector<string> SUBSCRIPTION_1;
- vector<string> SUBSCRIPTION_2;
- SUBSCRIPTION_1.push_back(topic_name_1);
- switch (subscription_variation) {
- case 0:
- SUBSCRIPTION_2.push_back(topic_name_1);
- SUBSCRIPTION_2.push_back(topic_name_2);
- break;
- case 1:
- SUBSCRIPTION_2.push_back(topic_name_2);
- break;
- case 2:
- /* No subscription */
- break;
- }
- sort(SUBSCRIPTION_1.begin(), SUBSCRIPTION_1.end());
- sort(SUBSCRIPTION_2.begin(), SUBSCRIPTION_2.end());
- /*
- * Define playbook
- */
- const struct {
- int timestamp_ms;
- int consumer;
- const vector<string> *topics;
- } playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
- {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
- {4000, 1, &SUBSCRIPTION_1}, {4000, 1, &SUBSCRIPTION_1},
- {4000, 1, &SUBSCRIPTION_1}, {4000, 2, &SUBSCRIPTION_1},
- {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
- {6000, 4, &SUBSCRIPTION_1}, {6000, 5, &SUBSCRIPTION_1},
- {6000, 6, &SUBSCRIPTION_1}, {6000, 7, &SUBSCRIPTION_2},
- {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
- {6000, 1, &SUBSCRIPTION_2}, {6000, 1, &SUBSCRIPTION_1},
- {6000, 2, &SUBSCRIPTION_2}, {7000, 2, &SUBSCRIPTION_1},
- {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
- {8000, 0, &SUBSCRIPTION_2}, {8000, 1, &SUBSCRIPTION_1},
- {8000, 0, &SUBSCRIPTION_1}, {13000, 2, &SUBSCRIPTION_1},
- {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
- {13000, 5, &SUBSCRIPTION_2}, {14000, 6, &SUBSCRIPTION_2},
- {15000, 7, &SUBSCRIPTION_1}, {15000, 1, &SUBSCRIPTION_1},
- {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
- {15000, 6, &SUBSCRIPTION_1}, {INT_MAX, 0, 0}};
- /*
- * Run the playbook
- */
- int cmd_number = 0;
- uint64_t ts_start = test_clock();
- while (playbook[cmd_number].timestamp_ms != INT_MAX) {
- TEST_ASSERT(playbook[cmd_number].consumer < N_CONSUMERS);
- Test::Say(tostr() << "Cmd #" << cmd_number << ": wait "
- << playbook[cmd_number].timestamp_ms << "ms\n");
- poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS,
- playbook[cmd_number].timestamp_ms -
- (int)((test_clock() - ts_start) / 1000));
- /* Verify consumer assignments match subscribed topics */
- map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
- for (int i = 0; i < N_CONSUMERS; i++)
- verify_consumer_assignment(
- consumers[i], rebalance_cbs[i], consumer_topics[i],
- /* Allow empty assignment */
- true,
- /* Allow mismatch between subscribed topics
- * and actual assignment since we can't
- * synchronize the last subscription
- * to the current assignment due to
- * an unknown number of rebalances required
- * for the final assignment to settle.
- * This is instead checked at the end of
- * this test case. */
- true, &all_assignments, -1 /* no msgcnt check*/);
- int cid = playbook[cmd_number].consumer;
- RdKafka::KafkaConsumer *consumer = consumers[playbook[cmd_number].consumer];
- const vector<string> *topics = playbook[cmd_number].topics;
- /*
- * Update our view of the consumer's subscribed topics and vice versa.
- */
- for (vector<string>::const_iterator it = consumer_topics[cid].begin();
- it != consumer_topics[cid].end(); it++) {
- topic_consumers[*it].erase(cid);
- }
- consumer_topics[cid].clear();
- for (vector<string>::const_iterator it = topics->begin();
- it != topics->end(); it++) {
- consumer_topics[cid].push_back(*it);
- topic_consumers[*it].insert(cid);
- }
- RdKafka::ErrorCode err;
- /*
- * Change subscription
- */
- if (!topics->empty()) {
- Test::Say(tostr() << "Consumer: " << consumer->name()
- << " is subscribing to topics "
- << string_vec_to_str(*topics) << " after "
- << ((test_clock() - ts_start) / 1000) << "ms\n");
- err = consumer->subscribe(*topics);
- TEST_ASSERT(!err, "Expected subscribe() to succeed, got %s",
- RdKafka::err2str(err).c_str());
- } else {
- Test::Say(tostr() << "Consumer: " << consumer->name()
- << " is unsubscribing after "
- << ((test_clock() - ts_start) / 1000) << "ms\n");
- Test::unsubscribe(consumer);
- }
- /* Mark this consumer as waiting for rebalance so that
- * verify_consumer_assignment() allows assigned partitions that
- * (no longer) match the subscription. */
- rebalance_cbs[cid].wait_rebalance = true;
- /*
- * Verify subscription matches what we think it should be.
- */
- vector<string> subscription;
- err = consumer->subscription(subscription);
- TEST_ASSERT(!err, "consumer %s subscription() failed: %s",
- consumer->name().c_str(), RdKafka::err2str(err).c_str());
- sort(subscription.begin(), subscription.end());
- Test::Say(tostr() << "Consumer " << consumer->name()
- << " subscription is now "
- << string_vec_to_str(subscription) << "\n");
- if (subscription != *topics)
- Test::Fail(tostr() << "Expected consumer " << consumer->name()
- << " subscription: " << string_vec_to_str(*topics)
- << " but got: " << string_vec_to_str(subscription));
- cmd_number++;
- }
- /*
- * Wait for final rebalances and all consumers to settle,
- * then verify assignments and received message counts.
- */
- Test::Say(_C_YEL "Waiting for final assignment state\n");
- int done_count = 0;
- /* Allow at least 20 seconds for group to stabilize. */
- int64_t stabilize_until = test_clock() + (20 * 1000 * 1000); /* 20s */
- while (done_count < 2) {
- bool stabilized = test_clock() > stabilize_until;
- poll_all_consumers(consumers, rebalance_cbs, N_CONSUMERS, 5000);
- /* Verify consumer assignments */
- int counts[N_CONSUMERS];
- map<Toppar, RdKafka::KafkaConsumer *> all_assignments;
- Test::Say(tostr() << "Consumer assignments "
- << "(subscription_variation " << subscription_variation
- << ")" << (stabilized ? " (stabilized)" : "")
- << (use_rebalance_cb ? " (use_rebalance_cb)"
- : " (no rebalance cb)")
- << ":\n");
- for (int i = 0; i < N_CONSUMERS; i++) {
- bool last_rebalance_stabilized =
- stabilized &&
- (!use_rebalance_cb ||
- /* * 2 + 1 */
- test_clock() > rebalance_cbs[i].ts_last_assign + (13 * 1000 * 1000));
- counts[i] = verify_consumer_assignment(
- consumers[i], rebalance_cbs[i], consumer_topics[i],
- /* allow empty */
- true,
- /* if we're waiting for a
- * rebalance it is okay for the
- * current assignment to contain
- * topics that this consumer
- * (no longer) subscribes to. */
- !last_rebalance_stabilized || !use_rebalance_cb ||
- rebalance_cbs[i].wait_rebalance,
- /* do not allow assignments for
- * topics that are not subscribed*/
- &all_assignments,
- /* Verify received message counts
- * once the assignments have
- * stabilized.
- * Requires the rebalance cb.*/
- done_count > 0 && use_rebalance_cb ? N_MSGS_PER_PARTITION : -1);
- }
- Test::Say(tostr() << all_assignments.size() << "/" << N_PARTITIONS
- << " partitions assigned\n");
- bool done = true;
- for (int i = 0; i < N_CONSUMERS; i++) {
- /* For each topic the consumer subscribes to it should
- * be assigned its share of partitions. */
- int exp_parts = 0;
- for (vector<string>::const_iterator it = consumer_topics[i].begin();
- it != consumer_topics[i].end(); it++)
- exp_parts += N_PARTS_PER_TOPIC / (int)topic_consumers[*it].size();
- Test::Say(tostr() << (counts[i] == exp_parts ? "" : _C_YEL) << "Consumer "
- << consumers[i]->name() << " has " << counts[i]
- << " assigned partitions (" << consumer_topics[i].size()
- << " subscribed topic(s))"
- << ", expecting " << exp_parts
- << " assigned partitions\n");
- if (counts[i] != exp_parts)
- done = false;
- }
- if (done && stabilized) {
- done_count++;
- Test::Say(tostr() << "All assignments verified, done count is "
- << done_count << "\n");
- }
- }
- Test::Say("Disposing consumers\n");
- for (int i = 0; i < N_CONSUMERS; i++) {
- TEST_ASSERT(!use_rebalance_cb || !rebalance_cbs[i].wait_rebalance,
- "Consumer %d still waiting for rebalance", i);
- if (i & 1)
- consumers[i]->close();
- delete consumers[i];
- }
-extern "C" {
-static int rebalance_cnt;
-static rd_kafka_resp_err_t rebalance_exp_event;
-static rd_bool_t rebalance_exp_lost;
-extern void test_print_partition_list(
- const rd_kafka_topic_partition_list_t *partitions);
-static void rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque) {
- rebalance_cnt++;
- TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
- rd_kafka_err2name(err), parts->cnt);
- test_print_partition_list(parts);
- TEST_ASSERT(err == rebalance_exp_event ||
- rebalance_exp_event == RD_KAFKA_RESP_ERR_NO_ERROR,
- "Expected rebalance event %s, not %s",
- rd_kafka_err2name(rebalance_exp_event), rd_kafka_err2name(err));
- if (rebalance_exp_lost) {
- TEST_ASSERT(rd_kafka_assignment_lost(rk), "Expected partitions lost");
- TEST_SAY("Partitions were lost\n");
- }
- test_consumer_incremental_assign("assign", rk, parts);
- } else {
- test_consumer_incremental_unassign("unassign", rk, parts);
- }
- * @brief Wait for an expected rebalance event, or fail.
- */
-static void expect_rebalance0(const char *func,
- int line,
- const char *what,
- rd_kafka_t *c,
- rd_kafka_resp_err_t exp_event,
- rd_bool_t exp_lost,
- int timeout_s) {
- int64_t tmout = test_clock() + (timeout_s * 1000000);
- int start_cnt = rebalance_cnt;
- TEST_SAY("%s:%d: Waiting for %s (%s) for %ds\n", func, line, what,
- rd_kafka_err2name(exp_event), timeout_s);
- rebalance_exp_lost = exp_lost;
- rebalance_exp_event = exp_event;
- while (tmout > test_clock() && rebalance_cnt == start_cnt) {
- test_consumer_poll_once(c, NULL, 1000);
- }
- if (rebalance_cnt == start_cnt + 1) {
- rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
- rebalance_exp_lost = exp_lost = rd_false;
- return;
- }
- TEST_FAIL("%s:%d: Timed out waiting for %s (%s)", func, line, what,
- rd_kafka_err2name(exp_event));
-#define expect_rebalance(WHAT, C, EXP_EVENT, EXP_LOST, TIMEOUT_S) \
- expect_rebalance0(__FUNCTION__, __LINE__, WHAT, C, EXP_EVENT, EXP_LOST, \
-/* Check lost partitions revoke occurs on ILLEGAL_GENERATION heartbeat error.
- */
-static void p_lost_partitions_heartbeat_illegal_generation_test() {
- const char *bootstraps;
- rd_kafka_mock_cluster_t *mcluster;
- const char *groupid = "mygroup";
- const char *topic = "test";
- rd_kafka_t *c;
- rd_kafka_conf_t *conf;
- mcluster = test_mock_cluster_new(3, &bootstraps);
- rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
- /* Seed the topic with messages */
- test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
- bootstraps, "batch.num.messages", "10",
- "security.protocol", "plaintext", NULL);
- test_conf_init(&conf, NULL, 30);
- test_conf_set(conf, "bootstrap.servers", bootstraps);
- test_conf_set(conf, "security.protocol", "PLAINTEXT");
- test_conf_set(conf, "", groupid);
- test_conf_set(conf, "", "5000");
- test_conf_set(conf, "", "1000");
- test_conf_set(conf, "auto.offset.reset", "earliest");
- test_conf_set(conf, "", "false");
- test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
- c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
- test_consumer_subscribe(c, topic);
- expect_rebalance("initial assignment", c,
- rd_false /*don't expect lost*/, 5 + 2);
- /* Fail heartbeats */
- rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Heartbeat, 5,
- expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
- rd_true /*expect lost*/, 10 + 2);
- rd_kafka_mock_clear_request_errors(mcluster, RD_KAFKAP_Heartbeat);
- expect_rebalance("rejoin after lost", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
- rd_false /*don't expect lost*/, 10 + 2);
- TEST_SAY("Closing consumer\n");
- test_consumer_close(c);
- TEST_SAY("Destroying consumer\n");
- rd_kafka_destroy(c);
- TEST_SAY("Destroying mock cluster\n");
- test_mock_cluster_destroy(mcluster);
-/* Check lost partitions revoke occurs on ILLEGAL_GENERATION JoinGroup
- * or SyncGroup error.
- */
-static void q_lost_partitions_illegal_generation_test(
- rd_bool_t test_joingroup_fail) {
- const char *bootstraps;
- rd_kafka_mock_cluster_t *mcluster;
- const char *groupid = "mygroup";
- const char *topic1 = "test1";
- const char *topic2 = "test2";
- rd_kafka_t *c;
- rd_kafka_conf_t *conf;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *topics;
- SUB_TEST0(!test_joingroup_fail /*quick*/, "test_joingroup_fail=%d",
- test_joingroup_fail);
- mcluster = test_mock_cluster_new(3, &bootstraps);
- rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
- /* Seed the topic1 with messages */
- test_produce_msgs_easy_v(topic1, 0, 0, 0, 100, 10, "bootstrap.servers",
- bootstraps, "batch.num.messages", "10",
- "security.protocol", "plaintext", NULL);
- /* Seed the topic2 with messages */
- test_produce_msgs_easy_v(topic2, 0, 0, 0, 100, 10, "bootstrap.servers",
- bootstraps, "batch.num.messages", "10",
- "security.protocol", "plaintext", NULL);
- test_conf_init(&conf, NULL, 30);
- test_conf_set(conf, "bootstrap.servers", bootstraps);
- test_conf_set(conf, "security.protocol", "PLAINTEXT");
- test_conf_set(conf, "", groupid);
- test_conf_set(conf, "", "5000");
- test_conf_set(conf, "", "1000");
- test_conf_set(conf, "auto.offset.reset", "earliest");
- test_conf_set(conf, "", "false");
- test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
- c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
- test_consumer_subscribe(c, topic1);
- expect_rebalance("initial assignment", c,
- rd_false /*don't expect lost*/, 5 + 2);
- /* Fail JoinGroups or SyncGroups */
- rd_kafka_mock_push_request_errors(
- mcluster, test_joingroup_fail ? RD_KAFKAP_JoinGroup : RD_KAFKAP_SyncGroup,
- topics = rd_kafka_topic_partition_list_new(2);
- rd_kafka_topic_partition_list_add(topics, topic1, RD_KAFKA_PARTITION_UA);
- rd_kafka_topic_partition_list_add(topics, topic2, RD_KAFKA_PARTITION_UA);
- err = rd_kafka_subscribe(c, topics);
- if (err)
- TEST_FAIL("%s: Failed to subscribe to topics: %s\n", rd_kafka_name(c),
- rd_kafka_err2str(err));
- rd_kafka_topic_partition_list_destroy(topics);
- expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
- rd_true /*expect lost*/, 10 + 2);
- rd_kafka_mock_clear_request_errors(mcluster, test_joingroup_fail
- ? RD_KAFKAP_JoinGroup
- : RD_KAFKAP_SyncGroup);
- expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
- rd_false /*expect lost*/, 10 + 2);
- TEST_SAY("Closing consumer\n");
- test_consumer_close(c);
- TEST_SAY("Destroying consumer\n");
- rd_kafka_destroy(c);
- TEST_SAY("Destroying mock cluster\n");
- test_mock_cluster_destroy(mcluster);
-/* Check lost partitions revoke occurs on ILLEGAL_GENERATION Commit
- * error.
- */
-static void r_lost_partitions_commit_illegal_generation_test_local() {
- const char *bootstraps;
- rd_kafka_mock_cluster_t *mcluster;
- const char *groupid = "mygroup";
- const char *topic = "test";
- const int msgcnt = 100;
- rd_kafka_t *c;
- rd_kafka_conf_t *conf;
- mcluster = test_mock_cluster_new(3, &bootstraps);
- rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
- /* Seed the topic with messages */
- test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10, "bootstrap.servers",
- bootstraps, "batch.num.messages", "10",
- "security.protocol", "plaintext", NULL);
- test_conf_init(&conf, NULL, 30);
- test_conf_set(conf, "bootstrap.servers", bootstraps);
- test_conf_set(conf, "security.protocol", "PLAINTEXT");
- test_conf_set(conf, "", groupid);
- test_conf_set(conf, "auto.offset.reset", "earliest");
- test_conf_set(conf, "", "false");
- test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
- c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
- test_consumer_subscribe(c, topic);
- expect_rebalance("initial assignment", c,
- rd_false /*don't expect lost*/, 5 + 2);
- /* Consume some messages so that the commit has something to commit. */
- test_consumer_poll("consume", c, -1, -1, -1, msgcnt / 2, NULL);
- /* Fail Commit */
- rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_OffsetCommit, 5,
- rd_kafka_commit(c, NULL, rd_false);
- expect_rebalance("lost partitions", c, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
- rd_true /*expect lost*/, 10 + 2);
- expect_rebalance("rejoin group", c, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
- rd_false /*expect lost*/, 20 + 2);
- TEST_SAY("Closing consumer\n");
- test_consumer_close(c);
- TEST_SAY("Destroying consumer\n");
- rd_kafka_destroy(c);
- TEST_SAY("Destroying mock cluster\n");
- test_mock_cluster_destroy(mcluster);
- * @brief Rebalance callback for the v_.. test below.
- */
-static void v_rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque) {
- bool *auto_commitp = (bool *)opaque;
- TEST_SAY("%s: %s: %d partition(s)%s\n", rd_kafka_name(rk),
- rd_kafka_err2name(err), parts->cnt,
- rd_kafka_assignment_lost(rk) ? " - assignment lost" : "");
- test_print_partition_list(parts);
- test_consumer_incremental_assign("assign", rk, parts);
- } else {
- test_consumer_incremental_unassign("unassign", rk, parts);
- if (!*auto_commitp) {
- rd_kafka_resp_err_t commit_err;
- TEST_SAY("Attempting manual commit after unassign, in 2 seconds..\n");
- /* Sleep enough to have the generation-id bumped by rejoin. */
- rd_sleep(2);
- commit_err = rd_kafka_commit(rk, NULL, 0 /*sync*/);
- TEST_ASSERT(!commit_err || commit_err == RD_KAFKA_RESP_ERR__NO_OFFSET ||
- commit_err == RD_KAFKA_RESP_ERR__DESTROY,
- "%s: manual commit failed: %s", rd_kafka_name(rk),
- rd_kafka_err2str(commit_err));
- }
- }
- * @brief Commit callback for the v_.. test.
- */
-static void v_commit_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *offsets,
- void *opaque) {
- TEST_SAY("%s offset commit for %d offsets: %s\n", rd_kafka_name(rk),
- offsets ? offsets->cnt : -1, rd_kafka_err2name(err));
- err == RD_KAFKA_RESP_ERR__DESTROY /* consumer was closed */,
- "%s offset commit failed: %s", rd_kafka_name(rk),
- rd_kafka_err2str(err));
-static void v_commit_during_rebalance(bool with_rebalance_cb,
- bool auto_commit) {
- rd_kafka_t *p, *c1, *c2;
- rd_kafka_conf_t *conf;
- const char *topic = test_mk_topic_name("0113_v", 1);
- const int partition_cnt = 6;
- const int msgcnt_per_partition = 100;
- const int msgcnt = partition_cnt * msgcnt_per_partition;
- uint64_t testid;
- int i;
- SUB_TEST("With%s rebalance callback and %s-commit",
- with_rebalance_cb ? "" : "out", auto_commit ? "auto" : "manual");
- test_conf_init(&conf, NULL, 30);
- testid = test_id_generate();
- /*
- * Produce messages to topic
- */
- p = test_create_producer();
- test_create_topic(p, topic, partition_cnt, 1);
- for (i = 0; i < partition_cnt; i++) {
- test_produce_msgs2(p, topic, testid, i, i * msgcnt_per_partition,
- msgcnt_per_partition, NULL, 0);
- }
- test_flush(p, -1);
- rd_kafka_destroy(p);
- test_conf_set(conf, "auto.offset.reset", "earliest");
- test_conf_set(conf, "", auto_commit ? "true" : "false");
- test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
- rd_kafka_conf_set_offset_commit_cb(conf, v_commit_cb);
- rd_kafka_conf_set_opaque(conf, (void *)&auto_commit);
- TEST_SAY("Create and subscribe first consumer\n");
- c1 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
- rd_kafka_conf_dup(conf), NULL);
- TEST_ASSERT(rd_kafka_opaque(c1) == (void *)&auto_commit,
- "c1 opaque mismatch");
- test_consumer_subscribe(c1, topic);
- /* Consume some messages so that we know we have an assignment
- * and something to commit. */
- test_consumer_poll("C1.PRECONSUME", c1, testid, -1, 0,
- msgcnt / partition_cnt / 2, NULL);
- TEST_SAY("Create and subscribe second consumer\n");
- c2 = test_create_consumer(topic, with_rebalance_cb ? v_rebalance_cb : NULL,
- conf, NULL);
- TEST_ASSERT(rd_kafka_opaque(c2) == (void *)&auto_commit,
- "c2 opaque mismatch");
- test_consumer_subscribe(c2, topic);
- /* Poll both consumers */
- for (i = 0; i < 10; i++) {
- test_consumer_poll_once(c1, NULL, 1000);
- test_consumer_poll_once(c2, NULL, 1000);
- }
- TEST_SAY("Closing consumers\n");
- test_consumer_close(c1);
- test_consumer_close(c2);
- rd_kafka_destroy(c1);
- rd_kafka_destroy(c2);
- * @brief Verify that incremental rebalances retain stickyness.
- */
-static void x_incremental_rebalances(void) {
-#define _NUM_CONS 3
- rd_kafka_t *c[_NUM_CONS];
- rd_kafka_conf_t *conf;
- const char *topic = test_mk_topic_name("0113_x", 1);
- int i;
- test_conf_init(&conf, NULL, 60);
- test_create_topic(NULL, topic, 6, 1);
- test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
- for (i = 0; i < _NUM_CONS; i++) {
- char clientid[32];
- rd_snprintf(clientid, sizeof(clientid), "consumer%d", i);
- test_conf_set(conf, "", clientid);
- c[i] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
- }
- rd_kafka_conf_destroy(conf);
- /* First consumer joins group */
- TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
- test_consumer_subscribe(c[0], topic);
- test_consumer_wait_assignment(c[0], rd_true /*poll*/);
- test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
- topic, 1, topic, 2, topic, 3, topic, 4, topic,
- 5, NULL);
- /* Second consumer joins group */
- TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
- test_consumer_subscribe(c[1], topic);
- test_consumer_wait_assignment(c[1], rd_true /*poll*/);
- rd_sleep(3);
- test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
- topic, 4, topic, 5, NULL);
- test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 0,
- topic, 1, topic, 2, NULL);
- /* Third consumer joins group */
- TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
- test_consumer_subscribe(c[2], topic);
- test_consumer_wait_assignment(c[2], rd_true /*poll*/);
- rd_sleep(3);
- test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,
- topic, 5, NULL);
- test_consumer_verify_assignment(c[1], rd_false /*fail later*/, topic, 1,
- topic, 2, NULL);
- test_consumer_verify_assignment(c[2], rd_false /*fail later*/, topic, 3,
- topic, 0, NULL);
- /* Raise any previously failed verify_assignment calls and fail the test */
- for (i = 0; i < _NUM_CONS; i++)
- rd_kafka_destroy(c[i]);
-#undef _NUM_CONS
-/* Local tests not needing a cluster */
-int main_0113_cooperative_rebalance_local(int argc, char **argv) {
- a_assign_rapid();
- p_lost_partitions_heartbeat_illegal_generation_test();
- q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/);
- q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/);
- r_lost_partitions_commit_illegal_generation_test_local();
- return 0;
-int main_0113_cooperative_rebalance(int argc, char **argv) {
- int i;
- a_assign_tests();
- b_subscribe_with_cb_test(true /*close consumer*/);
- b_subscribe_with_cb_test(false /*don't close consumer*/);
- c_subscribe_no_cb_test(true /*close consumer*/);
- if (test_quick) {
- Test::Say("Skipping tests >= c_ .. due to quick mode\n");
- return 0;
- }
- c_subscribe_no_cb_test(false /*don't close consumer*/);
- d_change_subscription_add_topic(true /*close consumer*/);
- d_change_subscription_add_topic(false /*don't close consumer*/);
- e_change_subscription_remove_topic(true /*close consumer*/);
- e_change_subscription_remove_topic(false /*don't close consumer*/);
- f_assign_call_cooperative();
- g_incremental_assign_call_eager();
- h_delete_topic();
- i_delete_topic_2();
- j_delete_topic_no_rb_callback();
- k_add_partition();
- l_unsubscribe();
- m_unsubscribe_2();
- n_wildcard();
- o_java_interop();
- for (i = 1; i <= 6; i++) /* iterate over 6 different test variations */
- s_subscribe_when_rebalancing(i);
- for (i = 1; i <= 2; i++)
- t_max_poll_interval_exceeded(i);
- /* Run all 2*3 variations of the u_.. test */
- for (i = 0; i < 3; i++) {
- u_multiple_subscription_changes(true /*with rebalance_cb*/, i);
- u_multiple_subscription_changes(false /*without rebalance_cb*/, i);
- }
- v_commit_during_rebalance(true /*with rebalance callback*/,
- true /*auto commit*/);
- v_commit_during_rebalance(false /*without rebalance callback*/,
- true /*auto commit*/);
- v_commit_during_rebalance(true /*with rebalance callback*/,
- false /*manual commit*/);
- x_incremental_rebalances();
- return 0;