Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.c | 1900
1 file changed, 1900 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.c
new file mode 100644
index 00000000..89bfa092
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.c
@@ -0,0 +1,1900 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rd.h"
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_offset.h"
+#include "rdlog.h"
+#include "rdsysqueue.h"
+#include "rdtime.h"
+#include "rdregex.h"
+
+#if WITH_ZSTD
+#include <zstd.h>
+#endif
+
+
+const char *rd_kafka_topic_state_names[] = {"unknown", "exists", "notexists",
+                                            "error"};
+
+
+static int rd_kafka_topic_metadata_update(
+    rd_kafka_topic_t *rkt,
+    const struct rd_kafka_metadata_topic *mdt,
+    const rd_kafka_partition_leader_epoch_t *leader_epochs,
+    rd_ts_t ts_age);
+
+
+/**
+ * @brief Increases the app's topic reference count.
+ *
+ * The app refcounts are implemented separately from the librdkafka refcounts;
+ * they are increased/decreased in a separate rkt_app_refcnt to keep track of
+ * its use.
+ *
+ * This only covers topic_new() & topic_destroy().
+ * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled
+ * like a standard internal -> app pointer conversion (keep_a()).
+ */
+static void rd_kafka_topic_keep_app(rd_kafka_topic_t *rkt) {
+        if (rd_refcnt_add(&rkt->rkt_app_refcnt) == 1)
+                rd_kafka_topic_keep(rkt);
+}
+
+/**
+ * @brief drop rkt app reference
+ */
+static void rd_kafka_topic_destroy_app(rd_kafka_topic_t *app_rkt) {
+        rd_kafka_topic_t *rkt = app_rkt;
+
+        rd_assert(!rd_kafka_rkt_is_lw(app_rkt));
+
+        if (unlikely(rd_refcnt_sub(&rkt->rkt_app_refcnt) == 0))
+                rd_kafka_topic_destroy0(rkt); /* final app reference lost,
+                                               * lose reference from
+                                               * keep_app() */
+}
+
+
+/**
+ * Final destructor for topic. Refcnt must be 0.
+ */ +void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt) { + rd_kafka_partition_msgid_t *partmsgid, *partmsgid_tmp; + + rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0); + + rd_kafka_wrlock(rkt->rkt_rk); + TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link); + rkt->rkt_rk->rk_topic_cnt--; + rd_kafka_wrunlock(rkt->rkt_rk); + + TAILQ_FOREACH_SAFE(partmsgid, &rkt->rkt_saved_partmsgids, link, + partmsgid_tmp) { + rd_free(partmsgid); + } + + rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp)); + rd_list_destroy(&rkt->rkt_desp); + + rd_avg_destroy(&rkt->rkt_avg_batchsize); + rd_avg_destroy(&rkt->rkt_avg_batchcnt); + + if (rkt->rkt_topic) + rd_kafkap_str_destroy(rkt->rkt_topic); + + rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf); + + rwlock_destroy(&rkt->rkt_lock); + rd_refcnt_destroy(&rkt->rkt_app_refcnt); + rd_refcnt_destroy(&rkt->rkt_refcnt); + + rd_free(rkt); +} + +/** + * @brief Application topic object destroy. + * @warning MUST ONLY BE CALLED BY THE APPLICATION. + * Use rd_kafka_topic_destroy0() for all internal use. + */ +void rd_kafka_topic_destroy(rd_kafka_topic_t *app_rkt) { + rd_kafka_lwtopic_t *lrkt; + if (unlikely((lrkt = rd_kafka_rkt_get_lw(app_rkt)) != NULL)) + rd_kafka_lwtopic_destroy(lrkt); + else + rd_kafka_topic_destroy_app(app_rkt); +} + + +/** + * Finds and returns a topic based on its name, or NULL if not found. + * The 'rkt' refcount is increased by one and the caller must call + * rd_kafka_topic_destroy() when it is done with the topic to decrease + * the refcount. + * + * Locality: any thread + */ +rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func, + int line, + rd_kafka_t *rk, + const char *topic, + int do_lock) { + rd_kafka_topic_t *rkt; + + if (do_lock) + rd_kafka_rdlock(rk); + TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { + if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) { + rd_kafka_topic_keep(rkt); + break; + } + } + if (do_lock) + rd_kafka_rdunlock(rk); + + return rkt; +} + +/** + * Same semantics as ..find() but takes a Kafka protocol string instead. + */ +rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func, + int line, + rd_kafka_t *rk, + const rd_kafkap_str_t *topic) { + rd_kafka_topic_t *rkt; + + rd_kafka_rdlock(rk); + TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { + if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) { + rd_kafka_topic_keep(rkt); + break; + } + } + rd_kafka_rdunlock(rk); + + return rkt; +} + + +/** + * @brief rd_kafka_topic_t comparator. + */ +int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b) { + rd_kafka_topic_t *rkt_a = (void *)_a, *rkt_b = (void *)_b; + + if (rkt_a == rkt_b) + return 0; + + return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic); +} + + +/** + * @brief Destroy/free a light-weight topic object. + */ +void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt) { + rd_assert(rd_kafka_rkt_is_lw((const rd_kafka_topic_t *)lrkt)); + if (rd_refcnt_sub(&lrkt->lrkt_refcnt) > 0) + return; + + rd_refcnt_destroy(&lrkt->lrkt_refcnt); + rd_free(lrkt); +} + + +/** + * @brief Create a new light-weight topic name-only handle. + * + * This type of object is a light-weight non-linked alternative + * to the proper rd_kafka_itopic_t for outgoing APIs + * (such as rd_kafka_message_t) when there is no full topic object available. 
+ */ +rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic) { + rd_kafka_lwtopic_t *lrkt; + size_t topic_len = strlen(topic); + + lrkt = rd_malloc(sizeof(*lrkt) + topic_len + 1); + + memcpy(lrkt->lrkt_magic, "LRKT", 4); + lrkt->lrkt_rk = rk; + rd_refcnt_init(&lrkt->lrkt_refcnt, 1); + lrkt->lrkt_topic = (char *)(lrkt + 1); + memcpy(lrkt->lrkt_topic, topic, topic_len + 1); + + return lrkt; +} + + +/** + * @returns a proper rd_kafka_topic_t object (not light-weight) + * based on the input rd_kafka_topic_t app object which may + * either be a proper topic (which is then returned) or a light-weight + * topic in which case it will look up or create the proper topic + * object. + * + * This allows the application to (unknowingly) pass a light-weight + * topic object to any proper-aware public API. + */ +rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt) { + rd_kafka_lwtopic_t *lrkt; + + if (likely(!(lrkt = rd_kafka_rkt_get_lw(app_rkt)))) + return app_rkt; + + /* Create proper topic object */ + return rd_kafka_topic_new0(lrkt->lrkt_rk, lrkt->lrkt_topic, NULL, NULL, + 0); +} + + +/** + * @brief Create new topic handle. + * + * @locality any + */ +rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, + const char *topic, + rd_kafka_topic_conf_t *conf, + int *existing, + int do_lock) { + rd_kafka_topic_t *rkt; + const struct rd_kafka_metadata_cache_entry *rkmce; + const char *conf_err; + const char *used_conf_str; + + /* Verify configuration. + * Maximum topic name size + headers must never exceed message.max.bytes + * which is min-capped to 1000. + * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */ + if (!topic || strlen(topic) > 512) { + if (conf) + rd_kafka_topic_conf_destroy(conf); + rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); + return NULL; + } + + if (do_lock) + rd_kafka_wrlock(rk); + if ((rkt = rd_kafka_topic_find(rk, topic, 0 /*no lock*/))) { + if (do_lock) + rd_kafka_wrunlock(rk); + if (conf) + rd_kafka_topic_conf_destroy(conf); + if (existing) + *existing = 1; + return rkt; + } + + if (!conf) { + if (rk->rk_conf.topic_conf) { + conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf); + used_conf_str = "default_topic_conf"; + } else { + conf = rd_kafka_topic_conf_new(); + used_conf_str = "empty"; + } + } else { + used_conf_str = "user-supplied"; + } + + + /* Verify and finalize topic configuration */ + if ((conf_err = rd_kafka_topic_conf_finalize(rk->rk_type, &rk->rk_conf, + conf))) { + if (do_lock) + rd_kafka_wrunlock(rk); + /* Incompatible configuration settings */ + rd_kafka_log(rk, LOG_ERR, "TOPICCONF", + "Incompatible configuration settings " + "for topic \"%s\": %s", + topic, conf_err); + rd_kafka_topic_conf_destroy(conf); + rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); + return NULL; + } + + if (existing) + *existing = 0; + + rkt = rd_calloc(1, sizeof(*rkt)); + + memcpy(rkt->rkt_magic, "IRKT", 4); + + rkt->rkt_topic = rd_kafkap_str_new(topic, -1); + rkt->rkt_rk = rk; + + rkt->rkt_ts_create = rd_clock(); + + rkt->rkt_conf = *conf; + rd_free(conf); /* explicitly not rd_kafka_topic_destroy() + * since we dont want to rd_free internal members, + * just the placeholder. The internal members + * were copied on the line above. 
*/ + + /* Partitioner */ + if (!rkt->rkt_conf.partitioner) { + const struct { + const char *str; + void *part; + } part_map[] = { + {"random", (void *)rd_kafka_msg_partitioner_random}, + {"consistent", (void *)rd_kafka_msg_partitioner_consistent}, + {"consistent_random", + (void *)rd_kafka_msg_partitioner_consistent_random}, + {"murmur2", (void *)rd_kafka_msg_partitioner_murmur2}, + {"murmur2_random", + (void *)rd_kafka_msg_partitioner_murmur2_random}, + {"fnv1a", (void *)rd_kafka_msg_partitioner_fnv1a}, + {"fnv1a_random", + (void *)rd_kafka_msg_partitioner_fnv1a_random}, + {NULL}}; + int i; + + /* Use "partitioner" configuration property string, if set */ + for (i = 0; rkt->rkt_conf.partitioner_str && part_map[i].str; + i++) { + if (!strcmp(rkt->rkt_conf.partitioner_str, + part_map[i].str)) { + rkt->rkt_conf.partitioner = part_map[i].part; + break; + } + } + + /* Default partitioner: consistent_random */ + if (!rkt->rkt_conf.partitioner) { + /* Make sure part_map matched something, otherwise + * there is a discreprency between this code + * and the validator in rdkafka_conf.c */ + assert(!rkt->rkt_conf.partitioner_str); + + rkt->rkt_conf.partitioner = + rd_kafka_msg_partitioner_consistent_random; + } + } + + if (rkt->rkt_rk->rk_conf.sticky_partition_linger_ms > 0 && + rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_consistent && + rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_murmur2 && + rkt->rkt_conf.partitioner != rd_kafka_msg_partitioner_fnv1a) { + rkt->rkt_conf.random_partitioner = rd_false; + } else { + rkt->rkt_conf.random_partitioner = rd_true; + } + + /* Sticky partition assignment interval */ + rd_interval_init(&rkt->rkt_sticky_intvl); + + if (rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) + rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid; + else + rkt->rkt_conf.msg_order_cmp = rd_kafka_msg_cmp_msgid_lifo; + + if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT) + rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec; + + /* Translate compression level to library-specific level and check + * upper bound */ + switch (rkt->rkt_conf.compression_codec) { +#if WITH_ZLIB + case RD_KAFKA_COMPRESSION_GZIP: + if (rkt->rkt_conf.compression_level == + RD_KAFKA_COMPLEVEL_DEFAULT) + rkt->rkt_conf.compression_level = Z_DEFAULT_COMPRESSION; + else if (rkt->rkt_conf.compression_level > + RD_KAFKA_COMPLEVEL_GZIP_MAX) + rkt->rkt_conf.compression_level = + RD_KAFKA_COMPLEVEL_GZIP_MAX; + break; +#endif + case RD_KAFKA_COMPRESSION_LZ4: + if (rkt->rkt_conf.compression_level == + RD_KAFKA_COMPLEVEL_DEFAULT) + /* LZ4 has no notion of system-wide default compression + * level, use zero in this case */ + rkt->rkt_conf.compression_level = 0; + else if (rkt->rkt_conf.compression_level > + RD_KAFKA_COMPLEVEL_LZ4_MAX) + rkt->rkt_conf.compression_level = + RD_KAFKA_COMPLEVEL_LZ4_MAX; + break; +#if WITH_ZSTD + case RD_KAFKA_COMPRESSION_ZSTD: + if (rkt->rkt_conf.compression_level == + RD_KAFKA_COMPLEVEL_DEFAULT) + rkt->rkt_conf.compression_level = 3; + else if (rkt->rkt_conf.compression_level > + RD_KAFKA_COMPLEVEL_ZSTD_MAX) + rkt->rkt_conf.compression_level = + RD_KAFKA_COMPLEVEL_ZSTD_MAX; + break; +#endif + case RD_KAFKA_COMPRESSION_SNAPPY: + default: + /* Compression level has no effect in this case */ + rkt->rkt_conf.compression_level = RD_KAFKA_COMPLEVEL_DEFAULT; + } + + rd_avg_init(&rkt->rkt_avg_batchsize, RD_AVG_GAUGE, 0, + rk->rk_conf.max_msg_size, 2, + rk->rk_conf.stats_interval_ms ? 
1 : 0); + rd_avg_init(&rkt->rkt_avg_batchcnt, RD_AVG_GAUGE, 0, + rk->rk_conf.batch_num_messages, 2, + rk->rk_conf.stats_interval_ms ? 1 : 0); + + rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s", + RD_KAFKAP_STR_PR(rkt->rkt_topic)); + + rd_list_init(&rkt->rkt_desp, 16, NULL); + rd_interval_init(&rkt->rkt_desp_refresh_intvl); + TAILQ_INIT(&rkt->rkt_saved_partmsgids); + rd_refcnt_init(&rkt->rkt_refcnt, 0); + rd_refcnt_init(&rkt->rkt_app_refcnt, 0); + + rd_kafka_topic_keep(rkt); + + rwlock_init(&rkt->rkt_lock); + + /* Create unassigned partition */ + rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA); + + TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link); + rk->rk_topic_cnt++; + + /* Populate from metadata cache. */ + if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1 /*valid*/)) && + !rkmce->rkmce_mtopic.err) { + if (existing) + *existing = 1; + + rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic, NULL, + rkmce->rkmce_ts_insert); + } + + if (do_lock) + rd_kafka_wrunlock(rk); + + if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) { + char desc[256]; + rd_snprintf(desc, sizeof(desc), + "Topic \"%s\" configuration (%s)", topic, + used_conf_str); + rd_kafka_anyconf_dump_dbg(rk, _RK_TOPIC, &rkt->rkt_conf, desc); + } + + return rkt; +} + + + +/** + * @brief Create new app topic handle. + * + * @locality application thread + */ +rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, + const char *topic, + rd_kafka_topic_conf_t *conf) { + rd_kafka_topic_t *rkt; + int existing; + + rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1 /*lock*/); + if (!rkt) + return NULL; + + /* Increase application refcount. */ + rd_kafka_topic_keep_app(rkt); + + /* Query for the topic leader (async) */ + if (!existing) + rd_kafka_topic_leader_query(rk, rkt); + + /* Drop our reference since there is already/now an app refcnt */ + rd_kafka_topic_destroy0(rkt); + + return rkt; +} + + + +/** + * Sets the state for topic. + * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held + */ +static void rd_kafka_topic_set_state(rd_kafka_topic_t *rkt, int state) { + + if ((int)rkt->rkt_state == state) + return; + + rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE", + "Topic %s changed state %s -> %s", rkt->rkt_topic->str, + rd_kafka_topic_state_names[rkt->rkt_state], + rd_kafka_topic_state_names[state]); + + if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) + rkt->rkt_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + rkt->rkt_state = state; +} + +/** + * Returns the name of a topic. + * NOTE: + * The topic Kafka String representation is crafted with an extra byte + * at the end for the Nul that is not included in the length, this way + * we can use the topic's String directly. + * This is not true for Kafka Strings read from the network. + */ +const char *rd_kafka_topic_name(const rd_kafka_topic_t *app_rkt) { + if (rd_kafka_rkt_is_lw(app_rkt)) + return rd_kafka_rkt_lw_const(app_rkt)->lrkt_topic; + else + return app_rkt->rkt_topic->str; +} + + +/** + * @brief Update the broker that a topic+partition is delegated to. + * + * @param broker_id The id of the broker to associate the toppar with. + * @param rkb A reference to the broker to delegate to (must match + * broker_id) or NULL if the toppar should be undelegated for + * any reason. + * @param reason Human-readable reason for the update, included in debug log. + * + * @returns 1 if the broker delegation was changed, -1 if the broker + * delegation was changed and is now undelegated, else 0. 
+ * + * @locks caller must have rd_kafka_toppar_lock(rktp) + * @locality any + */ +int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp, + int32_t broker_id, + rd_kafka_broker_t *rkb, + const char *reason) { + + rktp->rktp_broker_id = broker_id; + + if (!rkb) { + int had_broker = rktp->rktp_broker ? 1 : 0; + rd_kafka_toppar_broker_delegate(rktp, NULL); + return had_broker ? -1 : 0; + } + + if (rktp->rktp_broker) { + if (rktp->rktp_broker == rkb) { + /* No change in broker */ + return 0; + } + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC | RD_KAFKA_DBG_FETCH, + "TOPICUPD", + "Topic %s [%" PRId32 + "]: migrating from " + "broker %" PRId32 " to %" PRId32 + " (leader is " + "%" PRId32 "): %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + rktp->rktp_broker->rkb_nodeid, rkb->rkb_nodeid, + rktp->rktp_leader_id, reason); + } + + rd_kafka_toppar_broker_delegate(rktp, rkb); + + return 1; +} + + +/** + * @brief Update a topic+partition for a new leader. + * + * @remark If a toppar is currently delegated to a preferred replica, + * it will not be delegated to the leader broker unless there + * has been a leader change. + * + * @param leader_id The id of the new leader broker. + * @param leader A reference to the leader broker or NULL if the + * toppar should be undelegated for any reason. + * @param leader_epoch Partition leader's epoch (KIP-320), or -1 if not known. + * + * @returns 1 if the broker delegation was changed, -1 if the broker + * delegation was changed and is now undelegated, else 0. + * + * @locks caller must have rd_kafka_topic_wrlock(rkt) + * AND NOT rd_kafka_toppar_lock(rktp) + * @locality any + */ +static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, + int32_t partition, + int32_t leader_id, + rd_kafka_broker_t *leader, + int32_t leader_epoch) { + rd_kafka_toppar_t *rktp; + rd_bool_t fetching_from_follower, need_epoch_validation = rd_false; + int r = 0; + + rktp = rd_kafka_toppar_get(rkt, partition, 0); + if (unlikely(!rktp)) { + /* Have only seen this in issue #132. + * Probably caused by corrupt broker state. 
*/ + rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "BROKER", + "%s [%" PRId32 + "] is unknown " + "(partition_cnt %i): " + "ignoring leader (%" PRId32 ") update", + rkt->rkt_topic->str, partition, + rkt->rkt_partition_cnt, leader_id); + return -1; + } + + rd_kafka_toppar_lock(rktp); + + if (leader_epoch < rktp->rktp_leader_epoch) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "%s [%" PRId32 + "]: ignoring outdated metadata update with " + "leader epoch %" PRId32 + " which is older than " + "our cached epoch %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, leader_epoch, + rktp->rktp_leader_epoch); + if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) { + rd_kafka_toppar_unlock(rktp); + return 0; + } + } + + if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; + else if (leader_epoch > rktp->rktp_leader_epoch) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "%s [%" PRId32 "]: leader %" PRId32 + " epoch %" PRId32 " -> leader %" PRId32 + " epoch %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, rktp->rktp_leader_id, + rktp->rktp_leader_epoch, leader_id, leader_epoch); + rktp->rktp_leader_epoch = leader_epoch; + need_epoch_validation = rd_true; + } + + fetching_from_follower = + leader != NULL && rktp->rktp_broker != NULL && + rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && + rktp->rktp_broker != leader; + + if (fetching_from_follower && rktp->rktp_leader_id == leader_id) { + rd_kafka_dbg( + rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 "]: leader %" PRId32 + " unchanged, " + "not migrating away from preferred replica %" PRId32, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + leader_id, rktp->rktp_broker_id); + r = 0; + + } else { + + if (rktp->rktp_leader_id != leader_id || + rktp->rktp_leader != leader) { + /* Update leader if it has changed */ + rktp->rktp_leader_id = leader_id; + if (rktp->rktp_leader) + rd_kafka_broker_destroy(rktp->rktp_leader); + if (leader) + rd_kafka_broker_keep(leader); + rktp->rktp_leader = leader; + } + + /* Update handling broker */ + r = rd_kafka_toppar_broker_update(rktp, leader_id, leader, + "leader updated"); + } + + if (need_epoch_validation) { + /* Update next fetch position, that could be stale since last + * fetch start. Only if the app pos is real. */ + if (rktp->rktp_app_pos.offset > 0) { + rd_kafka_toppar_set_next_fetch_position( + rktp, rktp->rktp_app_pos); + } + rd_kafka_offset_validate(rktp, "epoch updated from metadata"); + } + + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); /* from get() */ + + return r; +} + + +/** + * @brief Revert the topic+partition delegation to the leader from + * a preferred replica. + * + * @returns 1 if the broker delegation was changed, -1 if the broker + * delegation was changed and is now undelegated, else 0. 
+ * + * @locks none + * @locality any + */ +int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp) { + rd_kafka_broker_t *leader; + int r; + + rd_kafka_rdlock(rktp->rktp_rkt->rkt_rk); + rd_kafka_toppar_lock(rktp); + + rd_assert(rktp->rktp_leader_id != rktp->rktp_broker_id); + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 + "]: Reverting from preferred " + "replica %" PRId32 " to leader %" PRId32, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rktp->rktp_broker_id, rktp->rktp_leader_id); + + leader = rd_kafka_broker_find_by_nodeid(rktp->rktp_rkt->rkt_rk, + rktp->rktp_leader_id); + + rd_kafka_toppar_unlock(rktp); + rd_kafka_rdunlock(rktp->rktp_rkt->rkt_rk); + + rd_kafka_toppar_lock(rktp); + r = rd_kafka_toppar_broker_update( + rktp, rktp->rktp_leader_id, leader, + "reverting from preferred replica to leader"); + rd_kafka_toppar_unlock(rktp); + + if (leader) + rd_kafka_broker_destroy(leader); + + return r; +} + + + +/** + * @brief Save idempotent producer state for a partition that is about to + * be removed. + * + * @locks_required rd_kafka_wrlock(rkt), rd_kafka_toppar_lock(rktp) + */ +static void rd_kafka_toppar_idemp_msgid_save(rd_kafka_topic_t *rkt, + const rd_kafka_toppar_t *rktp) { + rd_kafka_partition_msgid_t *partmsgid = rd_malloc(sizeof(*partmsgid)); + partmsgid->partition = rktp->rktp_partition; + partmsgid->msgid = rktp->rktp_msgid; + partmsgid->pid = rktp->rktp_eos.pid; + partmsgid->epoch_base_msgid = rktp->rktp_eos.epoch_base_msgid; + partmsgid->ts = rd_clock(); + + TAILQ_INSERT_TAIL(&rkt->rkt_saved_partmsgids, partmsgid, link); +} + + +/** + * @brief Restore idempotent producer state for a new/resurfacing partition. + * + * @locks_required rd_kafka_wrlock(rkt), rd_kafka_toppar_lock(rktp) + */ +static void rd_kafka_toppar_idemp_msgid_restore(rd_kafka_topic_t *rkt, + rd_kafka_toppar_t *rktp) { + rd_kafka_partition_msgid_t *partmsgid; + + TAILQ_FOREACH(partmsgid, &rkt->rkt_saved_partmsgids, link) { + if (partmsgid->partition == rktp->rktp_partition) + break; + } + + if (!partmsgid) + return; + + rktp->rktp_msgid = partmsgid->msgid; + rktp->rktp_eos.pid = partmsgid->pid; + rktp->rktp_eos.epoch_base_msgid = partmsgid->epoch_base_msgid; + + rd_kafka_dbg(rkt->rkt_rk, EOS | RD_KAFKA_DBG_TOPIC, "MSGID", + "Topic %s [%" PRId32 "]: restored %s with MsgId %" PRIu64 + " and " + "epoch base MsgId %" PRIu64 + " that was saved upon removal %dms ago", + rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_pid2str(partmsgid->pid), partmsgid->msgid, + partmsgid->epoch_base_msgid, + (int)((rd_clock() - partmsgid->ts) / 1000)); + + TAILQ_REMOVE(&rkt->rkt_saved_partmsgids, partmsgid, link); + rd_free(partmsgid); +} + + +/** + * @brief Update the number of partitions for a topic and takes actions + * accordingly. + * + * @returns 1 if the partition count changed, else 0. + * + * @locks rd_kafka_topic_wrlock(rkt) MUST be held. 
+ */ +static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt, + int32_t partition_cnt) { + rd_kafka_t *rk = rkt->rkt_rk; + rd_kafka_toppar_t **rktps; + rd_kafka_toppar_t *rktp; + rd_bool_t is_idempodent = rd_kafka_is_idempotent(rk); + int32_t i; + + if (likely(rkt->rkt_partition_cnt == partition_cnt)) + return 0; /* No change in partition count */ + + if (unlikely(rkt->rkt_partition_cnt != 0 && + !rd_kafka_terminating(rkt->rkt_rk))) + rd_kafka_log(rk, LOG_NOTICE, "PARTCNT", + "Topic %s partition count changed " + "from %" PRId32 " to %" PRId32, + rkt->rkt_topic->str, rkt->rkt_partition_cnt, + partition_cnt); + else + rd_kafka_dbg(rk, TOPIC, "PARTCNT", + "Topic %s partition count changed " + "from %" PRId32 " to %" PRId32, + rkt->rkt_topic->str, rkt->rkt_partition_cnt, + partition_cnt); + + + /* Create and assign new partition list */ + if (partition_cnt > 0) + rktps = rd_calloc(partition_cnt, sizeof(*rktps)); + else + rktps = NULL; + + for (i = 0; i < partition_cnt; i++) { + if (i >= rkt->rkt_partition_cnt) { + /* New partition. Check if its in the list of + * desired partitions first. */ + + rktp = rd_kafka_toppar_desired_get(rkt, i); + if (rktp) { + rd_kafka_toppar_lock(rktp); + rktp->rktp_flags &= + ~(RD_KAFKA_TOPPAR_F_UNKNOWN | + RD_KAFKA_TOPPAR_F_REMOVE); + + /* Remove from desp list since the + * partition is now known. */ + rd_kafka_toppar_desired_unlink(rktp); + } else { + rktp = rd_kafka_toppar_new(rkt, i); + + rd_kafka_toppar_lock(rktp); + rktp->rktp_flags &= + ~(RD_KAFKA_TOPPAR_F_UNKNOWN | + RD_KAFKA_TOPPAR_F_REMOVE); + } + rktps[i] = rktp; + + if (is_idempodent) + /* Restore idempotent producer state for + * this partition, if any. */ + rd_kafka_toppar_idemp_msgid_restore(rkt, rktp); + + rd_kafka_toppar_unlock(rktp); + + } else { + /* Existing partition, grab our own reference. */ + rktps[i] = rd_kafka_toppar_keep(rkt->rkt_p[i]); + /* Loose previous ref */ + rd_kafka_toppar_destroy(rkt->rkt_p[i]); + } + } + + /* Propagate notexist errors for desired partitions */ + RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) { + rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED", + "%s [%" PRId32 + "]: " + "desired partition does not exist in cluster", + rkt->rkt_topic->str, rktp->rktp_partition); + rd_kafka_toppar_enq_error( + rktp, + rkt->rkt_err ? rkt->rkt_err + : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, + "desired partition is not available"); + } + + /* Remove excessive partitions */ + for (i = partition_cnt; i < rkt->rkt_partition_cnt; i++) { + rktp = rkt->rkt_p[i]; + + rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE", + "%s [%" PRId32 "] no longer reported in metadata", + rkt->rkt_topic->str, rktp->rktp_partition); + + rd_kafka_toppar_lock(rktp); + + /* Idempotent/Transactional producer: + * We need to save each removed partition's base msgid for + * the (rare) chance the partition comes back, + * in which case we must continue with the correct msgid + * in future ProduceRequests. + * + * These base msgsid are restored (above) if/when partitions + * come back and the PID,Epoch hasn't changed. + * + * One situation where this might happen is if a broker goes + * out of sync and starts to wrongfully report an existing + * topic as non-existent, triggering the removal of partitions + * on the producer client. When metadata is eventually correct + * again and the topic is "re-created" on the producer, it + * must continue with the next msgid/baseseq. 
*/ + if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid)) + rd_kafka_toppar_idemp_msgid_save(rkt, rktp); + + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN; + + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) { + rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED", + "Topic %s [%" PRId32 + "] is desired " + "but no longer known: " + "moving back on desired list", + rkt->rkt_topic->str, rktp->rktp_partition); + + /* If this is a desired partition move it back on to + * the desired list since partition is no longer known*/ + rd_kafka_toppar_desired_link(rktp); + + if (!rd_kafka_terminating(rkt->rkt_rk)) + rd_kafka_toppar_enq_error( + rktp, + rkt->rkt_err + ? rkt->rkt_err + : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, + "desired partition is no longer " + "available"); + + rd_kafka_toppar_broker_delegate(rktp, NULL); + + } else { + /* Tell handling broker to let go of the toppar */ + rd_kafka_toppar_broker_leave_for_remove(rktp); + } + + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); + } + + if (rkt->rkt_p) + rd_free(rkt->rkt_p); + + rkt->rkt_p = rktps; + + rkt->rkt_partition_cnt = partition_cnt; + + return 1; +} + + + +/** + * Topic 'rkt' does not exist: propagate to interested parties. + * The topic's state must have been set to NOTEXISTS and + * rd_kafka_topic_partition_cnt_update() must have been called prior to + * calling this function. + * + * Locks: rd_kafka_topic_*lock() must be held. + */ +static void rd_kafka_topic_propagate_notexists(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err) { + rd_kafka_toppar_t *rktp; + int i; + + if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER) + return; + + + /* Notify consumers that the topic doesn't exist. */ + RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) + rd_kafka_toppar_enq_error(rktp, err, "topic does not exist"); +} + + +/** + * Assign messages on the UA partition to available partitions. + * Locks: rd_kafka_topic_*lock() must be held. + */ +static void rd_kafka_topic_assign_uas(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err) { + rd_kafka_t *rk = rkt->rkt_rk; + rd_kafka_toppar_t *rktp_ua; + rd_kafka_msg_t *rkm, *tmp; + rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas); + rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed); + rd_kafka_resp_err_t err_all = RD_KAFKA_RESP_ERR_NO_ERROR; + int cnt; + + if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER) + return; + + rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0); + if (unlikely(!rktp_ua)) { + rd_kafka_dbg(rk, TOPIC, "ASSIGNUA", + "No UnAssigned partition available for %s", + rkt->rkt_topic->str); + return; + } + + /* Assign all unassigned messages to new topics. 
*/ + rd_kafka_toppar_lock(rktp_ua); + + if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) { + err_all = rkt->rkt_err; + rd_kafka_dbg(rk, TOPIC, "PARTCNT", + "Failing all %i unassigned messages in " + "topic %.*s due to permanent topic error: %s", + rktp_ua->rktp_msgq.rkmq_msg_cnt, + RD_KAFKAP_STR_PR(rkt->rkt_topic), + rd_kafka_err2str(err_all)); + } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) { + err_all = err; + rd_kafka_dbg(rk, TOPIC, "PARTCNT", + "Failing all %i unassigned messages in " + "topic %.*s since topic does not exist: %s", + rktp_ua->rktp_msgq.rkmq_msg_cnt, + RD_KAFKAP_STR_PR(rkt->rkt_topic), + rd_kafka_err2str(err_all)); + } else { + rd_kafka_dbg(rk, TOPIC, "PARTCNT", + "Partitioning %i unassigned messages in " + "topic %.*s to %" PRId32 " partitions", + rktp_ua->rktp_msgq.rkmq_msg_cnt, + RD_KAFKAP_STR_PR(rkt->rkt_topic), + rkt->rkt_partition_cnt); + } + + rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq); + cnt = uas.rkmq_msg_cnt; + rd_kafka_toppar_unlock(rktp_ua); + + TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) { + /* Fast-path for failing messages with forced partition or + * when all messages are to fail. */ + if (err_all || (rkm->rkm_partition != RD_KAFKA_PARTITION_UA && + rkm->rkm_partition >= rkt->rkt_partition_cnt && + rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN)) { + rd_kafka_msgq_enq(&failed, rkm); + continue; + } + + if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) { + /* Desired partition not available */ + rd_kafka_msgq_enq(&failed, rkm); + } + } + + rd_kafka_dbg(rk, TOPIC, "UAS", + "%i/%i messages were partitioned in topic %s", + cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); + + if (failed.rkmq_msg_cnt > 0) { + /* Fail the messages */ + rd_kafka_dbg(rk, TOPIC, "UAS", + "%" PRId32 + "/%i messages failed partitioning " + "in topic %s", + failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str); + rd_kafka_dr_msgq( + rkt, &failed, + err_all ? err_all : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION); + } + + rd_kafka_toppar_destroy(rktp_ua); /* from get() */ +} + + +/** + * @brief Mark topic as non-existent, unless metadata propagation configuration + * disallows it. + * + * @param err Propagate non-existent topic using this error code. + * If \p err is RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION it means the + * topic is invalid and no propagation delay will be used. + * + * @returns true if the topic was marked as non-existent, else false. + * + * @locks topic_wrlock() MUST be held. + */ +rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err) { + rd_ts_t remains_us; + rd_bool_t permanent = err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION; + + if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) { + /* Dont update metadata while terminating. */ + return rd_false; + } + + rd_assert(err != RD_KAFKA_RESP_ERR_NO_ERROR); + + remains_us = + (rkt->rkt_ts_create + + (rkt->rkt_rk->rk_conf.metadata_propagation_max_ms * 1000)) - + rkt->rkt_ts_metadata; + + if (!permanent && rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN && + remains_us > 0) { + /* Still allowing topic metadata to propagate. 
*/ + rd_kafka_dbg( + rkt->rkt_rk, TOPIC | RD_KAFKA_DBG_METADATA, "TOPICPROP", + "Topic %.*s does not exist, allowing %dms " + "for metadata propagation before marking topic " + "as non-existent", + RD_KAFKAP_STR_PR(rkt->rkt_topic), (int)(remains_us / 1000)); + return rd_false; + } + + rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS); + + rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; + + /* Update number of partitions */ + rd_kafka_topic_partition_cnt_update(rkt, 0); + + /* Purge messages with forced partition */ + rd_kafka_topic_assign_uas(rkt, err); + + /* Propagate nonexistent topic info */ + rd_kafka_topic_propagate_notexists(rkt, err); + + return rd_true; +} + +/** + * @brief Mark topic as errored, such as when topic authorization fails. + * + * @param err Propagate error using this error code. + * + * @returns true if the topic was marked as errored, else false. + * + * @locality any + * @locks topic_wrlock() MUST be held. + */ +rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err) { + + if (unlikely(rd_kafka_terminating(rkt->rkt_rk))) { + /* Dont update metadata while terminating. */ + return rd_false; + } + + rd_assert(err != RD_KAFKA_RESP_ERR_NO_ERROR); + + /* Same error, ignore. */ + if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR && rkt->rkt_err == err) + return rd_true; + + rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPICERROR", + "Topic %s has permanent error: %s", rkt->rkt_topic->str, + rd_kafka_err2str(err)); + + rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_ERROR); + + rkt->rkt_err = err; + + /* Update number of partitions */ + rd_kafka_topic_partition_cnt_update(rkt, 0); + + /* Purge messages with forced partition */ + rd_kafka_topic_assign_uas(rkt, err); + + return rd_true; +} + + + +/** + * @brief Update a topic from metadata. + * + * @param mdt Topic metadata. + * @param leader_epochs Array of per-partition leader epochs, or NULL. + * The array size is identical to the partition count in + * \p mdt. + * @param ts_age absolute age (timestamp) of metadata. + * @returns 1 if the number of partitions changed, 0 if not, and -1 if the + * topic is unknown. + + * + * @locks_required rd_kafka_*lock() MUST be held. + */ +static int rd_kafka_topic_metadata_update( + rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_partition_leader_epoch_t *leader_epochs, + rd_ts_t ts_age) { + rd_kafka_t *rk = rkt->rkt_rk; + int upd = 0; + int j; + rd_kafka_broker_t **partbrokers; + int leader_cnt = 0; + int old_state; + + if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) + rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", + "Error in metadata reply for " + "topic %s (PartCnt %i): %s", + rkt->rkt_topic->str, mdt->partition_cnt, + rd_kafka_err2str(mdt->err)); + + if (unlikely(rd_kafka_terminating(rk))) { + /* Dont update metadata while terminating, do this + * after acquiring lock for proper synchronisation */ + return -1; + } + + /* Look up brokers before acquiring rkt lock to preserve lock order */ + partbrokers = rd_malloc(mdt->partition_cnt * sizeof(*partbrokers)); + + for (j = 0; j < mdt->partition_cnt; j++) { + if (mdt->partitions[j].leader == -1) { + partbrokers[j] = NULL; + continue; + } + + partbrokers[j] = rd_kafka_broker_find_by_nodeid( + rk, mdt->partitions[j].leader); + } + + + rd_kafka_topic_wrlock(rkt); + + old_state = rkt->rkt_state; + rkt->rkt_ts_metadata = ts_age; + + /* Set topic state. 
+ * UNKNOWN_TOPIC_OR_PART may indicate that auto.create.topics failed */ + if (mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION /*invalid topic*/ || + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + rd_kafka_topic_set_notexists(rkt, mdt->err); + else if (mdt->partition_cnt > 0) + rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS); + else if (mdt->err) + rd_kafka_topic_set_error(rkt, mdt->err); + + /* Update number of partitions, but not if there are + * (possibly intermittent) errors (e.g., "Leader not available"). */ + if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { + upd += rd_kafka_topic_partition_cnt_update(rkt, + mdt->partition_cnt); + + /* If the metadata times out for a topic (because all brokers + * are down) the state will transition to S_UNKNOWN. + * When updated metadata is eventually received there might + * not be any change to partition count or leader, + * but there may still be messages in the UA partition that + * needs to be assigned, so trigger an update for this case too. + * Issue #1985. */ + if (old_state == RD_KAFKA_TOPIC_S_UNKNOWN) + upd++; + } + + /* Update leader for each partition */ + for (j = 0; j < mdt->partition_cnt; j++) { + int r; + rd_kafka_broker_t *leader; + int32_t leader_epoch = + leader_epochs ? leader_epochs[j].leader_epoch : -1; + + rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", + " Topic %s partition %i Leader %" PRId32 + " Epoch %" PRId32, + rkt->rkt_topic->str, mdt->partitions[j].id, + mdt->partitions[j].leader, leader_epoch); + + leader = partbrokers[j]; + partbrokers[j] = NULL; + + /* Update leader for partition */ + r = rd_kafka_toppar_leader_update(rkt, mdt->partitions[j].id, + mdt->partitions[j].leader, + leader, leader_epoch); + + upd += (r != 0 ? 1 : 0); + + if (leader) { + if (r != -1) + leader_cnt++; + /* Drop reference to broker (from find()) */ + rd_kafka_broker_destroy(leader); + } + } + + /* If all partitions have leaders we can turn off fast leader query. */ + if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt) + rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; + + if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) { + /* (Possibly intermittent) topic-wide error: + * remove leaders for partitions */ + + for (j = 0; j < rkt->rkt_partition_cnt; j++) { + rd_kafka_toppar_t *rktp; + if (!rkt->rkt_p[j]) + continue; + + rktp = rkt->rkt_p[j]; + rd_kafka_toppar_lock(rktp); + rd_kafka_toppar_broker_delegate(rktp, NULL); + rd_kafka_toppar_unlock(rktp); + } + } + + /* If there was an update to the partitions try to assign + * unassigned messages to new partitions, or fail them */ + if (upd > 0) + rd_kafka_topic_assign_uas( + rkt, + mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); + + rd_kafka_topic_wrunlock(rkt); + + /* Loose broker references */ + for (j = 0; j < mdt->partition_cnt; j++) + if (partbrokers[j]) + rd_kafka_broker_destroy(partbrokers[j]); + + rd_free(partbrokers); + + return upd; +} + +/** + * @brief Update topic by metadata, if topic is locally known. + * @sa rd_kafka_topic_metadata_update() + * @locks none + */ +int rd_kafka_topic_metadata_update2( + rd_kafka_broker_t *rkb, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_partition_leader_epoch_t *leader_epochs) { + rd_kafka_topic_t *rkt; + int r; + + rd_kafka_wrlock(rkb->rkb_rk); + if (!(rkt = + rd_kafka_topic_find(rkb->rkb_rk, mdt->topic, 0 /*!lock*/))) { + rd_kafka_wrunlock(rkb->rkb_rk); + return -1; /* Ignore topics that we dont have locally. 
*/ + } + + r = rd_kafka_topic_metadata_update(rkt, mdt, leader_epochs, rd_clock()); + + rd_kafka_wrunlock(rkb->rkb_rk); + + rd_kafka_topic_destroy0(rkt); /* from find() */ + + return r; +} + + + +/** + * @returns a list of all partitions (rktp's) for a topic. + * @remark rd_kafka_topic_*lock() MUST be held. + */ +static rd_list_t *rd_kafka_topic_get_all_partitions(rd_kafka_topic_t *rkt) { + rd_list_t *list; + rd_kafka_toppar_t *rktp; + int i; + + list = rd_list_new(rkt->rkt_partition_cnt + + rd_list_cnt(&rkt->rkt_desp) + 1 /*ua*/, + NULL); + + for (i = 0; i < rkt->rkt_partition_cnt; i++) + rd_list_add(list, rd_kafka_toppar_keep(rkt->rkt_p[i])); + + RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) + rd_list_add(list, rd_kafka_toppar_keep(rktp)); + + if (rkt->rkt_ua) + rd_list_add(list, rd_kafka_toppar_keep(rkt->rkt_ua)); + + return list; +} + + + +/** + * Remove all partitions from a topic, including the ua. + * Must only be called during rd_kafka_t termination. + * + * Locality: main thread + */ +void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt) { + rd_kafka_toppar_t *rktp; + rd_list_t *partitions; + int i; + + /* Purge messages for all partitions outside the topic_wrlock since + * a message can hold a reference to the topic_t and thus + * would trigger a recursive lock dead-lock. */ + rd_kafka_topic_rdlock(rkt); + partitions = rd_kafka_topic_get_all_partitions(rkt); + rd_kafka_topic_rdunlock(rkt); + + RD_LIST_FOREACH(rktp, partitions, i) { + rd_kafka_toppar_lock(rktp); + rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq); + rd_kafka_toppar_purge_and_disable_queues(rktp); + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); + } + rd_list_destroy(partitions); + + rd_kafka_topic_keep(rkt); + rd_kafka_topic_wrlock(rkt); + + /* Setting the partition count to 0 moves all partitions to + * the desired list (rktp_desp). */ + rd_kafka_topic_partition_cnt_update(rkt, 0); + + /* Now clean out the desired partitions list. + * Use reverse traversal to avoid excessive memory shuffling + * in rd_list_remove() */ + RD_LIST_FOREACH_REVERSE(rktp, &rkt->rkt_desp, i) { + /* Keep a reference while deleting from desired list */ + rd_kafka_toppar_keep(rktp); + + rd_kafka_toppar_lock(rktp); + rd_kafka_toppar_desired_del(rktp); + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); + } + + rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0); + + if (rkt->rkt_p) + rd_free(rkt->rkt_p); + + rkt->rkt_p = NULL; + rkt->rkt_partition_cnt = 0; + + if ((rktp = rkt->rkt_ua)) { + rkt->rkt_ua = NULL; + rd_kafka_toppar_destroy(rktp); + } + + rd_kafka_topic_wrunlock(rkt); + + rd_kafka_topic_destroy0(rkt); +} + + + +/** + * @returns the broker state (as a human readable string) if a query + * for the partition leader is necessary, else NULL. + * @locality any + * @locks rd_kafka_toppar_lock MUST be held + */ +static const char *rd_kafka_toppar_needs_query(rd_kafka_t *rk, + rd_kafka_toppar_t *rktp) { + int broker_state; + + if (!rktp->rktp_broker) + return "not delegated"; + + if (rktp->rktp_broker->rkb_source == RD_KAFKA_INTERNAL) + return "internal"; + + broker_state = rd_kafka_broker_get_state(rktp->rktp_broker); + + if (broker_state >= RD_KAFKA_BROKER_STATE_UP) + return NULL; + + if (!rk->rk_conf.sparse_connections) + return "down"; + + /* Partition assigned to broker but broker does not + * need a persistent connection, this typically means + * the partition is not being fetched or not being produced to, + * so there is no need to re-query the leader. 
*/ + if (broker_state == RD_KAFKA_BROKER_STATE_INIT) + return NULL; + + /* This is most likely a persistent broker, + * which means the partition leader should probably + * be re-queried to see if it needs changing. */ + return "down"; +} + + + +/** + * @brief Scan all topics and partitions for: + * - timed out messages in UA partitions. + * - topics that needs to be created on the broker. + * - topics who's metadata is too old. + * - partitions with unknown leaders that require leader query. + * + * @locality rdkafka main thread + */ +void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now) { + rd_kafka_topic_t *rkt; + rd_kafka_toppar_t *rktp; + rd_list_t query_topics; + + rd_list_init(&query_topics, 0, rd_free); + + rd_kafka_rdlock(rk); + TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { + int p; + int query_this = 0; + rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout); + + rd_kafka_topic_wrlock(rkt); + + /* Check if metadata information has timed out. */ + if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN && + !rd_kafka_metadata_cache_topic_get(rk, rkt->rkt_topic->str, + 1 /*only valid*/)) { + rd_kafka_dbg(rk, TOPIC, "NOINFO", + "Topic %s metadata information timed out " + "(%" PRId64 "ms old)", + rkt->rkt_topic->str, + (rd_clock() - rkt->rkt_ts_metadata) / + 1000); + rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN); + + query_this = 1; + } else if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN) { + rd_kafka_dbg(rk, TOPIC, "NOINFO", + "Topic %s metadata information unknown", + rkt->rkt_topic->str); + query_this = 1; + } + + /* Just need a read-lock from here on. */ + rd_kafka_topic_wrunlock(rkt); + rd_kafka_topic_rdlock(rkt); + + if (rkt->rkt_partition_cnt == 0) { + /* If this topic is unknown by brokers try + * to create it by sending a topic-specific + * metadata request. + * This requires "auto.create.topics.enable=true" + * on the brokers. */ + rd_kafka_dbg(rk, TOPIC, "NOINFO", + "Topic %s partition count is zero: " + "should refresh metadata", + rkt->rkt_topic->str); + + query_this = 1; + + } else if (!rd_list_empty(&rkt->rkt_desp) && + rd_interval_immediate(&rkt->rkt_desp_refresh_intvl, + 10 * 1000 * 1000, 0) > 0) { + /* Query topic metadata if there are + * desired (non-existent) partitions. + * At most every 10 seconds. */ + rd_kafka_dbg(rk, TOPIC, "DESIRED", + "Topic %s has %d desired partition(s): " + "should refresh metadata", + rkt->rkt_topic->str, + rd_list_cnt(&rkt->rkt_desp)); + + query_this = 1; + } + + for (p = RD_KAFKA_PARTITION_UA; p < rkt->rkt_partition_cnt; + p++) { + + if (!(rktp = rd_kafka_toppar_get( + rkt, p, + p == RD_KAFKA_PARTITION_UA ? rd_true + : rd_false))) + continue; + + rd_kafka_toppar_lock(rktp); + + /* Check that partition is delegated to a broker that + * is up, else add topic to query list. */ + if (p != RD_KAFKA_PARTITION_UA) { + const char *leader_reason = + rd_kafka_toppar_needs_query(rk, rktp); + + if (leader_reason) { + rd_kafka_dbg(rk, TOPIC, "QRYLEADER", + "Topic %s [%" PRId32 + "]: " + "broker is %s: re-query", + rkt->rkt_topic->str, + rktp->rktp_partition, + leader_reason); + query_this = 1; + } + } else { + if (rk->rk_type == RD_KAFKA_PRODUCER) { + /* Scan UA partition for message + * timeouts. + * Proper partitions are scanned by + * their toppar broker thread. 
*/ + rd_kafka_msgq_age_scan( + rktp, &rktp->rktp_msgq, &timedout, + now, NULL); + } + } + + rd_kafka_toppar_unlock(rktp); + rd_kafka_toppar_destroy(rktp); + } + + rd_kafka_topic_rdunlock(rkt); + + /* Propagate delivery reports for timed out messages */ + if (rd_kafka_msgq_len(&timedout) > 0) { + rd_kafka_dbg( + rk, MSG, "TIMEOUT", "%s: %d message(s) timed out", + rkt->rkt_topic->str, rd_kafka_msgq_len(&timedout)); + rd_kafka_dr_msgq(rkt, &timedout, + RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); + } + + /* Need to re-query this topic's leader. */ + if (query_this && + !rd_list_find(&query_topics, rkt->rkt_topic->str, + (void *)strcmp)) + rd_list_add(&query_topics, + rd_strdup(rkt->rkt_topic->str)); + } + rd_kafka_rdunlock(rk); + + if (!rd_list_empty(&query_topics)) + rd_kafka_metadata_refresh_topics( + rk, NULL, &query_topics, rd_true /*force even if cached + * info exists*/ + , + rk->rk_conf.allow_auto_create_topics, + rd_false /*!cgrp_update*/, "refresh unavailable topics"); + rd_list_destroy(&query_topics); +} + + +/** + * Locks: rd_kafka_topic_*lock() must be held. + */ +int rd_kafka_topic_partition_available(const rd_kafka_topic_t *app_rkt, + int32_t partition) { + int avail; + rd_kafka_toppar_t *rktp; + rd_kafka_broker_t *rkb; + + /* This API must only be called from a partitioner and the + * partitioner is always passed a proper topic */ + rd_assert(!rd_kafka_rkt_is_lw(app_rkt)); + + rktp = rd_kafka_toppar_get(app_rkt, partition, 0 /*no ua-on-miss*/); + if (unlikely(!rktp)) + return 0; + + rkb = rd_kafka_toppar_broker(rktp, 1 /*proper broker*/); + avail = rkb ? 1 : 0; + if (rkb) + rd_kafka_broker_destroy(rkb); + rd_kafka_toppar_destroy(rktp); + return avail; +} + + +void *rd_kafka_topic_opaque(const rd_kafka_topic_t *app_rkt) { + const rd_kafka_lwtopic_t *lrkt; + + lrkt = rd_kafka_rkt_get_lw((rd_kafka_topic_t *)app_rkt); + if (unlikely(lrkt != NULL)) { + void *opaque; + rd_kafka_topic_t *rkt; + + if (!(rkt = rd_kafka_topic_find(lrkt->lrkt_rk, lrkt->lrkt_topic, + 1 /*lock*/))) + return NULL; + + opaque = rkt->rkt_conf.opaque; + + rd_kafka_topic_destroy0(rkt); /* loose refcnt from find() */ + + return opaque; + } + + return app_rkt->rkt_conf.opaque; +} + + +int rd_kafka_topic_info_cmp(const void *_a, const void *_b) { + const rd_kafka_topic_info_t *a = _a, *b = _b; + int r; + + if ((r = strcmp(a->topic, b->topic))) + return r; + + return RD_CMP(a->partition_cnt, b->partition_cnt); +} + + +/** + * @brief string compare two topics. + * + * @param _a topic string (type char *) + * @param _b rd_kafka_topic_info_t * pointer. + */ +int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b) { + const char *a = _a; + const rd_kafka_topic_info_t *b = _b; + return strcmp(a, b->topic); +} + + +/** + * Allocate new topic_info. + * \p topic is copied. + */ +rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic, + int partition_cnt) { + rd_kafka_topic_info_t *ti; + size_t tlen = strlen(topic) + 1; + + /* Allocate space for the topic along with the struct */ + ti = rd_malloc(sizeof(*ti) + tlen); + ti->topic = (char *)(ti + 1); + memcpy((char *)ti->topic, topic, tlen); + ti->partition_cnt = partition_cnt; + + return ti; +} + +/** + * Destroy/free topic_info + */ +void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti) { + rd_free(ti); +} + + +/** + * @brief Match \p topic to \p pattern. + * + * If pattern begins with "^" it is considered a regexp, + * otherwise a simple string comparison is performed. + * + * @returns 1 on match, else 0. 
+ */
+int rd_kafka_topic_match(rd_kafka_t *rk,
+                         const char *pattern,
+                         const char *topic) {
+        char errstr[128];
+
+        if (*pattern == '^') {
+                int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr));
+                if (unlikely(r == -1))
+                        rd_kafka_dbg(rk, TOPIC, "TOPICREGEX",
+                                     "Topic \"%s\" regex \"%s\" "
+                                     "matching failed: %s",
+                                     topic, pattern, errstr);
+                return r == 1;
+        } else
+                return !strcmp(pattern, topic);
+}
+
+
+
+/**
+ * @brief Trigger broker metadata query for topic leader.
+ *
+ * @locks none
+ */
+void rd_kafka_topic_leader_query0(rd_kafka_t *rk,
+                                  rd_kafka_topic_t *rkt,
+                                  int do_rk_lock,
+                                  rd_bool_t force) {
+        rd_list_t topics;
+
+        rd_list_init(&topics, 1, rd_free);
+        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));
+
+        rd_kafka_metadata_refresh_topics(
+            rk, NULL, &topics, force, rk->rk_conf.allow_auto_create_topics,
+            rd_false /*!cgrp_update*/, "leader query");
+
+        rd_list_destroy(&topics);
+}
+
+
+
+/**
+ * @brief Populate list \p topics with the topic names (strdupped char *) of
+ *        all locally known or cached topics.
+ *
+ * @param cache_cntp is an optional pointer to an int that will be set to the
+ *                   number of entries added to \p topics from the
+ *                   metadata cache.
+ * @remark \p rk lock MUST NOT be held
+ */
+void rd_kafka_local_topics_to_list(rd_kafka_t *rk,
+                                   rd_list_t *topics,
+                                   int *cache_cntp) {
+        rd_kafka_topic_t *rkt;
+        int cache_cnt;
+
+        rd_kafka_rdlock(rk);
+        rd_list_grow(topics, rk->rk_topic_cnt);
+        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
+        rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
+        cache_cnt = rd_kafka_metadata_cache_topics_to_list(rk, topics);
+        if (cache_cntp)
+                *cache_cntp = cache_cnt;
+        rd_kafka_rdunlock(rk);
+}
+
+
+/**
+ * @brief Unit test helper to set a topic's state to EXISTS
+ *        with the given number of partitions.
+ */
+void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
+                                        int partition_cnt,
+                                        int32_t leader_id) {
+        struct rd_kafka_metadata_topic mdt = {.topic =
+                                                  (char *)rkt->rkt_topic->str,
+                                              .partition_cnt = partition_cnt};
+        int i;
+
+        mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt);
+
+        for (i = 0; i < partition_cnt; i++) {
+                memset(&mdt.partitions[i], 0, sizeof(mdt.partitions[i]));
+                mdt.partitions[i].id     = i;
+                mdt.partitions[i].leader = leader_id;
+        }
+
+        rd_kafka_wrlock(rkt->rkt_rk);
+        rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, rd_true);
+        rd_kafka_topic_metadata_update(rkt, &mdt, NULL, rd_clock());
+        rd_kafka_wrunlock(rkt->rkt_rk);
+}
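
The diff ends here. The next two code sketches are editorial illustrations, not part of the patch. The first is a minimal sketch of how an application exercises the app-facing topic API implemented in this file: rd_kafka_topic_new() takes an application refcount (keep_app()) and rd_kafka_topic_destroy() releases it. The broker address "localhost:9092" and the topic name "my_topic" are placeholders, not values from the source.

/* Illustrative usage sketch (not part of the diff): producing via a
 * topic handle created by rd_kafka_topic_new(). */
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* "localhost:9092" is a placeholder broker address. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        /* rd_kafka_topic_new() -> rd_kafka_topic_new0() + keep_app():
         * the returned handle holds one application refcount. */
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "my_topic", NULL);
        if (!rkt) {
                fprintf(stderr, "topic_new failed: %s\n",
                        rd_kafka_err2str(rd_kafka_last_error()));
                rd_kafka_destroy(rk);
                return 1;
        }

        const char *payload = "hello";
        if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             (void *)payload, strlen(payload), NULL, 0,
                             NULL) == -1)
                fprintf(stderr, "produce failed: %s\n",
                        rd_kafka_err2str(rd_kafka_last_error()));

        rd_kafka_poll(rk, 0);          /* serve delivery report callbacks */
        rd_kafka_flush(rk, 10 * 1000); /* wait for outstanding messages */

        rd_kafka_topic_destroy(rkt);   /* drops the application refcount */
        rd_kafka_destroy(rk);
        return 0;
}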
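
The second sketch illustrates the contract documented above for rd_kafka_topic_partition_available(): it may only be called from inside a partitioner callback, where the passed topic object is guaranteed to be a proper (non light-weight) topic. rd_kafka_msg_partitioner_murmur2() and rd_kafka_topic_conf_set_partitioner_cb() are public librdkafka APIs; the callback name my_partitioner and the fall-back-to-partition-0 policy are assumptions made for the example only.

/* Illustrative custom partitioner sketch (not part of the diff). */
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner(const rd_kafka_topic_t *rkt,
                              const void *key, size_t keylen,
                              int32_t partition_cnt,
                              void *rkt_opaque, void *msg_opaque) {
        /* Hash the key with the built-in murmur2 partitioner... */
        int32_t partition = rd_kafka_msg_partitioner_murmur2(
            rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);

        /* ...but, as an arbitrary example policy, fall back to partition 0
         * when the chosen partition currently has no usable leader
         * (rd_kafka_topic_partition_available() is valid here because we
         * are inside a partitioner callback). */
        if (!rd_kafka_topic_partition_available(rkt, partition))
                return 0;

        return partition;
}

/* Registration, assuming a rd_kafka_topic_conf_t *tconf created elsewhere:
 *     rd_kafka_topic_conf_set_partitioner_cb(tconf, my_partitioner);
 */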