diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_metadata.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_metadata.c | 1468 |
1 files changed, 1468 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_metadata.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_metadata.c new file mode 100644 index 000000000..4e32e5d58 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_metadata.c @@ -0,0 +1,1468 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2013, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "rd.h" +#include "rdkafka_int.h" +#include "rdkafka_topic.h" +#include "rdkafka_broker.h" +#include "rdkafka_request.h" +#include "rdkafka_idempotence.h" +#include "rdkafka_metadata.h" + +#include <string.h> +#include <stdarg.h> + + +rd_kafka_resp_err_t +rd_kafka_metadata(rd_kafka_t *rk, + int all_topics, + rd_kafka_topic_t *only_rkt, + const struct rd_kafka_metadata **metadatap, + int timeout_ms) { + rd_kafka_q_t *rkq; + rd_kafka_broker_t *rkb; + rd_kafka_op_t *rko; + rd_ts_t ts_end = rd_timeout_init(timeout_ms); + rd_list_t topics; + rd_bool_t allow_auto_create_topics = + rk->rk_conf.allow_auto_create_topics; + + /* Query any broker that is up, and if none are up pick the first one, + * if we're lucky it will be up before the timeout */ + rkb = rd_kafka_broker_any_usable(rk, timeout_ms, RD_DO_LOCK, 0, + "application metadata request"); + if (!rkb) + return RD_KAFKA_RESP_ERR__TRANSPORT; + + rkq = rd_kafka_q_new(rk); + + rd_list_init(&topics, 0, rd_free); + if (!all_topics) { + if (only_rkt) + rd_list_add(&topics, + rd_strdup(rd_kafka_topic_name(only_rkt))); + else { + int cache_cnt; + rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics, + &cache_cnt); + /* Don't trigger auto-create for cached topics */ + if (rd_list_cnt(&topics) == cache_cnt) + allow_auto_create_topics = rd_true; + } + } + + /* Async: request metadata */ + rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA); + rd_kafka_op_set_replyq(rko, rkq, 0); + rko->rko_u.metadata.force = 1; /* Force metadata request regardless + * of outstanding metadata requests. */ + rd_kafka_MetadataRequest(rkb, &topics, "application requested", + allow_auto_create_topics, + /* cgrp_update: + * Only update consumer group state + * on response if this lists all + * topics in the cluster, since a + * partial request may make it seem + * like some subscribed topics are missing. */ + all_topics ? rd_true : rd_false, rko); + + rd_list_destroy(&topics); + rd_kafka_broker_destroy(rkb); + + /* Wait for reply (or timeout) */ + rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(ts_end), 0); + + rd_kafka_q_destroy_owner(rkq); + + /* Timeout */ + if (!rko) + return RD_KAFKA_RESP_ERR__TIMED_OUT; + + /* Error */ + if (rko->rko_err) { + rd_kafka_resp_err_t err = rko->rko_err; + rd_kafka_op_destroy(rko); + return err; + } + + /* Reply: pass metadata pointer to application who now owns it*/ + rd_kafka_assert(rk, rko->rko_u.metadata.md); + *metadatap = rko->rko_u.metadata.md; + rko->rko_u.metadata.md = NULL; + rd_kafka_op_destroy(rko); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata) { + rd_free((void *)metadata); +} + + +/** + * @returns a newly allocated copy of metadata \p src of size \p size + */ +struct rd_kafka_metadata * +rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { + struct rd_kafka_metadata *md; + rd_tmpabuf_t tbuf; + int i; + + /* metadata is stored in one contigious buffer where structs and + * and pointed-to fields are layed out in a memory aligned fashion. + * rd_tmpabuf_t provides the infrastructure to do this. + * Because of this we copy all the structs verbatim but + * any pointer fields needs to be copied explicitly to update + * the pointer address. */ + rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/); + md = rd_tmpabuf_write(&tbuf, src, sizeof(*md)); + + rd_tmpabuf_write_str(&tbuf, src->orig_broker_name); + + + /* Copy Brokers */ + md->brokers = rd_tmpabuf_write(&tbuf, src->brokers, + md->broker_cnt * sizeof(*md->brokers)); + + for (i = 0; i < md->broker_cnt; i++) + md->brokers[i].host = + rd_tmpabuf_write_str(&tbuf, src->brokers[i].host); + + + /* Copy TopicMetadata */ + md->topics = rd_tmpabuf_write(&tbuf, src->topics, + md->topic_cnt * sizeof(*md->topics)); + + for (i = 0; i < md->topic_cnt; i++) { + int j; + + md->topics[i].topic = + rd_tmpabuf_write_str(&tbuf, src->topics[i].topic); + + + /* Copy partitions */ + md->topics[i].partitions = + rd_tmpabuf_write(&tbuf, src->topics[i].partitions, + md->topics[i].partition_cnt * + sizeof(*md->topics[i].partitions)); + + for (j = 0; j < md->topics[i].partition_cnt; j++) { + /* Copy replicas and ISRs */ + md->topics[i].partitions[j].replicas = rd_tmpabuf_write( + &tbuf, src->topics[i].partitions[j].replicas, + md->topics[i].partitions[j].replica_cnt * + sizeof(*md->topics[i].partitions[j].replicas)); + + md->topics[i].partitions[j].isrs = rd_tmpabuf_write( + &tbuf, src->topics[i].partitions[j].isrs, + md->topics[i].partitions[j].isr_cnt * + sizeof(*md->topics[i].partitions[j].isrs)); + } + } + + /* Check for tmpabuf errors */ + if (rd_tmpabuf_failed(&tbuf)) + rd_kafka_assert(NULL, !*"metadata copy failed"); + + /* Delibarely not destroying the tmpabuf since we return + * its allocated memory. */ + + return md; +} + + + +/** + * @brief Partition (id) comparator for partition_id_leader_epoch struct. + */ +static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a, + const void *_b) { + const rd_kafka_partition_leader_epoch_t *a = _a, *b = _b; + return RD_CMP(a->partition_id, b->partition_id); +} + + + +/** + * @brief Update topic state and information based on topic metadata. + * + * @param mdt Topic metadata. + * @param leader_epochs Per-partition leader epoch array, or NULL if not known. + * + * @locality rdkafka main thread + * @locks_acquired rd_kafka_wrlock(rk) + */ +static void rd_kafka_parse_Metadata_update_topic( + rd_kafka_broker_t *rkb, + const rd_kafka_metadata_topic_t *mdt, + const rd_kafka_partition_leader_epoch_t *leader_epochs) { + + rd_rkb_dbg(rkb, METADATA, "METADATA", + /* The indent below is intentional */ + " Topic %s with %i partitions%s%s", mdt->topic, + mdt->partition_cnt, mdt->err ? ": " : "", + mdt->err ? rd_kafka_err2str(mdt->err) : ""); + + /* Ignore metadata completely for temporary errors. (issue #513) + * LEADER_NOT_AVAILABLE: Broker is rebalancing + */ + if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE && + mdt->partition_cnt == 0) { + rd_rkb_dbg(rkb, TOPIC, "METADATA", + "Temporary error in metadata reply for " + "topic %s (PartCnt %i): %s: ignoring", + mdt->topic, mdt->partition_cnt, + rd_kafka_err2str(mdt->err)); + } else { + /* Update local topic & partition state based + * on metadata */ + rd_kafka_topic_metadata_update2(rkb, mdt, leader_epochs); + } +} + +/** + * @brief Only brokers with Metadata version >= 9 have reliable leader + * epochs. Before that version, leader epoch must be treated + * as missing (-1). + * + * @param rkb The broker + * @return Is this a broker version with reliable leader epochs? + * + * @locality rdkafka main thread + */ +rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) { + int features; + int16_t ApiVersion = 0; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_Metadata, 0, 9, &features); + + return ApiVersion >= 9; +} + + +/** + * @brief Handle a Metadata response message. + * + * @param topics are the requested topics (may be NULL) + * + * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs. + * + * The marshalled metadata is returned in \p *mdp, (NULL on error). + + * @returns an error code on parse failure, else NO_ERRRO. + * + * @locality rdkafka main thread + */ +rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *request, + rd_kafka_buf_t *rkbuf, + struct rd_kafka_metadata **mdp) { + rd_kafka_t *rk = rkb->rkb_rk; + int i, j, k; + rd_tmpabuf_t tbuf; + struct rd_kafka_metadata *md = NULL; + size_t rkb_namelen; + const int log_decode_errors = LOG_ERR; + rd_list_t *missing_topics = NULL; + const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics; + rd_bool_t all_topics = request->rkbuf_u.Metadata.all_topics; + rd_bool_t cgrp_update = + request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp; + const char *reason = request->rkbuf_u.Metadata.reason + ? request->rkbuf_u.Metadata.reason + : "(no reason)"; + int ApiVersion = request->rkbuf_reqhdr.ApiVersion; + rd_kafkap_str_t cluster_id = RD_ZERO_INIT; + int32_t controller_id = -1; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + int broker_changes = 0; + int cache_changes = 0; + /** This array is reused and resized as necessary to hold per-partition + * leader epochs (ApiVersion >= 7). */ + rd_kafka_partition_leader_epoch_t *leader_epochs = NULL; + /** Number of allocated elements in leader_epochs. */ + size_t leader_epochs_size = 0; + rd_ts_t ts_start = rd_clock(); + + /* Ignore metadata updates when terminating */ + if (rd_kafka_terminating(rkb->rkb_rk)) { + err = RD_KAFKA_RESP_ERR__DESTROY; + goto done; + } + + rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread)); + + /* Remove topics from missing_topics as they are seen in Metadata. */ + if (requested_topics) + missing_topics = + rd_list_copy(requested_topics, rd_list_string_copy, NULL); + + rd_kafka_broker_lock(rkb); + rkb_namelen = strlen(rkb->rkb_name) + 1; + /* We assume that the marshalled representation is + * no more than 4 times larger than the wire representation. */ + rd_tmpabuf_new(&tbuf, + sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4), + 0 /*dont assert on fail*/); + + if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) { + rd_kafka_broker_unlock(rkb); + err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + goto err; + } + + md->orig_broker_id = rkb->rkb_nodeid; + md->orig_broker_name = + rd_tmpabuf_write(&tbuf, rkb->rkb_name, rkb_namelen); + rd_kafka_broker_unlock(rkb); + + if (ApiVersion >= 3) + rd_kafka_buf_read_throttle_time(rkbuf); + + /* Read Brokers */ + rd_kafka_buf_read_arraycnt(rkbuf, &md->broker_cnt, + RD_KAFKAP_BROKERS_MAX); + + if (!(md->brokers = rd_tmpabuf_alloc(&tbuf, md->broker_cnt * + sizeof(*md->brokers)))) + rd_kafka_buf_parse_fail(rkbuf, + "%d brokers: tmpabuf memory shortage", + md->broker_cnt); + + for (i = 0; i < md->broker_cnt; i++) { + rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id); + rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, + md->brokers[i].host); + rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port); + + if (ApiVersion >= 1) { + rd_kafkap_str_t rack; + rd_kafka_buf_read_str(rkbuf, &rack); + } + + rd_kafka_buf_skip_tags(rkbuf); + } + + if (ApiVersion >= 2) + rd_kafka_buf_read_str(rkbuf, &cluster_id); + + if (ApiVersion >= 1) { + rd_kafka_buf_read_i32(rkbuf, &controller_id); + rd_rkb_dbg(rkb, METADATA, "METADATA", + "ClusterId: %.*s, ControllerId: %" PRId32, + RD_KAFKAP_STR_PR(&cluster_id), controller_id); + } + + + + /* Read TopicMetadata */ + rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX); + rd_rkb_dbg(rkb, METADATA, "METADATA", "%i brokers, %i topics", + md->broker_cnt, md->topic_cnt); + + if (!(md->topics = + rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics)))) + rd_kafka_buf_parse_fail( + rkbuf, "%d topics: tmpabuf memory shortage", md->topic_cnt); + + for (i = 0; i < md->topic_cnt; i++) { + rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err); + rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, + md->topics[i].topic); + if (ApiVersion >= 1) { + int8_t is_internal; + rd_kafka_buf_read_i8(rkbuf, &is_internal); + } + + /* PartitionMetadata */ + rd_kafka_buf_read_arraycnt(rkbuf, &md->topics[i].partition_cnt, + RD_KAFKAP_PARTITIONS_MAX); + + if (!(md->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*md->topics[i].partitions)))) + rd_kafka_buf_parse_fail(rkbuf, + "%s: %d partitions: " + "tmpabuf memory shortage", + md->topics[i].topic, + md->topics[i].partition_cnt); + + /* Resize reused leader_epochs array to fit this partition's + * leader epochs. */ + if (ApiVersion >= 7 && md->topics[i].partition_cnt > 0 && + (size_t)md->topics[i].partition_cnt > leader_epochs_size) { + leader_epochs_size = + RD_MAX(32, md->topics[i].partition_cnt); + leader_epochs = + rd_realloc(leader_epochs, sizeof(*leader_epochs) * + leader_epochs_size); + } + + for (j = 0; j < md->topics[i].partition_cnt; j++) { + rd_kafka_buf_read_i16a(rkbuf, + md->topics[i].partitions[j].err); + rd_kafka_buf_read_i32a(rkbuf, + md->topics[i].partitions[j].id); + rd_kafka_buf_read_i32a( + rkbuf, md->topics[i].partitions[j].leader); + if (ApiVersion >= 7) { + leader_epochs[j].partition_id = + md->topics[i].partitions[j].id; + rd_kafka_buf_read_i32( + rkbuf, &leader_epochs[j].leader_epoch); + } + + /* Replicas */ + rd_kafka_buf_read_arraycnt( + rkbuf, &md->topics[i].partitions[j].replica_cnt, + RD_KAFKAP_BROKERS_MAX); + + if (!(md->topics[i].partitions[j].replicas = + rd_tmpabuf_alloc( + &tbuf, + md->topics[i].partitions[j].replica_cnt * + sizeof(*md->topics[i] + .partitions[j] + .replicas)))) + rd_kafka_buf_parse_fail( + rkbuf, + "%s [%" PRId32 + "]: %d replicas: " + "tmpabuf memory shortage", + md->topics[i].topic, + md->topics[i].partitions[j].id, + md->topics[i].partitions[j].replica_cnt); + + + for (k = 0; k < md->topics[i].partitions[j].replica_cnt; + k++) + rd_kafka_buf_read_i32a( + rkbuf, + md->topics[i].partitions[j].replicas[k]); + + /* Isrs */ + rd_kafka_buf_read_arraycnt( + rkbuf, &md->topics[i].partitions[j].isr_cnt, + RD_KAFKAP_BROKERS_MAX); + + if (!(md->topics[i] + .partitions[j] + .isrs = rd_tmpabuf_alloc( + &tbuf, + md->topics[i].partitions[j].isr_cnt * + sizeof( + *md->topics[i].partitions[j].isrs)))) + rd_kafka_buf_parse_fail( + rkbuf, + "%s [%" PRId32 + "]: %d isrs: " + "tmpabuf memory shortage", + md->topics[i].topic, + md->topics[i].partitions[j].id, + md->topics[i].partitions[j].isr_cnt); + + + for (k = 0; k < md->topics[i].partitions[j].isr_cnt; + k++) + rd_kafka_buf_read_i32a( + rkbuf, md->topics[i].partitions[j].isrs[k]); + + if (ApiVersion >= 5) { + /* OfflineReplicas int32 array (ignored) */ + int32_t offline_replicas_cnt; + + /* #OfflineReplicas */ + rd_kafka_buf_read_arraycnt( + rkbuf, &offline_replicas_cnt, + RD_KAFKAP_BROKERS_MAX); + rd_kafka_buf_skip(rkbuf, offline_replicas_cnt * + sizeof(int32_t)); + } + + rd_kafka_buf_skip_tags(rkbuf); + } + + if (ApiVersion >= 8) { + int32_t TopicAuthorizedOperations; + /* TopicAuthorizedOperations */ + rd_kafka_buf_read_i32(rkbuf, + &TopicAuthorizedOperations); + } + + rd_kafka_buf_skip_tags(rkbuf); + + /* Ignore topics in blacklist */ + if (rkb->rkb_rk->rk_conf.topic_blacklist && + rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.topic_blacklist, + md->topics[i].topic)) { + rd_rkb_dbg(rkb, TOPIC | RD_KAFKA_DBG_METADATA, + "BLACKLIST", + "Ignoring blacklisted topic \"%s\" " + "in metadata", + md->topics[i].topic); + continue; + } + + if (leader_epochs_size > 0 && + !rd_kafka_has_reliable_leader_epochs(rkb)) { + /* Prior to Kafka version 2.4 (which coincides with + * Metadata version 9), the broker does not propagate + * leader epoch information accurately while a + * reassignment is in progress. Relying on a stale + * epoch can lead to FENCED_LEADER_EPOCH errors which + * can prevent consumption throughout the course of + * a reassignment. It is safer in this case to revert + * to the behavior in previous protocol versions + * which checks leader status only. */ + leader_epochs_size = 0; + rd_free(leader_epochs); + leader_epochs = NULL; + } + + + /* Sort partitions by partition id */ + qsort(md->topics[i].partitions, md->topics[i].partition_cnt, + sizeof(*md->topics[i].partitions), + rd_kafka_metadata_partition_id_cmp); + if (leader_epochs_size > 0) { + /* And sort leader_epochs by partition id */ + qsort(leader_epochs, md->topics[i].partition_cnt, + sizeof(*leader_epochs), + rd_kafka_metadata_partition_leader_epoch_cmp); + } + + /* Update topic state based on the topic metadata */ + rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], + leader_epochs); + + + if (requested_topics) { + rd_list_free_cb(missing_topics, + rd_list_remove_cmp(missing_topics, + md->topics[i].topic, + (void *)strcmp)); + if (!all_topics) { + /* Only update cache when not asking + * for all topics. */ + + rd_kafka_wrlock(rk); + rd_kafka_metadata_cache_topic_update( + rk, &md->topics[i], + rd_false /*propagate later*/); + cache_changes++; + rd_kafka_wrunlock(rk); + } + } + } + + if (ApiVersion >= 8 && ApiVersion <= 10) { + int32_t ClusterAuthorizedOperations; + /* ClusterAuthorizedOperations */ + rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations); + } + + rd_kafka_buf_skip_tags(rkbuf); + + /* Entire Metadata response now parsed without errors: + * update our internal state according to the response. */ + + if (md->broker_cnt == 0 && md->topic_cnt == 0) { + rd_rkb_dbg(rkb, METADATA, "METADATA", + "No brokers or topics in metadata: should retry"); + err = RD_KAFKA_RESP_ERR__PARTIAL; + goto err; + } + + /* Update our list of brokers. */ + for (i = 0; i < md->broker_cnt; i++) { + rd_rkb_dbg(rkb, METADATA, "METADATA", + " Broker #%i/%i: %s:%i NodeId %" PRId32, i, + md->broker_cnt, md->brokers[i].host, + md->brokers[i].port, md->brokers[i].id); + rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, + &md->brokers[i], NULL); + } + + /* Requested topics not seen in metadata? Propogate to topic code. */ + if (missing_topics) { + char *topic; + rd_rkb_dbg(rkb, TOPIC, "METADATA", + "%d/%d requested topic(s) seen in metadata", + rd_list_cnt(requested_topics) - + rd_list_cnt(missing_topics), + rd_list_cnt(requested_topics)); + for (i = 0; i < rd_list_cnt(missing_topics); i++) + rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s", + (char *)(missing_topics->rl_elems[i])); + RD_LIST_FOREACH(topic, missing_topics, i) { + rd_kafka_topic_t *rkt; + + rkt = + rd_kafka_topic_find(rkb->rkb_rk, topic, 1 /*lock*/); + if (rkt) { + /* Received metadata response contained no + * information about topic 'rkt' and thus + * indicates the topic is not available in the + * cluster. + * Mark the topic as non-existent */ + rd_kafka_topic_wrlock(rkt); + rd_kafka_topic_set_notexists( + rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); + rd_kafka_topic_wrunlock(rkt); + + rd_kafka_topic_destroy0(rkt); + } + } + } + + + rd_kafka_wrlock(rkb->rkb_rk); + + rkb->rkb_rk->rk_ts_metadata = rd_clock(); + + /* Update cached cluster id. */ + if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 && + (!rk->rk_clusterid || + rd_kafkap_str_cmp_str(&cluster_id, rk->rk_clusterid))) { + rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_GENERIC, "CLUSTERID", + "ClusterId update \"%s\" -> \"%.*s\"", + rk->rk_clusterid ? rk->rk_clusterid : "", + RD_KAFKAP_STR_PR(&cluster_id)); + if (rk->rk_clusterid) { + rd_kafka_log(rk, LOG_WARNING, "CLUSTERID", + "Broker %s reports different ClusterId " + "\"%.*s\" than previously known \"%s\": " + "a client must not be simultaneously " + "connected to multiple clusters", + rd_kafka_broker_name(rkb), + RD_KAFKAP_STR_PR(&cluster_id), + rk->rk_clusterid); + rd_free(rk->rk_clusterid); + } + + rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id); + /* rd_kafka_clusterid() waits for a cache update even though + * the clusterid is not in the cache itself. (#3620) */ + cache_changes++; + } + + /* Update controller id. */ + if (rkb->rkb_rk->rk_controllerid != controller_id) { + rd_rkb_dbg(rkb, BROKER, "CONTROLLERID", + "ControllerId update %" PRId32 " -> %" PRId32, + rkb->rkb_rk->rk_controllerid, controller_id); + rkb->rkb_rk->rk_controllerid = controller_id; + broker_changes++; + } + + if (all_topics) { + /* Expire all cache entries that were not updated. */ + rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start); + + if (rkb->rkb_rk->rk_full_metadata) + rd_kafka_metadata_destroy( + rkb->rkb_rk->rk_full_metadata); + rkb->rkb_rk->rk_full_metadata = + rd_kafka_metadata_copy(md, tbuf.of); + rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata; + rd_rkb_dbg(rkb, METADATA, "METADATA", + "Caching full metadata with " + "%d broker(s) and %d topic(s): %s", + md->broker_cnt, md->topic_cnt, reason); + } else { + if (cache_changes) + rd_kafka_metadata_cache_propagate_changes(rk); + rd_kafka_metadata_cache_expiry_start(rk); + } + + /* Remove cache hints for the originally requested topics. */ + if (requested_topics) + rd_kafka_metadata_cache_purge_hints(rk, requested_topics); + + rd_kafka_wrunlock(rkb->rkb_rk); + + if (broker_changes) { + /* Broadcast broker metadata changes to listeners. */ + rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk); + } + + /* Check if cgrp effective subscription is affected by + * new topic metadata. + * Ignore if this was a broker-only refresh (no topics), or + * the request was from the partition assignor (!cgrp_update) + * which may contain only a sub-set of the subscribed topics (namely + * the effective subscription of available topics) as to not + * propagate non-included topics as non-existent. */ + if (cgrp_update && (requested_topics || all_topics)) + rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp, + rd_true /*do join*/); + + /* Try to acquire a Producer ID from this broker if we + * don't have one. */ + if (rd_kafka_is_idempotent(rkb->rkb_rk)) { + rd_kafka_wrlock(rkb->rkb_rk); + rd_kafka_idemp_pid_fsm(rkb->rkb_rk); + rd_kafka_wrunlock(rkb->rkb_rk); + } + +done: + if (missing_topics) + rd_list_destroy(missing_topics); + + if (leader_epochs) + rd_free(leader_epochs); + + /* This metadata request was triggered by someone wanting + * the metadata information back as a reply, so send that reply now. + * In this case we must not rd_free the metadata memory here, + * the requestee will do. + * The tbuf is explicitly not destroyed as we return its memory + * to the caller. */ + *mdp = md; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + err = rkbuf->rkbuf_err; +err: + if (requested_topics) { + /* Failed requests shall purge cache hints for + * the requested topics. */ + rd_kafka_wrlock(rkb->rkb_rk); + rd_kafka_metadata_cache_purge_hints(rk, requested_topics); + rd_kafka_wrunlock(rkb->rkb_rk); + } + + if (missing_topics) + rd_list_destroy(missing_topics); + + if (leader_epochs) + rd_free(leader_epochs); + + rd_tmpabuf_destroy(&tbuf); + + return err; +} + + +/** + * @brief Add all topics in current cached full metadata + * that matches the topics in \p match + * to \p tinfos (rd_kafka_topic_info_t *). + * + * @param errored Any topic or wildcard pattern that did not match + * an available topic will be added to this list with + * the appropriate error set. + * + * @returns the number of topics matched and added to \p list + * + * @locks none + * @locality any + */ +size_t +rd_kafka_metadata_topic_match(rd_kafka_t *rk, + rd_list_t *tinfos, + const rd_kafka_topic_partition_list_t *match, + rd_kafka_topic_partition_list_t *errored) { + int ti, i; + size_t cnt = 0; + const struct rd_kafka_metadata *metadata; + rd_kafka_topic_partition_list_t *unmatched; + + rd_kafka_rdlock(rk); + metadata = rk->rk_full_metadata; + if (!metadata) { + rd_kafka_rdunlock(rk); + return 0; + } + + /* To keep track of which patterns and topics in `match` that + * did not match any topic (or matched an errored topic), we + * create a set of all topics to match in `unmatched` and then + * remove from this set as a match is found. + * Whatever remains in `unmatched` after all matching is performed + * are the topics and patterns that did not match a topic. */ + unmatched = rd_kafka_topic_partition_list_copy(match); + + /* For each topic in the cluster, scan through the match list + * to find matching topic. */ + for (ti = 0; ti < metadata->topic_cnt; ti++) { + const char *topic = metadata->topics[ti].topic; + + /* Ignore topics in blacklist */ + if (rk->rk_conf.topic_blacklist && + rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic)) + continue; + + /* Scan for matches */ + for (i = 0; i < match->cnt; i++) { + if (!rd_kafka_topic_match(rk, match->elems[i].topic, + topic)) + continue; + + /* Remove from unmatched */ + rd_kafka_topic_partition_list_del( + unmatched, match->elems[i].topic, + RD_KAFKA_PARTITION_UA); + + if (metadata->topics[ti].err) { + rd_kafka_topic_partition_list_add( + errored, topic, RD_KAFKA_PARTITION_UA) + ->err = metadata->topics[ti].err; + continue; /* Skip errored topics */ + } + + rd_list_add( + tinfos, + rd_kafka_topic_info_new( + topic, metadata->topics[ti].partition_cnt)); + + cnt++; + } + } + rd_kafka_rdunlock(rk); + + /* Any topics/patterns still in unmatched did not match any + * existing topics, add them to `errored`. */ + for (i = 0; i < unmatched->cnt; i++) { + rd_kafka_topic_partition_t *elem = &unmatched->elems[i]; + + rd_kafka_topic_partition_list_add(errored, elem->topic, + RD_KAFKA_PARTITION_UA) + ->err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; + } + + rd_kafka_topic_partition_list_destroy(unmatched); + + return cnt; +} + + +/** + * @brief Add all topics in \p match that matches cached metadata. + * @remark MUST NOT be used with wildcard topics, + * see rd_kafka_metadata_topic_match() for that. + * + * @param errored Non-existent and unauthorized topics are added to this + * list with the appropriate error code. + * + * @returns the number of topics matched and added to \p tinfos + * @locks none + */ +size_t +rd_kafka_metadata_topic_filter(rd_kafka_t *rk, + rd_list_t *tinfos, + const rd_kafka_topic_partition_list_t *match, + rd_kafka_topic_partition_list_t *errored) { + int i; + size_t cnt = 0; + + rd_kafka_rdlock(rk); + /* For each topic in match, look up the topic in the cache. */ + for (i = 0; i < match->cnt; i++) { + const char *topic = match->elems[i].topic; + const rd_kafka_metadata_topic_t *mtopic; + + /* Ignore topics in blacklist */ + if (rk->rk_conf.topic_blacklist && + rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic)) + continue; + + mtopic = + rd_kafka_metadata_cache_topic_get(rk, topic, 1 /*valid*/); + + if (!mtopic) + rd_kafka_topic_partition_list_add(errored, topic, + RD_KAFKA_PARTITION_UA) + ->err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; + else if (mtopic->err) + rd_kafka_topic_partition_list_add(errored, topic, + RD_KAFKA_PARTITION_UA) + ->err = mtopic->err; + else { + rd_list_add(tinfos, rd_kafka_topic_info_new( + topic, mtopic->partition_cnt)); + + cnt++; + } + } + rd_kafka_rdunlock(rk); + + return cnt; +} + + +void rd_kafka_metadata_log(rd_kafka_t *rk, + const char *fac, + const struct rd_kafka_metadata *md) { + int i; + + rd_kafka_dbg(rk, METADATA, fac, + "Metadata with %d broker(s) and %d topic(s):", + md->broker_cnt, md->topic_cnt); + + for (i = 0; i < md->broker_cnt; i++) { + rd_kafka_dbg(rk, METADATA, fac, + " Broker #%i/%i: %s:%i NodeId %" PRId32, i, + md->broker_cnt, md->brokers[i].host, + md->brokers[i].port, md->brokers[i].id); + } + + for (i = 0; i < md->topic_cnt; i++) { + rd_kafka_dbg( + rk, METADATA, fac, + " Topic #%i/%i: %s with %i partitions%s%s", i, + md->topic_cnt, md->topics[i].topic, + md->topics[i].partition_cnt, md->topics[i].err ? ": " : "", + md->topics[i].err ? rd_kafka_err2str(md->topics[i].err) + : ""); + } +} + + + +/** + * @brief Refresh metadata for \p topics + * + * @param rk: used to look up usable broker if \p rkb is NULL. + * @param rkb: use this broker, unless NULL then any usable broker from \p rk + * @param force: force refresh even if topics are up-to-date in cache + * @param allow_auto_create: Enable/disable auto creation of topics + * (through MetadataRequest). Requires a modern + * broker version. + * Takes precedence over allow.auto.create.topics. + * @param cgrp_update: Allow consumer group state update on response. + * + * @returns an error code + * + * @locality any + * @locks none + */ +rd_kafka_resp_err_t +rd_kafka_metadata_refresh_topics(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const rd_list_t *topics, + rd_bool_t force, + rd_bool_t allow_auto_create, + rd_bool_t cgrp_update, + const char *reason) { + rd_list_t q_topics; + int destroy_rkb = 0; + + if (!rk) { + rd_assert(rkb); + rk = rkb->rkb_rk; + } + + rd_kafka_wrlock(rk); + + if (!rkb) { + if (!(rkb = rd_kafka_broker_any_usable( + rk, RD_POLL_NOWAIT, RD_DONT_LOCK, 0, reason))) { + /* Hint cache that something is interested in + * these topics so that they will be included in + * a future all known_topics query. */ + rd_kafka_metadata_cache_hint(rk, topics, NULL, + RD_KAFKA_RESP_ERR__NOENT, + 0 /*dont replace*/); + + rd_kafka_wrunlock(rk); + rd_kafka_dbg(rk, METADATA, "METADATA", + "Skipping metadata refresh of %d topic(s):" + " %s: no usable brokers", + rd_list_cnt(topics), reason); + + return RD_KAFKA_RESP_ERR__TRANSPORT; + } + destroy_rkb = 1; + } + + rd_list_init(&q_topics, rd_list_cnt(topics), rd_free); + + if (!force) { + + /* Hint cache of upcoming MetadataRequest and filter + * out any topics that are already being requested. + * q_topics will contain remaining topics to query. */ + rd_kafka_metadata_cache_hint(rk, topics, &q_topics, + RD_KAFKA_RESP_ERR__WAIT_CACHE, + rd_false /*dont replace*/); + rd_kafka_wrunlock(rk); + + if (rd_list_cnt(&q_topics) == 0) { + /* No topics need new query. */ + rd_kafka_dbg(rk, METADATA, "METADATA", + "Skipping metadata refresh of " + "%d topic(s): %s: " + "already being requested", + rd_list_cnt(topics), reason); + rd_list_destroy(&q_topics); + if (destroy_rkb) + rd_kafka_broker_destroy(rkb); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + } else { + rd_kafka_wrunlock(rk); + rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL); + } + + rd_kafka_dbg(rk, METADATA, "METADATA", + "Requesting metadata for %d/%d topics: %s", + rd_list_cnt(&q_topics), rd_list_cnt(topics), reason); + + rd_kafka_MetadataRequest(rkb, &q_topics, reason, allow_auto_create, + cgrp_update, NULL); + + rd_list_destroy(&q_topics); + + if (destroy_rkb) + rd_kafka_broker_destroy(rkb); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Refresh metadata for known topics + * + * @param rk: used to look up usable broker if \p rkb is NULL. + * @param rkb: use this broker, unless NULL then any usable broker from \p rk + * @param force: refresh even if cache is up-to-date + * + * @returns an error code (__UNKNOWN_TOPIC if there are no local topics) + * + * @locality any + * @locks none + */ +rd_kafka_resp_err_t +rd_kafka_metadata_refresh_known_topics(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_bool_t force, + const char *reason) { + rd_list_t topics; + rd_kafka_resp_err_t err; + int cache_cnt = 0; + rd_bool_t allow_auto_create_topics; + + if (!rk) + rk = rkb->rkb_rk; + + rd_list_init(&topics, 8, rd_free); + rd_kafka_local_topics_to_list(rk, &topics, &cache_cnt); + + /* Allow topic auto creation if there are locally known topics (rkt) + * and not just cached (to be queried) topics. */ + allow_auto_create_topics = rk->rk_conf.allow_auto_create_topics && + rd_list_cnt(&topics) > cache_cnt; + + if (rd_list_cnt(&topics) == 0) + err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; + else + err = rd_kafka_metadata_refresh_topics( + rk, rkb, &topics, force, allow_auto_create_topics, + rd_false /*!cgrp_update*/, reason); + + rd_list_destroy(&topics); + + return err; +} + + +/** + * @brief Refresh metadata for known and subscribed topics. + * + * @param rk used to look up usable broker if \p rkb is NULL.. + * @param rkb use this broker, unless NULL then any usable broker from \p rk. + * @param reason reason of refresh, used in debug logs. + * + * @returns an error code (ERR__UNKNOWN_TOPIC if no topics are desired). + * + * @locality rdkafka main thread + * @locks_required none + * @locks_acquired rk(read) + */ +rd_kafka_resp_err_t +rd_kafka_metadata_refresh_consumer_topics(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const char *reason) { + rd_list_t topics; + rd_kafka_resp_err_t err; + rd_kafka_cgrp_t *rkcg; + rd_bool_t allow_auto_create_topics = + rk->rk_conf.allow_auto_create_topics; + int cache_cnt = 0; + + if (!rk) { + rd_assert(rkb); + rk = rkb->rkb_rk; + } + + rkcg = rk->rk_cgrp; + rd_assert(rkcg != NULL); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) { + /* If there is a wildcard subscription we need to request + * all topics in the cluster so that we can perform + * regexp matching. */ + return rd_kafka_metadata_refresh_all(rk, rkb, reason); + } + + rd_list_init(&topics, 8, rd_free); + + /* Add locally known topics, i.e., those that are currently + * being consumed or otherwise referenced through topic_t objects. */ + rd_kafka_local_topics_to_list(rk, &topics, &cache_cnt); + if (rd_list_cnt(&topics) == cache_cnt) + allow_auto_create_topics = rd_false; + + /* Add subscribed (non-wildcard) topics, if any. */ + if (rkcg->rkcg_subscription) + rd_kafka_topic_partition_list_get_topic_names( + rkcg->rkcg_subscription, &topics, + rd_false /*no wildcards*/); + + if (rd_list_cnt(&topics) == 0) + err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; + else + err = rd_kafka_metadata_refresh_topics( + rk, rkb, &topics, rd_true /*force*/, + allow_auto_create_topics, rd_true /*cgrp_update*/, reason); + + rd_list_destroy(&topics); + + return err; +} + + +/** + * @brief Refresh broker list by metadata. + * + * Attempts to use sparse metadata request if possible, else falls back + * on a full metadata request. (NOTE: sparse not implemented, KIP-4) + * + * @param rk: used to look up usable broker if \p rkb is NULL. + * @param rkb: use this broker, unless NULL then any usable broker from \p rk + * + * @returns an error code + * + * @locality any + * @locks none + */ +rd_kafka_resp_err_t rd_kafka_metadata_refresh_brokers(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const char *reason) { + return rd_kafka_metadata_request(rk, rkb, NULL /*brokers only*/, + rd_false /*!allow auto create topics*/, + rd_false /*no cgrp update */, reason, + NULL); +} + + + +/** + * @brief Refresh metadata for all topics in cluster. + * This is a full metadata request which might be taxing on the + * broker if the cluster has many topics. + * + * @locality any + * @locks none + */ +rd_kafka_resp_err_t rd_kafka_metadata_refresh_all(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const char *reason) { + int destroy_rkb = 0; + rd_list_t topics; + + if (!rk) { + rd_assert(rkb); + rk = rkb->rkb_rk; + } + + if (!rkb) { + if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, + RD_DO_LOCK, 0, reason))) + return RD_KAFKA_RESP_ERR__TRANSPORT; + destroy_rkb = 1; + } + + rd_list_init(&topics, 0, NULL); /* empty list = all topics */ + rd_kafka_MetadataRequest(rkb, &topics, reason, + rd_false /*no auto create*/, + rd_true /*cgrp update*/, NULL); + rd_list_destroy(&topics); + + if (destroy_rkb) + rd_kafka_broker_destroy(rkb); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + + * @brief Lower-level Metadata request that takes a callback (with replyq set) + * which will be triggered after parsing is complete. + * + * @param cgrp_update Allow consumer group updates from the response. + * + * @locks none + * @locality any + */ +rd_kafka_resp_err_t +rd_kafka_metadata_request(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const rd_list_t *topics, + rd_bool_t allow_auto_create_topics, + rd_bool_t cgrp_update, + const char *reason, + rd_kafka_op_t *rko) { + int destroy_rkb = 0; + + if (!rkb) { + if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, + RD_DO_LOCK, 0, reason))) + return RD_KAFKA_RESP_ERR__TRANSPORT; + destroy_rkb = 1; + } + + rd_kafka_MetadataRequest(rkb, topics, reason, allow_auto_create_topics, + cgrp_update, rko); + + if (destroy_rkb) + rd_kafka_broker_destroy(rkb); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Query timer callback to trigger refresh for topics + * that have partitions missing their leaders. + * + * @locks none + * @locality rdkafka main thread + */ +static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_t *rk = rkts->rkts_rk; + rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr; + rd_kafka_topic_t *rkt; + rd_list_t topics; + + rd_kafka_wrlock(rk); + rd_list_init(&topics, rk->rk_topic_cnt, rd_free); + + TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { + int i, require_metadata; + rd_kafka_topic_rdlock(rkt); + + if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) { + /* Skip topics that are known to not exist. */ + rd_kafka_topic_rdunlock(rkt); + continue; + } + + require_metadata = + rkt->rkt_flags & RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; + + /* Check if any partitions are missing brokers. */ + for (i = 0; !require_metadata && i < rkt->rkt_partition_cnt; + i++) { + rd_kafka_toppar_t *rktp = rkt->rkt_p[i]; + rd_kafka_toppar_lock(rktp); + require_metadata = + !rktp->rktp_broker && !rktp->rktp_next_broker; + rd_kafka_toppar_unlock(rktp); + } + + if (require_metadata || rkt->rkt_partition_cnt == 0) + rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str)); + + rd_kafka_topic_rdunlock(rkt); + } + + rd_kafka_wrunlock(rk); + + if (rd_list_cnt(&topics) == 0) { + /* No leader-less topics+partitions, stop the timer. */ + rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/); + } else { + rd_kafka_metadata_refresh_topics( + rk, NULL, &topics, rd_true /*force*/, + rk->rk_conf.allow_auto_create_topics, + rd_false /*!cgrp_update*/, "partition leader query"); + /* Back off next query exponentially until we reach + * the standard query interval - then stop the timer + * since the intervalled querier will do the job for us. */ + if (rk->rk_conf.metadata_refresh_interval_ms > 0 && + rtmr->rtmr_interval * 2 / 1000 >= + rk->rk_conf.metadata_refresh_interval_ms) + rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/); + else + rd_kafka_timer_exp_backoff(rkts, rtmr); + } + + rd_list_destroy(&topics); +} + + + +/** + * @brief Trigger fast leader query to quickly pick up on leader changes. + * The fast leader query is a quick query followed by later queries at + * exponentially increased intervals until no topics are missing + * leaders. + * + * @locks none + * @locality any + */ +void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) { + rd_ts_t next; + + /* Restart the timer if it will speed things up. */ + next = rd_kafka_timer_next( + &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr, 1 /*lock*/); + if (next == -1 /* not started */ || + next > + (rd_ts_t)rk->rk_conf.metadata_refresh_fast_interval_ms * 1000) { + rd_kafka_dbg(rk, METADATA | RD_KAFKA_DBG_TOPIC, "FASTQUERY", + "Starting fast leader query"); + rd_kafka_timer_start( + &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr, + rk->rk_conf.metadata_refresh_fast_interval_ms * 1000, + rd_kafka_metadata_leader_query_tmr_cb, NULL); + } +} + + + +/** + * @brief Create mock Metadata (for testing) based on the provided topics. + * + * @param topics elements are checked for .topic and .partition_cnt + * @param topic_cnt is the number of topic elements in \p topics. + * + * @returns a newly allocated metadata object that must be freed with + * rd_kafka_metadata_destroy(). + * + * @sa rd_kafka_metadata_copy() + */ +rd_kafka_metadata_t * +rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, + size_t topic_cnt) { + rd_kafka_metadata_t *md; + rd_tmpabuf_t tbuf; + size_t topic_names_size = 0; + int total_partition_cnt = 0; + size_t i; + + /* Calculate total partition count and topic names size before + * allocating memory. */ + for (i = 0; i < topic_cnt; i++) { + topic_names_size += 1 + strlen(topics[i].topic); + total_partition_cnt += topics[i].partition_cnt; + } + + + /* Allocate contiguous buffer which will back all the memory + * needed by the final metadata_t object */ + rd_tmpabuf_new( + &tbuf, + sizeof(*md) + (sizeof(*md->topics) * topic_cnt) + topic_names_size + + (64 /*topic name size..*/ * topic_cnt) + + (sizeof(*md->topics[0].partitions) * total_partition_cnt), + 1 /*assert on fail*/); + + md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)); + memset(md, 0, sizeof(*md)); + + md->topic_cnt = (int)topic_cnt; + md->topics = + rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics)); + + for (i = 0; i < (size_t)md->topic_cnt; i++) { + int j; + + md->topics[i].topic = + rd_tmpabuf_write_str(&tbuf, topics[i].topic); + md->topics[i].partition_cnt = topics[i].partition_cnt; + md->topics[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; + + md->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*md->topics[i].partitions)); + + for (j = 0; j < md->topics[i].partition_cnt; j++) { + memset(&md->topics[i].partitions[j], 0, + sizeof(md->topics[i].partitions[j])); + md->topics[i].partitions[j].id = j; + } + } + + /* Check for tmpabuf errors */ + if (rd_tmpabuf_failed(&tbuf)) + rd_assert(!*"metadata mock failed"); + + /* Not destroying the tmpabuf since we return + * its allocated memory. */ + return md; +} + + +/** + * @brief Create mock Metadata (for testing) based on the + * var-arg tuples of (const char *topic, int partition_cnt). + * + * @param topic_cnt is the number of topic,partition_cnt tuples. + * + * @returns a newly allocated metadata object that must be freed with + * rd_kafka_metadata_destroy(). + * + * @sa rd_kafka_metadata_new_topic_mock() + */ +rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv(size_t topic_cnt, ...) { + rd_kafka_metadata_topic_t *topics; + va_list ap; + size_t i; + + topics = rd_alloca(sizeof(*topics) * topic_cnt); + + va_start(ap, topic_cnt); + for (i = 0; i < topic_cnt; i++) { + topics[i].topic = va_arg(ap, char *); + topics[i].partition_cnt = va_arg(ap, int); + } + va_end(ap); + + return rd_kafka_metadata_new_topic_mock(topics, topic_cnt); +} |