diff options
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0101-fetch-from-follower.cpp')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0101-fetch-from-follower.cpp | 446 |
1 files changed, 446 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0101-fetch-from-follower.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0101-fetch-from-follower.cpp new file mode 100644 index 000000000..342ec4f8f --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0101-fetch-from-follower.cpp @@ -0,0 +1,446 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "testcpp.h" + +#if WITH_RAPIDJSON + +#include <iostream> +#include <cstring> +#include <cstdlib> +#include <assert.h> +#include <sstream> +#include <string> +#include <map> +#include <set> +#include "rdkafka.h" + +#include <rapidjson/document.h> +#include <rapidjson/schema.h> +#include <rapidjson/filereadstream.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/error/en.h> +#include <rapidjson/prettywriter.h> + + +/** + * @brief A basic test of fetch from follower funtionality + * - produces a bunch of messages to a replicated topic. + * - configure the consumer such that `client.rack` is different from the + * broker's `broker.rack` (and use + * org.apache.kafka.common.replica.RackAwareReplicaSelector). + * - consume the messages, and check they are as expected. + * - use rxbytes from the statistics event to confirm that + * the messages were retrieved from the replica broker (not the + * leader). + */ + + +#define test_assert(cond, msg) \ + do { \ + if (!(cond)) \ + Test::Say(msg); \ + } while (0) + + +class TestEvent2Cb : public RdKafka::EventCb { + public: + static bool should_capture_stats; + static bool has_captured_stats; + static std::map<int32_t, int64_t> rxbytes; + + void event_cb(RdKafka::Event &event) { + switch (event.type()) { + case RdKafka::Event::EVENT_LOG: + Test::Say(event.str() + "\n"); + break; + case RdKafka::Event::EVENT_STATS: + if (should_capture_stats) { + rapidjson::Document d; + if (d.Parse(event.str().c_str()).HasParseError()) + Test::Fail(tostr() << "Failed to parse stats JSON: " + << rapidjson::GetParseError_En(d.GetParseError()) + << " at " << d.GetErrorOffset()); + + /* iterate over brokers. */ + rapidjson::Pointer jpath((const char *)"/brokers"); + rapidjson::Value *pp = rapidjson::GetValueByPointer(d, jpath); + if (pp == NULL) + return; + + for (rapidjson::Value::ConstMemberIterator itr = pp->MemberBegin(); + itr != pp->MemberEnd(); ++itr) { + std::string broker_name = itr->name.GetString(); + size_t broker_id_idx = broker_name.rfind('/'); + if (broker_id_idx == (size_t)-1) + continue; + std::string broker_id = broker_name.substr( + broker_id_idx + 1, broker_name.size() - broker_id_idx - 1); + + int64_t broker_rxbytes = + itr->value.FindMember("rxbytes")->value.GetInt64(); + rxbytes[atoi(broker_id.c_str())] = broker_rxbytes; + } + + has_captured_stats = true; + break; + } + default: + break; + } + } +}; + +bool TestEvent2Cb::should_capture_stats; +bool TestEvent2Cb::has_captured_stats; +std::map<int32_t, int64_t> TestEvent2Cb::rxbytes; +static TestEvent2Cb ex_event_cb; + + +static void get_brokers_info(std::string &topic_str, + int32_t *leader, + std::vector<int> &brokers) { + std::string errstr; + RdKafka::ErrorCode err; + class RdKafka::Metadata *metadata; + + /* Determine the ids of the brokers that the partition has replicas + * on and which one of those is the leader. + */ + RdKafka::Conf *pConf; + Test::conf_init(&pConf, NULL, 10); + RdKafka::Producer *p = RdKafka::Producer::create(pConf, errstr); + delete pConf; + test_assert(p, tostr() << "Failed to create producer: " << errstr); + + RdKafka::Topic *topic = RdKafka::Topic::create(p, topic_str, NULL, errstr); + test_assert(topic, tostr() << "Failed to create topic: " << errstr); + + err = p->metadata(0, topic, &metadata, tmout_multip(5000)); + test_assert( + err == RdKafka::ERR_NO_ERROR, + tostr() << "%% Failed to acquire metadata: " << RdKafka::err2str(err)); + + test_assert(metadata->topics()->size() == 1, + tostr() << "expecting metadata for exactly one topic. " + << "have metadata for " << metadata->topics()->size() + << "topics"); + + RdKafka::Metadata::TopicMetadataIterator topicMetadata = + metadata->topics()->begin(); + RdKafka::TopicMetadata::PartitionMetadataIterator partitionMetadata = + (*topicMetadata)->partitions()->begin(); + + *leader = (*partitionMetadata)->leader(); + + size_t idx = 0; + RdKafka::PartitionMetadata::ReplicasIterator replicasIterator; + for (replicasIterator = (*partitionMetadata)->replicas()->begin(); + replicasIterator != (*partitionMetadata)->replicas()->end(); + ++replicasIterator) { + brokers.push_back(*replicasIterator); + idx++; + } + + delete metadata; + delete topic; + delete p; +} + + +/** + * @brief Wait for up to \p tmout for any type of admin result. + * @returns the event + */ +rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q, + rd_kafka_event_type_t evtype, + int tmout) { + rd_kafka_event_t *rkev; + + while (1) { + rkev = rd_kafka_queue_poll(q, tmout); + if (!rkev) + Test::Fail(tostr() << "Timed out waiting for admin result (" << evtype + << ")\n"); + + if (rd_kafka_event_type(rkev) == evtype) + return rkev; + + if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) { + Test::Say(tostr() << "Received error event while waiting for " << evtype + << ": " << rd_kafka_event_error_string(rkev) + << ": ignoring"); + continue; + } + + test_assert(rd_kafka_event_type(rkev) == evtype, + tostr() << "Expected event type " << evtype << ", got " + << rd_kafka_event_type(rkev) << " (" + << rd_kafka_event_name(rkev) << ")"); + } + + return NULL; +} + + +/** + * @returns the number of broker.rack values configured across all brokers. + */ +static int get_broker_rack_count(std::vector<int> &replica_ids) { + std::string errstr; + RdKafka::Conf *pConf; + Test::conf_init(&pConf, NULL, 10); + RdKafka::Producer *p = RdKafka::Producer::create(pConf, errstr); + delete pConf; + + rd_kafka_queue_t *mainq = rd_kafka_queue_get_main(p->c_ptr()); + + std::set<std::string> racks; + for (size_t i = 0; i < replica_ids.size(); ++i) { + std::string name = tostr() << replica_ids[i]; + + rd_kafka_ConfigResource_t *config = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_BROKER, &name[0]); + + rd_kafka_AdminOptions_t *options; + char cerrstr[128]; + options = rd_kafka_AdminOptions_new(p->c_ptr(), RD_KAFKA_ADMIN_OP_ANY); + rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout( + options, 10000, cerrstr, sizeof(cerrstr)); + test_assert(!err, cerrstr); + + rd_kafka_DescribeConfigs(p->c_ptr(), &config, 1, options, mainq); + rd_kafka_ConfigResource_destroy(config); + rd_kafka_AdminOptions_destroy(options); + rd_kafka_event_t *rkev = test_wait_admin_result( + mainq, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 5000); + + const rd_kafka_DescribeConfigs_result_t *res = + rd_kafka_event_DescribeConfigs_result(rkev); + test_assert(res, "expecting describe config results to be not NULL"); + + err = rd_kafka_event_error(rkev); + const char *errstr2 = rd_kafka_event_error_string(rkev); + test_assert(!err, tostr() << "Expected success, not " + << rd_kafka_err2name(err) << ": " << errstr2); + + size_t rconfig_cnt; + const rd_kafka_ConfigResource_t **rconfigs = + rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt); + test_assert(rconfig_cnt == 1, + tostr() << "Expecting 1 resource, got " << rconfig_cnt); + + err = rd_kafka_ConfigResource_error(rconfigs[0]); + errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[0]); + + size_t entry_cnt; + const rd_kafka_ConfigEntry_t **entries = + rd_kafka_ConfigResource_configs(rconfigs[0], &entry_cnt); + + for (size_t j = 0; j < entry_cnt; ++j) { + const rd_kafka_ConfigEntry_t *e = entries[j]; + const char *cname = rd_kafka_ConfigEntry_name(e); + if (!strcmp(cname, "broker.rack")) { + const char *val = rd_kafka_ConfigEntry_value(e) + ? rd_kafka_ConfigEntry_value(e) + : "(NULL)"; + racks.insert(std::string(val)); + } + } + + rd_kafka_event_destroy(rkev); + } + + rd_kafka_queue_destroy(mainq); + delete p; + + return (int)racks.size(); +} + + +static void do_fff_test(void) { + /* Produce some messages to a single partition topic + * with 3 replicas. + */ + int msgcnt = 1000; + const int msgsize = 100; + std::string topic_str = Test::mk_topic_name("0101-fetch-from-follower", 1); + test_create_topic(NULL, topic_str.c_str(), 1, 3); + test_produce_msgs_easy_size(topic_str.c_str(), 0, 0, msgcnt, msgsize); + + int leader_id; + std::vector<int> replica_ids; + get_brokers_info(topic_str, &leader_id, replica_ids); + test_assert(replica_ids.size() == 3, + tostr() << "expecting three replicas, but " << replica_ids.size() + << " were reported."); + Test::Say(tostr() << topic_str << " leader id: " << leader_id + << ", all replica ids: [" << replica_ids[0] << ", " + << replica_ids[1] << ", " << replica_ids[2] << "]\n"); + + if (get_broker_rack_count(replica_ids) != 3) { + Test::Skip("unexpected broker.rack configuration: skipping test.\n"); + return; + } + + /* arrange for the consumer's client.rack to align with a broker that is not + * the leader. */ + int client_rack_id = -1; + size_t i; + for (i = 0; i < replica_ids.size(); ++i) { + if (replica_ids[i] != leader_id) { + client_rack_id = replica_ids[i]; + break; + } + } + + std::string client_rack = tostr() << "RACK" << client_rack_id; + Test::Say("client.rack: " + client_rack + "\n"); + + std::string errstr; + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, 10); + Test::conf_set(conf, "group.id", topic_str); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "enable.auto.commit", "false"); + Test::conf_set(conf, "statistics.interval.ms", "1000"); + conf->set("event_cb", &ex_event_cb, errstr); + Test::conf_set(conf, "client.rack", client_rack); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + test_assert(c, "Failed to create KafkaConsumer: " + errstr); + delete conf; + + /* Subscribe */ + std::vector<std::string> topics; + topics.push_back(topic_str); + RdKafka::ErrorCode err; + if ((err = c->subscribe(topics))) + Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + + /* Start consuming */ + Test::Say("Consuming topic " + topic_str + "\n"); + int cnt = 0; + while (cnt < msgcnt) { + RdKafka::Message *msg = c->consume(tmout_multip(1000)); + + switch (msg->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: { + test_assert(msg->len() == 100, "expecting message value size to be 100"); + char *cnt_str_start_ptr = strstr((char *)msg->payload(), "msg=") + 4; + test_assert(cnt_str_start_ptr, "expecting 'msg=' in message payload"); + char *cnt_str_end_ptr = strstr(cnt_str_start_ptr, "\n"); + test_assert(cnt_str_start_ptr, + "expecting '\n' following 'msg=' in message payload"); + *cnt_str_end_ptr = '\0'; + int msg_cnt = atoi(cnt_str_start_ptr); + test_assert(msg_cnt == cnt, "message consumed out of order"); + cnt++; + } break; + + default: + Test::Fail("Consume error: " + msg->errstr()); + break; + } + + delete msg; + } + + /* rely on the test timeout to prevent an infinite loop in + * the (unlikely) event that the statistics callback isn't + * called. */ + Test::Say("Capturing rxbytes statistics\n"); + TestEvent2Cb::should_capture_stats = true; + while (!TestEvent2Cb::has_captured_stats) { + RdKafka::Message *msg = c->consume(tmout_multip(500)); + delete msg; + } + + for (i = 0; i < replica_ids.size(); ++i) + Test::Say( + tostr() << _C_YEL << "rxbytes for replica on broker " << replica_ids[i] + << ": " << TestEvent2Cb::rxbytes[replica_ids[i]] + << (replica_ids[i] == leader_id ? " (leader)" : "") + << (replica_ids[i] == client_rack_id ? " (preferred replica)" + : "") + << "\n"); + + for (i = 0; i < replica_ids.size(); ++i) + if (replica_ids[i] != client_rack_id) + test_assert( + TestEvent2Cb::rxbytes[replica_ids[i]] < + TestEvent2Cb::rxbytes[client_rack_id], + "rxbytes was not highest on broker corresponding to client.rack."); + + test_assert( + TestEvent2Cb::rxbytes[client_rack_id] > msgcnt * msgsize, + tostr() << "expecting rxbytes of client.rack broker to be at least " + << msgcnt * msgsize << " but it was " + << TestEvent2Cb::rxbytes[client_rack_id]); + + Test::Say("Done\n"); + + // Manual test 1: + // - change the lease period from 5 minutes to 5 seconds (modify + // rdkafka_partition.c) + // - change the max lease grant period from 1 minute to 10 seconds (modify + // rdkafka_broker.c) + // - add infinite consume loop to the end of this test. + // - observe: + // - the partition gets delegated to the preferred replica. + // - the messages get consumed. + // - the lease expires. + // - the partition is reverted to the leader. + // - the toppar is backed off, and debug message noting the faster than + // expected delegation to a replica. + + // Manual test 2: + // - same modifications as above. + // - add Test::conf_set(conf, "topic.metadata.refresh.interval.ms", "3000"); + // - observe: + // - that metadata being periodically received and not interfering with + // anything. + + c->close(); + delete c; +} +#endif + +extern "C" { +int main_0101_fetch_from_follower(int argc, char **argv) { +#if WITH_RAPIDJSON + do_fff_test(); +#else + Test::Skip("RapidJSON >=1.1.0 not available\n"); +#endif + return 0; +} +} |