/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_topic.h"
#include "rdkafka_broker.h"
#include "rdkafka_request.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_metadata.h"

#include <string.h>
#include <stdarg.h>

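/*
 * Illustrative application-level usage of rd_kafka_metadata() below
 * (a sketch, assuming rk is a running client instance created with
 * rd_kafka_new()):
 *
 *   const struct rd_kafka_metadata *md;
 *   rd_kafka_resp_err_t err;
 *
 *   err = rd_kafka_metadata(rk, 1, NULL, &md, 5000);
 *   if (!err) {
 *           ...inspect md->brokers[] and md->topics[]...
 *           rd_kafka_metadata_destroy(md);
 *   }
 */
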
rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk,
                                      int all_topics,
                                      rd_kafka_topic_t *only_rkt,
                                      const struct rd_kafka_metadata **metadatap,
                                      int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_broker_t *rkb;
        rd_kafka_op_t *rko;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t topics;
        rd_bool_t allow_auto_create_topics =
            rk->rk_conf.allow_auto_create_topics;

        /* Query any broker that is up, and if none are up pick the first one;
         * if we're lucky it will be up before the timeout. */
        rkb = rd_kafka_broker_any_usable(rk, timeout_ms, RD_DO_LOCK, 0,
                                         "application metadata request");
        if (!rkb)
                return RD_KAFKA_RESP_ERR__TRANSPORT;

        rkq = rd_kafka_q_new(rk);

        rd_list_init(&topics, 0, rd_free);
        if (!all_topics) {
                if (only_rkt)
                        rd_list_add(&topics,
                                    rd_strdup(rd_kafka_topic_name(only_rkt)));
                else {
                        int cache_cnt;
                        rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics,
                                                      &cache_cnt);
                        /* Don't trigger auto-create for cached topics */
                        if (rd_list_cnt(&topics) == cache_cnt)
                                allow_auto_create_topics = rd_false;
                }
        }

        /* Async: request metadata */
        rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA);
        rd_kafka_op_set_replyq(rko, rkq, 0);
        rko->rko_u.metadata.force = 1; /* Force metadata request regardless
                                        * of outstanding metadata requests. */
        rd_kafka_MetadataRequest(rkb, &topics, "application requested",
                                 allow_auto_create_topics,
                                 /* cgrp_update:
                                  * Only update consumer group state
                                  * on response if this lists all
                                  * topics in the cluster, since a
                                  * partial request may make it seem
                                  * like some subscribed topics are
                                  * missing. */
                                 all_topics ? rd_true : rd_false, rko);

        rd_list_destroy(&topics);
        rd_kafka_broker_destroy(rkb);

        /* Wait for reply (or timeout) */
        rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(ts_end), 0);

        rd_kafka_q_destroy_owner(rkq);

        /* Timeout */
        if (!rko)
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Error */
        if (rko->rko_err) {
                rd_kafka_resp_err_t err = rko->rko_err;
                rd_kafka_op_destroy(rko);
                return err;
        }

        /* Reply: pass the metadata pointer to the application,
         * which now owns it. */
        rd_kafka_assert(rk, rko->rko_u.metadata.md);
        *metadatap = rko->rko_u.metadata.md;
        rko->rko_u.metadata.md = NULL;
        rd_kafka_op_destroy(rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata) {
        rd_free((void *)metadata);
}


/**
 * @returns a newly allocated copy of metadata \p src of size \p size
 */
struct rd_kafka_metadata *
rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) {
        struct rd_kafka_metadata *md;
        rd_tmpabuf_t tbuf;
        int i;

        /* Metadata is stored in one contiguous buffer where structs and
         * pointed-to fields are laid out in a memory-aligned fashion.
         * rd_tmpabuf_t provides the infrastructure to do this.
         * Because of this we copy all the structs verbatim but
         * any pointer fields need to be copied explicitly to update
         * the pointer address. */
        rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/);
        md = rd_tmpabuf_write(&tbuf, src, sizeof(*md));
        rd_tmpabuf_write_str(&tbuf, src->orig_broker_name);

        /* Copy Brokers */
        md->brokers = rd_tmpabuf_write(&tbuf, src->brokers,
                                       md->broker_cnt * sizeof(*md->brokers));
        for (i = 0; i < md->broker_cnt; i++)
                md->brokers[i].host =
                    rd_tmpabuf_write_str(&tbuf, src->brokers[i].host);

        /* Copy TopicMetadata */
        md->topics = rd_tmpabuf_write(&tbuf, src->topics,
                                      md->topic_cnt * sizeof(*md->topics));

        for (i = 0; i < md->topic_cnt; i++) {
                int j;

                md->topics[i].topic =
                    rd_tmpabuf_write_str(&tbuf, src->topics[i].topic);

                /* Copy partitions */
                md->topics[i].partitions =
                    rd_tmpabuf_write(&tbuf, src->topics[i].partitions,
                                     md->topics[i].partition_cnt *
                                         sizeof(*md->topics[i].partitions));

                for (j = 0; j < md->topics[i].partition_cnt; j++) {
                        /* Copy replicas and ISRs */
                        md->topics[i].partitions[j].replicas = rd_tmpabuf_write(
                            &tbuf, src->topics[i].partitions[j].replicas,
                            md->topics[i].partitions[j].replica_cnt *
                                sizeof(*md->topics[i].partitions[j].replicas));

                        md->topics[i].partitions[j].isrs = rd_tmpabuf_write(
                            &tbuf, src->topics[i].partitions[j].isrs,
                            md->topics[i].partitions[j].isr_cnt *
                                sizeof(*md->topics[i].partitions[j].isrs));
                }
        }

        /* Check for tmpabuf errors */
        if (rd_tmpabuf_failed(&tbuf))
                rd_kafka_assert(NULL, !*"metadata copy failed");

        /* Deliberately not destroying the tmpabuf since we return
         * its allocated memory. */

        return md;
}

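/*
 * The copy above follows the same single-allocation pattern used by
 * the parser below: one rd_tmpabuf_t backs the entire object, structs
 * are copied verbatim, and only pointer fields are rewritten to point
 * into the new buffer. This is why the whole object can be freed with
 * a single rd_free() in rd_kafka_metadata_destroy().
 */
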
/**
 * @brief Partition (id) comparator for partition_id_leader_epoch struct.
 */
static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a,
                                                        const void *_b) {
        const rd_kafka_partition_leader_epoch_t *a = _a, *b = _b;
        return RD_CMP(a->partition_id, b->partition_id);
}


/**
 * @brief Update topic state and information based on topic metadata.
 *
 * @param mdt Topic metadata.
 * @param leader_epochs Per-partition leader epoch array, or NULL if not known.
 *
 * @locality rdkafka main thread
 * @locks_acquired rd_kafka_wrlock(rk)
 */
static void rd_kafka_parse_Metadata_update_topic(
    rd_kafka_broker_t *rkb,
    const rd_kafka_metadata_topic_t *mdt,
    const rd_kafka_partition_leader_epoch_t *leader_epochs) {

        rd_rkb_dbg(rkb, METADATA, "METADATA",
                   /* The indent below is intentional */
                   "  Topic %s with %i partitions%s%s", mdt->topic,
                   mdt->partition_cnt, mdt->err ? ": " : "",
                   mdt->err ? rd_kafka_err2str(mdt->err) : "");

        /* Ignore metadata completely for temporary errors. (issue #513)
         *   LEADER_NOT_AVAILABLE: Broker is rebalancing. */
        if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE &&
            mdt->partition_cnt == 0) {
                rd_rkb_dbg(rkb, TOPIC, "METADATA",
                           "Temporary error in metadata reply for "
                           "topic %s (PartCnt %i): %s: ignoring",
                           mdt->topic, mdt->partition_cnt,
                           rd_kafka_err2str(mdt->err));
        } else {
                /* Update local topic & partition state based
                 * on the metadata */
                rd_kafka_topic_metadata_update2(rkb, mdt, leader_epochs);
        }
}

/**
 * @brief Only brokers with Metadata version >= 9 have reliable leader
 *        epochs. Before that version, leader epoch must be treated
 *        as missing (-1).
 *
 * @param rkb The broker
 * @return Is this a broker version with reliable leader epochs?
 *
 * @locality rdkafka main thread
 */
rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) {
        int features;
        int16_t ApiVersion = 0;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_Metadata, 0, 9, &features);

        return ApiVersion >= 9;
}

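/*
 * Note: rd_kafka_broker_ApiVersion_supported() returns the highest
 * Metadata API version supported by both client and broker within the
 * requested range (0..9 above), so the check effectively asks whether
 * the broker speaks MetadataRequest v9 (Apache Kafka 2.4) or newer.
 */
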
/**
 * @brief Handle a Metadata response message.
 *
 * @param topics The requested topics (may be NULL).
 *
 * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs.
 *
 * The marshalled metadata is returned in \p *mdp (NULL on error).
 *
 * @returns an error code on parse failure, else NO_ERROR.
 *
 * @locality rdkafka main thread
 */
rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
                                            rd_kafka_buf_t *request,
                                            rd_kafka_buf_t *rkbuf,
                                            struct rd_kafka_metadata **mdp) {
        rd_kafka_t *rk = rkb->rkb_rk;
        int i, j, k;
        rd_tmpabuf_t tbuf;
        struct rd_kafka_metadata *md = NULL;
        size_t rkb_namelen;
        const int log_decode_errors = LOG_ERR;
        rd_list_t *missing_topics = NULL;
        const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics;
        rd_bool_t all_topics = request->rkbuf_u.Metadata.all_topics;
        rd_bool_t cgrp_update =
            request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
        const char *reason = request->rkbuf_u.Metadata.reason
                                 ? request->rkbuf_u.Metadata.reason
                                 : "(no reason)";
        int ApiVersion = request->rkbuf_reqhdr.ApiVersion;
        rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
        int32_t controller_id      = -1;
        rd_kafka_resp_err_t err    = RD_KAFKA_RESP_ERR_NO_ERROR;
        int broker_changes         = 0;
        int cache_changes          = 0;
        /** This array is reused and resized as necessary to hold per-partition
         *  leader epochs (ApiVersion >= 7). */
        rd_kafka_partition_leader_epoch_t *leader_epochs = NULL;
        /** Number of allocated elements in leader_epochs. */
        size_t leader_epochs_size = 0;
        rd_ts_t ts_start          = rd_clock();

        /* Ignore metadata updates when terminating */
        if (rd_kafka_terminating(rkb->rkb_rk)) {
                err = RD_KAFKA_RESP_ERR__DESTROY;
                goto done;
        }

        rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread));

        /* Remove topics from missing_topics as they are seen in Metadata. */
        if (requested_topics)
                missing_topics =
                    rd_list_copy(requested_topics, rd_list_string_copy, NULL);

        rd_kafka_broker_lock(rkb);
        rkb_namelen = strlen(rkb->rkb_name) + 1;
        /* We assume that the marshalled representation is
         * no more than 4 times larger than the wire representation. */
        rd_tmpabuf_new(&tbuf,
                       sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4),
                       0 /*dont assert on fail*/);

        if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) {
                rd_kafka_broker_unlock(rkb);
                err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                goto err;
        }

        md->orig_broker_id = rkb->rkb_nodeid;
        md->orig_broker_name =
            rd_tmpabuf_write(&tbuf, rkb->rkb_name, rkb_namelen);
        rd_kafka_broker_unlock(rkb);

        if (ApiVersion >= 3)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* Read Brokers */
        rd_kafka_buf_read_arraycnt(rkbuf, &md->broker_cnt,
                                   RD_KAFKAP_BROKERS_MAX);
        if (!(md->brokers = rd_tmpabuf_alloc(
                  &tbuf, md->broker_cnt * sizeof(*md->brokers))))
                rd_kafka_buf_parse_fail(rkbuf,
                                        "%d brokers: tmpabuf memory shortage",
                                        md->broker_cnt);

        for (i = 0; i < md->broker_cnt; i++) {
                rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id);
                rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
                                              md->brokers[i].host);
                rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port);

                if (ApiVersion >= 1) {
                        rd_kafkap_str_t rack;
                        rd_kafka_buf_read_str(rkbuf, &rack);
                }

                rd_kafka_buf_skip_tags(rkbuf);
        }

        if (ApiVersion >= 2)
                rd_kafka_buf_read_str(rkbuf, &cluster_id);

        if (ApiVersion >= 1) {
                rd_kafka_buf_read_i32(rkbuf, &controller_id);
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "ClusterId: %.*s, ControllerId: %" PRId32,
                           RD_KAFKAP_STR_PR(&cluster_id), controller_id);
        }

        /* Read TopicMetadata */
        rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX);
        rd_rkb_dbg(rkb, METADATA, "METADATA", "%i brokers, %i topics",
                   md->broker_cnt, md->topic_cnt);

        if (!(md->topics = rd_tmpabuf_alloc(
                  &tbuf, md->topic_cnt * sizeof(*md->topics))))
                rd_kafka_buf_parse_fail(
                    rkbuf, "%d topics: tmpabuf memory shortage", md->topic_cnt);

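        /*
         * Wire layout of each TopicMetadata entry parsed by the loop below,
         * summarizing the fields actually read per ApiVersion:
         *   ErrorCode:int16, Name:string, IsInternal:bool (v1+),
         *   Partitions => { ErrorCode:int16, PartitionIndex:int32,
         *                   LeaderId:int32, LeaderEpoch:int32 (v7+),
         *                   Replicas:[int32], Isr:[int32],
         *                   OfflineReplicas:[int32] (v5+) },
         *   TopicAuthorizedOperations:int32 (v8+)
         */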
        for (i = 0; i < md->topic_cnt; i++) {
                rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
                rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
                                              md->topics[i].topic);
                if (ApiVersion >= 1) {
                        int8_t is_internal;
                        rd_kafka_buf_read_i8(rkbuf, &is_internal);
                }

                /* PartitionMetadata */
                rd_kafka_buf_read_arraycnt(rkbuf, &md->topics[i].partition_cnt,
                                           RD_KAFKAP_PARTITIONS_MAX);
                if (!(md->topics[i].partitions = rd_tmpabuf_alloc(
                          &tbuf, md->topics[i].partition_cnt *
                                     sizeof(*md->topics[i].partitions))))
                        rd_kafka_buf_parse_fail(rkbuf,
                                                "%s: %d partitions: "
                                                "tmpabuf memory shortage",
                                                md->topics[i].topic,
                                                md->topics[i].partition_cnt);

                /* Resize the reused leader_epochs array to fit this
                 * topic's partition count. */
                if (ApiVersion >= 7 && md->topics[i].partition_cnt > 0 &&
                    (size_t)md->topics[i].partition_cnt > leader_epochs_size) {
                        leader_epochs_size =
                            RD_MAX(32, md->topics[i].partition_cnt);
                        leader_epochs =
                            rd_realloc(leader_epochs, sizeof(*leader_epochs) *
                                                          leader_epochs_size);
                }

                for (j = 0; j < md->topics[i].partition_cnt; j++) {
                        rd_kafka_buf_read_i16a(
                            rkbuf, md->topics[i].partitions[j].err);
                        rd_kafka_buf_read_i32a(
                            rkbuf, md->topics[i].partitions[j].id);
                        rd_kafka_buf_read_i32a(
                            rkbuf, md->topics[i].partitions[j].leader);
                        if (ApiVersion >= 7) {
                                leader_epochs[j].partition_id =
                                    md->topics[i].partitions[j].id;
                                rd_kafka_buf_read_i32(
                                    rkbuf, &leader_epochs[j].leader_epoch);
                        }

                        /* Replicas */
                        rd_kafka_buf_read_arraycnt(
                            rkbuf, &md->topics[i].partitions[j].replica_cnt,
                            RD_KAFKAP_BROKERS_MAX);
                        if (!(md->topics[i].partitions[j].replicas =
                                  rd_tmpabuf_alloc(
                                      &tbuf,
                                      md->topics[i].partitions[j].replica_cnt *
                                          sizeof(*md->topics[i]
                                                      .partitions[j]
                                                      .replicas))))
                                rd_kafka_buf_parse_fail(
                                    rkbuf,
                                    "%s [%" PRId32
                                    "]: %d replicas: "
                                    "tmpabuf memory shortage",
                                    md->topics[i].topic,
                                    md->topics[i].partitions[j].id,
                                    md->topics[i].partitions[j].replica_cnt);

                        for (k = 0;
                             k < md->topics[i].partitions[j].replica_cnt; k++)
                                rd_kafka_buf_read_i32a(
                                    rkbuf,
                                    md->topics[i].partitions[j].replicas[k]);

                        /* Isrs */
                        rd_kafka_buf_read_arraycnt(
                            rkbuf, &md->topics[i].partitions[j].isr_cnt,
                            RD_KAFKAP_BROKERS_MAX);
                        if (!(md->topics[i].partitions[j].isrs =
                                  rd_tmpabuf_alloc(
                                      &tbuf,
                                      md->topics[i].partitions[j].isr_cnt *
                                          sizeof(*md->topics[i]
                                                      .partitions[j]
                                                      .isrs))))
                                rd_kafka_buf_parse_fail(
                                    rkbuf,
                                    "%s [%" PRId32
                                    "]: %d isrs: "
                                    "tmpabuf memory shortage",
                                    md->topics[i].topic,
                                    md->topics[i].partitions[j].id,
                                    md->topics[i].partitions[j].isr_cnt);

                        for (k = 0; k < md->topics[i].partitions[j].isr_cnt;
                             k++)
                                rd_kafka_buf_read_i32a(
                                    rkbuf, md->topics[i].partitions[j].isrs[k]);

                        if (ApiVersion >= 5) {
                                /* OfflineReplicas int32 array (ignored) */
                                int32_t offline_replicas_cnt;

                                /* #OfflineReplicas */
                                rd_kafka_buf_read_arraycnt(
                                    rkbuf, &offline_replicas_cnt,
                                    RD_KAFKAP_BROKERS_MAX);
                                rd_kafka_buf_skip(rkbuf,
                                                  offline_replicas_cnt *
                                                      sizeof(int32_t));
                        }

                        rd_kafka_buf_skip_tags(rkbuf);
                }

                if (ApiVersion >= 8) {
                        int32_t TopicAuthorizedOperations;
                        /* TopicAuthorizedOperations */
                        rd_kafka_buf_read_i32(rkbuf,
                                              &TopicAuthorizedOperations);
                }

                rd_kafka_buf_skip_tags(rkbuf);

                /* Ignore topics in blacklist */
                if (rkb->rkb_rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.topic_blacklist,
                                           md->topics[i].topic)) {
                        rd_rkb_dbg(rkb, TOPIC | RD_KAFKA_DBG_METADATA,
                                   "BLACKLIST",
                                   "Ignoring blacklisted topic \"%s\" "
                                   "in metadata",
                                   md->topics[i].topic);
                        continue;
                }

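                /*
                 * Leader epochs (KIP-320) are used to fence stale
                 * leadership information and detect log truncation,
                 * but they are only acted upon if the broker
                 * propagates them reliably, which is verified below.
                 */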
                if (leader_epochs_size > 0 &&
                    !rd_kafka_has_reliable_leader_epochs(rkb)) {
                        /* Prior to Kafka version 2.4 (which coincides with
                         * Metadata version 9), the broker does not propagate
                         * leader epoch information accurately while a
                         * reassignment is in progress. Relying on a stale
                         * epoch can lead to FENCED_LEADER_EPOCH errors which
                         * can prevent consumption throughout the course of
                         * a reassignment. It is safer in this case to revert
                         * to the behavior of previous protocol versions,
                         * which checks leader status only. */
                        leader_epochs_size = 0;
                        rd_free(leader_epochs);
                        leader_epochs = NULL;
                }

                /* Sort partitions by partition id */
                qsort(md->topics[i].partitions, md->topics[i].partition_cnt,
                      sizeof(*md->topics[i].partitions),
                      rd_kafka_metadata_partition_id_cmp);
                if (leader_epochs_size > 0) {
                        /* And sort leader_epochs by partition id */
                        qsort(leader_epochs, md->topics[i].partition_cnt,
                              sizeof(*leader_epochs),
                              rd_kafka_metadata_partition_leader_epoch_cmp);
                }

                /* Update topic state based on the topic metadata */
                rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
                                                     leader_epochs);

                if (requested_topics) {
                        rd_list_free_cb(missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           md->topics[i].topic,
                                                           (void *)strcmp));
                        if (!all_topics) {
                                /* Only update cache when not asking
                                 * for all topics. */
                                rd_kafka_wrlock(rk);
                                rd_kafka_metadata_cache_topic_update(
                                    rk, &md->topics[i],
                                    rd_false /*propagate later*/);
                                cache_changes++;
                                rd_kafka_wrunlock(rk);
                        }
                }
        }

        if (ApiVersion >= 8 && ApiVersion <= 10) {
                int32_t ClusterAuthorizedOperations;
                /* ClusterAuthorizedOperations */
                rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
        }

        rd_kafka_buf_skip_tags(rkbuf);

        /* Entire Metadata response now parsed without errors:
         * update our internal state according to the response. */

        if (md->broker_cnt == 0 && md->topic_cnt == 0) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "No brokers or topics in metadata: should retry");
                err = RD_KAFKA_RESP_ERR__PARTIAL;
                goto err;
        }

        /* Update our list of brokers. */
        for (i = 0; i < md->broker_cnt; i++) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "  Broker #%i/%i: %s:%i NodeId %" PRId32, i,
                           md->broker_cnt, md->brokers[i].host,
                           md->brokers[i].port, md->brokers[i].id);
                rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
                                       &md->brokers[i], NULL);
        }

        /* Requested topics not seen in metadata? Propagate to topic code. */
        if (missing_topics) {
                char *topic;
                rd_rkb_dbg(rkb, TOPIC, "METADATA",
                           "%d/%d requested topic(s) seen in metadata",
                           rd_list_cnt(requested_topics) -
                               rd_list_cnt(missing_topics),
                           rd_list_cnt(requested_topics));
                for (i = 0; i < rd_list_cnt(missing_topics); i++)
                        rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
                                   (char *)(missing_topics->rl_elems[i]));
                RD_LIST_FOREACH(topic, missing_topics, i) {
                        rd_kafka_topic_t *rkt;

                        rkt = rd_kafka_topic_find(rkb->rkb_rk, topic,
                                                  1 /*lock*/);
                        if (rkt) {
                                /* The received metadata response contained no
                                 * information about topic 'rkt' and thus
                                 * indicates the topic is not available in the
                                 * cluster: mark the topic as non-existent. */
                                rd_kafka_topic_wrlock(rkt);
                                rd_kafka_topic_set_notexists(
                                    rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
                                rd_kafka_topic_wrunlock(rkt);

                                rd_kafka_topic_destroy0(rkt);
                        }
                }
        }

        rd_kafka_wrlock(rkb->rkb_rk);

        rkb->rkb_rk->rk_ts_metadata = rd_clock();

        /* Update cached cluster id. */
        if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
            (!rk->rk_clusterid ||
             rd_kafkap_str_cmp_str(&cluster_id, rk->rk_clusterid))) {
                rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_GENERIC, "CLUSTERID",
                           "ClusterId update \"%s\" -> \"%.*s\"",
                           rk->rk_clusterid ? rk->rk_clusterid : "",
                           RD_KAFKAP_STR_PR(&cluster_id));
                if (rk->rk_clusterid) {
                        rd_kafka_log(rk, LOG_WARNING, "CLUSTERID",
                                     "Broker %s reports different ClusterId "
                                     "\"%.*s\" than previously known \"%s\": "
                                     "a client must not be simultaneously "
                                     "connected to multiple clusters",
                                     rd_kafka_broker_name(rkb),
                                     RD_KAFKAP_STR_PR(&cluster_id),
                                     rk->rk_clusterid);
                        rd_free(rk->rk_clusterid);
                }

                rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id);
                /* rd_kafka_clusterid() waits for a cache update even though
                 * the clusterid is not in the cache itself. (#3620) */
                cache_changes++;
        }

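        /*
         * Note: cache_changes and broker_changes feed two different
         * propagation paths: cache changes are pushed to cache waiters
         * via rd_kafka_metadata_cache_propagate_changes(), while broker
         * changes are broadcast with
         * rd_kafka_brokers_broadcast_state_change() (both below).
         */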
        /* Update controller id. */
        if (rkb->rkb_rk->rk_controllerid != controller_id) {
                rd_rkb_dbg(rkb, BROKER, "CONTROLLERID",
                           "ControllerId update %" PRId32 " -> %" PRId32,
                           rkb->rkb_rk->rk_controllerid, controller_id);
                rkb->rkb_rk->rk_controllerid = controller_id;
                broker_changes++;
        }

        if (all_topics) {
                /* Expire all cache entries that were not updated. */
                rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);

                if (rkb->rkb_rk->rk_full_metadata)
                        rd_kafka_metadata_destroy(
                            rkb->rkb_rk->rk_full_metadata);
                rkb->rkb_rk->rk_full_metadata =
                    rd_kafka_metadata_copy(md, tbuf.of);
                rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Caching full metadata with "
                           "%d broker(s) and %d topic(s): %s",
                           md->broker_cnt, md->topic_cnt, reason);
        } else {
                if (cache_changes)
                        rd_kafka_metadata_cache_propagate_changes(rk);
                rd_kafka_metadata_cache_expiry_start(rk);
        }

        /* Remove cache hints for the originally requested topics. */
        if (requested_topics)
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);

        rd_kafka_wrunlock(rkb->rkb_rk);

        if (broker_changes) {
                /* Broadcast broker metadata changes to listeners. */
                rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
        }

        /* Check if the cgrp's effective subscription is affected by
         * the new topic metadata.
         * Ignore if this was a broker-only refresh (no topics), or
         * the request was from the partition assignor (!cgrp_update)
         * which may contain only a sub-set of the subscribed topics (namely
         * the effective subscription of available topics) as to not
         * propagate non-included topics as non-existent. */
        if (cgrp_update && (requested_topics || all_topics))
                rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
                                                    rd_true /*do join*/);

        /* Try to acquire a Producer ID from this broker if we
         * don't have one. */
        if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
                rd_kafka_wrlock(rkb->rkb_rk);
                rd_kafka_idemp_pid_fsm(rkb->rkb_rk);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

done:
        if (missing_topics)
                rd_list_destroy(missing_topics);

        if (leader_epochs)
                rd_free(leader_epochs);

        /* This metadata request was triggered by someone wanting
         * the metadata information back as a reply, so send that reply now.
         * In this case we must not rd_free the metadata memory here:
         * the requestee is responsible for freeing it.
         * The tbuf is explicitly not destroyed as we return
         * its memory to the caller. */
        *mdp = md;

        return err;

err_parse:
        err = rkbuf->rkbuf_err;
err:
        if (requested_topics) {
                /* Failed requests shall purge cache hints for
                 * the requested topics. */
                rd_kafka_wrlock(rkb->rkb_rk);
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        if (missing_topics)
                rd_list_destroy(missing_topics);

        if (leader_epochs)
                rd_free(leader_epochs);

        rd_tmpabuf_destroy(&tbuf);

        return err;
}

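/*
 * The two helpers below implement subscription matching against
 * metadata: rd_kafka_metadata_topic_match() matches wildcard (regex)
 * subscriptions against the cached full metadata, while
 * rd_kafka_metadata_topic_filter() looks up literal topic names in the
 * metadata cache.
 */
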
/**
 * @brief Add all topics in the current cached full metadata
 *        that match the topics in \p match
 *        to \p tinfos (rd_kafka_topic_info_t *).
 *
 * @param errored Any topic or wildcard pattern that did not match
 *                an available topic will be added to this list with
 *                the appropriate error set.
 *
 * @returns the number of topics matched and added to \p tinfos
 *
 * @locks none
 * @locality any
 */
size_t
rd_kafka_metadata_topic_match(rd_kafka_t *rk,
                              rd_list_t *tinfos,
                              const rd_kafka_topic_partition_list_t *match,
                              rd_kafka_topic_partition_list_t *errored) {
        int ti, i;
        size_t cnt = 0;
        const struct rd_kafka_metadata *metadata;
        rd_kafka_topic_partition_list_t *unmatched;

        rd_kafka_rdlock(rk);
        metadata = rk->rk_full_metadata;
        if (!metadata) {
                rd_kafka_rdunlock(rk);
                return 0;
        }

        /* To keep track of which patterns and topics in `match` did not
         * match any topic (or matched an errored topic), we create a set
         * of all topics to match in `unmatched` and then remove from this
         * set as matches are found.
         * Whatever remains in `unmatched` after all matching is performed
         * are the topics and patterns that did not match a topic. */
        unmatched = rd_kafka_topic_partition_list_copy(match);

        /* For each topic in the cluster, scan through the match list
         * to find a matching topic. */
        for (ti = 0; ti < metadata->topic_cnt; ti++) {
                const char *topic = metadata->topics[ti].topic;

                /* Ignore topics in blacklist */
                if (rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
                        continue;

                /* Scan for matches */
                for (i = 0; i < match->cnt; i++) {
                        if (!rd_kafka_topic_match(rk, match->elems[i].topic,
                                                  topic))
                                continue;

                        /* Remove from unmatched */
                        rd_kafka_topic_partition_list_del(
                            unmatched, match->elems[i].topic,
                            RD_KAFKA_PARTITION_UA);

                        if (metadata->topics[ti].err) {
                                rd_kafka_topic_partition_list_add(
                                    errored, topic, RD_KAFKA_PARTITION_UA)
                                    ->err = metadata->topics[ti].err;
                                continue; /* Skip errored topics */
                        }

                        rd_list_add(tinfos,
                                    rd_kafka_topic_info_new(
                                        topic,
                                        metadata->topics[ti].partition_cnt));
                        cnt++;
                }
        }
        rd_kafka_rdunlock(rk);

        /* Any topics/patterns still in unmatched did not match any
         * existing topics, add them to `errored`. */
        for (i = 0; i < unmatched->cnt; i++) {
                rd_kafka_topic_partition_t *elem = &unmatched->elems[i];

                rd_kafka_topic_partition_list_add(errored, elem->topic,
                                                  RD_KAFKA_PARTITION_UA)
                    ->err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        }

        rd_kafka_topic_partition_list_destroy(unmatched);

        return cnt;
}

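/*
 * Example (illustrative) for rd_kafka_metadata_topic_match() above:
 * given a match list containing the pattern "^foo.*", every cached
 * topic name is tested against the pattern; each match is added to
 * tinfos, and a pattern that matched nothing remains in `unmatched`
 * and is reported in `errored` with ERR__UNKNOWN_TOPIC.
 */
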
/**
 * @brief Add all topics in \p match that match cached metadata.
 * @remark MUST NOT be used with wildcard topics,
 *         see rd_kafka_metadata_topic_match() for that.
 *
 * @param errored Non-existent and unauthorized topics are added to this
 *                list with the appropriate error code.
 *
 * @returns the number of topics matched and added to \p tinfos
 * @locks none
 */
size_t
rd_kafka_metadata_topic_filter(rd_kafka_t *rk,
                               rd_list_t *tinfos,
                               const rd_kafka_topic_partition_list_t *match,
                               rd_kafka_topic_partition_list_t *errored) {
        int i;
        size_t cnt = 0;

        rd_kafka_rdlock(rk);
        /* For each topic in match, look up the topic in the cache. */
        for (i = 0; i < match->cnt; i++) {
                const char *topic = match->elems[i].topic;
                const rd_kafka_metadata_topic_t *mtopic;

                /* Ignore topics in blacklist */
                if (rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
                        continue;

                mtopic =
                    rd_kafka_metadata_cache_topic_get(rk, topic, 1 /*valid*/);

                if (!mtopic)
                        rd_kafka_topic_partition_list_add(errored, topic,
                                                          RD_KAFKA_PARTITION_UA)
                            ->err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
                else if (mtopic->err)
                        rd_kafka_topic_partition_list_add(errored, topic,
                                                          RD_KAFKA_PARTITION_UA)
                            ->err = mtopic->err;
                else {
                        rd_list_add(tinfos,
                                    rd_kafka_topic_info_new(
                                        topic, mtopic->partition_cnt));
                        cnt++;
                }
        }
        rd_kafka_rdunlock(rk);

        return cnt;
}


void rd_kafka_metadata_log(rd_kafka_t *rk,
                           const char *fac,
                           const struct rd_kafka_metadata *md) {
        int i;

        rd_kafka_dbg(rk, METADATA, fac,
                     "Metadata with %d broker(s) and %d topic(s):",
                     md->broker_cnt, md->topic_cnt);

        for (i = 0; i < md->broker_cnt; i++) {
                rd_kafka_dbg(rk, METADATA, fac,
                             "  Broker #%i/%i: %s:%i NodeId %" PRId32, i,
                             md->broker_cnt, md->brokers[i].host,
                             md->brokers[i].port, md->brokers[i].id);
        }

        for (i = 0; i < md->topic_cnt; i++) {
                rd_kafka_dbg(
                    rk, METADATA, fac,
                    "  Topic #%i/%i: %s with %i partitions%s%s", i,
                    md->topic_cnt, md->topics[i].topic,
                    md->topics[i].partition_cnt, md->topics[i].err ? ": " : "",
                    md->topics[i].err ? rd_kafka_err2str(md->topics[i].err)
                                      : "");
        }
}

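/*
 * Illustrative call of rd_kafka_metadata_refresh_topics() below
 * (a sketch): refresh a single topic without forcing, letting the
 * cache suppress duplicate in-flight requests:
 *
 *   rd_list_t topics;
 *   rd_list_init(&topics, 1, rd_free);
 *   rd_list_add(&topics, rd_strdup("mytopic"));
 *   rd_kafka_metadata_refresh_topics(rk, NULL, &topics, rd_false,
 *                                    rd_false, rd_false, "example");
 *   rd_list_destroy(&topics);
 */
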
/**
 * @brief Refresh metadata for \p topics
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 * @param force: force refresh even if topics are up-to-date in cache
 * @param allow_auto_create: Enable/disable auto creation of topics
 *                           (through MetadataRequest). Requires a modern
 *                           broker version.
 *                           Takes precedence over allow.auto.create.topics.
 * @param cgrp_update: Allow consumer group state update on response.
 *
 * @returns an error code
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
                                 rd_kafka_broker_t *rkb,
                                 const rd_list_t *topics,
                                 rd_bool_t force,
                                 rd_bool_t allow_auto_create,
                                 rd_bool_t cgrp_update,
                                 const char *reason) {
        rd_list_t q_topics;
        int destroy_rkb = 0;

        if (!rk) {
                rd_assert(rkb);
                rk = rkb->rkb_rk;
        }

        rd_kafka_wrlock(rk);

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(
                          rk, RD_POLL_NOWAIT, RD_DONT_LOCK, 0, reason))) {
                        /* Hint the cache that something is interested in
                         * these topics so that they will be included in
                         * a future all-topics query. */
                        rd_kafka_metadata_cache_hint(rk, topics, NULL,
                                                     RD_KAFKA_RESP_ERR__NOENT,
                                                     0 /*dont replace*/);

                        rd_kafka_wrunlock(rk);
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of %d topic(s):"
                                     " %s: no usable brokers",
                                     rd_list_cnt(topics), reason);

                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                }
                destroy_rkb = 1;
        }

        rd_list_init(&q_topics, rd_list_cnt(topics), rd_free);

        if (!force) {
                /* Hint the cache of the upcoming MetadataRequest and filter
                 * out any topics that are already being requested.
                 * q_topics will contain the remaining topics to query. */
                rd_kafka_metadata_cache_hint(rk, topics, &q_topics,
                                             RD_KAFKA_RESP_ERR__WAIT_CACHE,
                                             rd_false /*dont replace*/);
                rd_kafka_wrunlock(rk);

                if (rd_list_cnt(&q_topics) == 0) {
                        /* No topics need a new query. */
                        rd_kafka_dbg(rk, METADATA, "METADATA",
                                     "Skipping metadata refresh of "
                                     "%d topic(s): %s: "
                                     "already being requested",
                                     rd_list_cnt(topics), reason);
                        rd_list_destroy(&q_topics);
                        if (destroy_rkb)
                                rd_kafka_broker_destroy(rkb);
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

        } else {
                rd_kafka_wrunlock(rk);
                rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL);
        }

        rd_kafka_dbg(rk, METADATA, "METADATA",
                     "Requesting metadata for %d/%d topics: %s",
                     rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);

        rd_kafka_MetadataRequest(rkb, &q_topics, reason, allow_auto_create,
                                 cgrp_update, NULL);

        rd_list_destroy(&q_topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Refresh metadata for known topics
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 * @param force: refresh even if cache is up-to-date
 *
 * @returns an error code (__UNKNOWN_TOPIC if there are no local topics)
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_known_topics(rd_kafka_t *rk,
                                       rd_kafka_broker_t *rkb,
                                       rd_bool_t force,
                                       const char *reason) {
        rd_list_t topics;
        rd_kafka_resp_err_t err;
        int cache_cnt = 0;
        rd_bool_t allow_auto_create_topics;

        if (!rk)
                rk = rkb->rkb_rk;

        rd_list_init(&topics, 8, rd_free);
        rd_kafka_local_topics_to_list(rk, &topics, &cache_cnt);

        /* Allow topic auto creation if there are locally known topics (rkt)
         * and not just cached (to be queried) topics. */
        allow_auto_create_topics = rk->rk_conf.allow_auto_create_topics &&
                                   rd_list_cnt(&topics) > cache_cnt;

        if (rd_list_cnt(&topics) == 0)
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        else
                err = rd_kafka_metadata_refresh_topics(
                    rk, rkb, &topics, force, allow_auto_create_topics,
                    rd_false /*!cgrp_update*/, reason);

        rd_list_destroy(&topics);

        return err;
}

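/*
 * Note on the cache_cnt bookkeeping used above and below:
 * rd_kafka_local_topics_to_list() returns both locally known topic
 * objects (rkt) and cache-only entries, reporting the number of the
 * latter in cache_cnt. When the resulting list contains nothing but
 * cached entries there is no topic the application has explicitly
 * referenced, so topic auto-creation is suppressed.
 */
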
/**
 * @brief Refresh metadata for known and subscribed topics.
 *
 * @param rk used to look up usable broker if \p rkb is NULL.
 * @param rkb use this broker, unless NULL then any usable broker from \p rk.
 * @param reason reason of refresh, used in debug logs.
 *
 * @returns an error code (ERR__UNKNOWN_TOPIC if no topics are desired).
 *
 * @locality rdkafka main thread
 * @locks_required none
 * @locks_acquired rk(read)
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_consumer_topics(rd_kafka_t *rk,
                                          rd_kafka_broker_t *rkb,
                                          const char *reason) {
        rd_list_t topics;
        rd_kafka_resp_err_t err;
        rd_kafka_cgrp_t *rkcg;
        rd_bool_t allow_auto_create_topics =
            rk->rk_conf.allow_auto_create_topics;
        int cache_cnt = 0;

        if (!rk) {
                rd_assert(rkb);
                rk = rkb->rkb_rk;
        }

        rkcg = rk->rk_cgrp;
        rd_assert(rkcg != NULL);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
                /* If there is a wildcard subscription we need to request
                 * all topics in the cluster so that we can perform
                 * regexp matching. */
                return rd_kafka_metadata_refresh_all(rk, rkb, reason);
        }

        rd_list_init(&topics, 8, rd_free);

        /* Add locally known topics, i.e., those that are currently
         * being consumed or otherwise referenced through topic_t objects. */
        rd_kafka_local_topics_to_list(rk, &topics, &cache_cnt);
        if (rd_list_cnt(&topics) == cache_cnt)
                allow_auto_create_topics = rd_false;

        /* Add subscribed (non-wildcard) topics, if any. */
        if (rkcg->rkcg_subscription)
                rd_kafka_topic_partition_list_get_topic_names(
                    rkcg->rkcg_subscription, &topics,
                    rd_false /*no wildcards*/);

        if (rd_list_cnt(&topics) == 0)
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        else
                err = rd_kafka_metadata_refresh_topics(
                    rk, rkb, &topics, rd_true /*force*/,
                    allow_auto_create_topics, rd_true /*cgrp_update*/, reason);

        rd_list_destroy(&topics);

        return err;
}


/**
 * @brief Refresh broker list by metadata.
 *
 * Attempts to use a sparse metadata request if possible, else falls back
 * on a full metadata request. (NOTE: sparse not implemented, KIP-4)
 *
 * @param rk: used to look up usable broker if \p rkb is NULL.
 * @param rkb: use this broker, unless NULL then any usable broker from \p rk
 *
 * @returns an error code
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t rd_kafka_metadata_refresh_brokers(rd_kafka_t *rk,
                                                      rd_kafka_broker_t *rkb,
                                                      const char *reason) {
        return rd_kafka_metadata_request(rk, rkb, NULL /*brokers only*/,
                                         rd_false /*!allow auto create topics*/,
                                         rd_false /*no cgrp update */, reason,
                                         NULL);
}


/**
 * @brief Refresh metadata for all topics in cluster.
 *        This is a full metadata request which might be taxing on the
 *        broker if the cluster has many topics.
 *
 * @locality any
 * @locks none
 */
rd_kafka_resp_err_t rd_kafka_metadata_refresh_all(rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  const char *reason) {
        int destroy_rkb = 0;
        rd_list_t topics;

        if (!rk) {
                rd_assert(rkb);
                rk = rkb->rkb_rk;
        }

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT,
                                                       RD_DO_LOCK, 0, reason)))
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                destroy_rkb = 1;
        }

        rd_list_init(&topics, 0, NULL); /* empty list = all topics */
        rd_kafka_MetadataRequest(rkb, &topics, reason,
                                 rd_false /*no auto create*/,
                                 rd_true /*cgrp update*/, NULL);
        rd_list_destroy(&topics);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Lower-level Metadata request that takes a callback (with replyq set)
 *        which will be triggered after parsing is complete.
 *
 * @param cgrp_update Allow consumer group updates from the response.
 *
 * @locks none
 * @locality any
 */
rd_kafka_resp_err_t rd_kafka_metadata_request(rd_kafka_t *rk,
                                              rd_kafka_broker_t *rkb,
                                              const rd_list_t *topics,
                                              rd_bool_t allow_auto_create_topics,
                                              rd_bool_t cgrp_update,
                                              const char *reason,
                                              rd_kafka_op_t *rko) {
        int destroy_rkb = 0;

        if (!rkb) {
                if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT,
                                                       RD_DO_LOCK, 0, reason)))
                        return RD_KAFKA_RESP_ERR__TRANSPORT;
                destroy_rkb = 1;
        }

        rd_kafka_MetadataRequest(rkb, topics, reason, allow_auto_create_topics,
                                 cgrp_update, rko);

        if (destroy_rkb)
                rd_kafka_broker_destroy(rkb);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

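/*
 * Sketch of the async flow behind rd_kafka_metadata_request() above:
 * the caller may attach a reply queue to rko (as rd_kafka_metadata()
 * does), the request is sent to the selected broker, and the parsed
 * result is delivered back as an RD_KAFKA_OP_METADATA op once
 * rd_kafka_parse_Metadata() has run.
 */
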
/**
 * @brief Query timer callback to trigger refresh for topics
 *        that have partitions missing their leaders.
 *
 * @locks none
 * @locality rdkafka main thread
 */
static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts,
                                                  void *arg) {
        rd_kafka_t *rk         = rkts->rkts_rk;
        rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_kafka_topic_t *rkt;
        rd_list_t topics;

        rd_kafka_wrlock(rk);
        rd_list_init(&topics, rk->rk_topic_cnt, rd_free);

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int i, require_metadata;
                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) {
                        /* Skip topics that are known to not exist. */
                        rd_kafka_topic_rdunlock(rkt);
                        continue;
                }

                require_metadata =
                    rkt->rkt_flags & RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

                /* Check if any partitions are missing brokers. */
                for (i = 0; !require_metadata && i < rkt->rkt_partition_cnt;
                     i++) {
                        rd_kafka_toppar_t *rktp = rkt->rkt_p[i];
                        rd_kafka_toppar_lock(rktp);
                        require_metadata =
                            !rktp->rktp_broker && !rktp->rktp_next_broker;
                        rd_kafka_toppar_unlock(rktp);
                }

                if (require_metadata || rkt->rkt_partition_cnt == 0)
                        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));

                rd_kafka_topic_rdunlock(rkt);
        }

        rd_kafka_wrunlock(rk);

        if (rd_list_cnt(&topics) == 0) {
                /* No leader-less topics or partitions, stop the timer. */
                rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/);
        } else {
                rd_kafka_metadata_refresh_topics(
                    rk, NULL, &topics, rd_true /*force*/,
                    rk->rk_conf.allow_auto_create_topics,
                    rd_false /*!cgrp_update*/, "partition leader query");

                /* Back off the next query exponentially until we reach
                 * the standard query interval - then stop the timer
                 * since the intervalled querier will do the job for us. */
                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
                    rtmr->rtmr_interval * 2 / 1000 >=
                        rk->rk_conf.metadata_refresh_interval_ms)
                        rd_kafka_timer_stop(rkts, rtmr, 1 /*lock*/);
                else
                        rd_kafka_timer_exp_backoff(rkts, rtmr);
        }

        rd_list_destroy(&topics);
}


/**
 * @brief Trigger fast leader query to quickly pick up on leader changes.
 *        The fast leader query is a quick query followed by later queries at
 *        exponentially increased intervals until no topics are missing
 *        leaders.
 *
 * @locks none
 * @locality any
 */
void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) {
        rd_ts_t next;

        /* Restart the timer if it will speed things up. */
        next = rd_kafka_timer_next(
            &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr, 1 /*lock*/);
        if (next == -1 /* not started */ ||
            next >
                (rd_ts_t)rk->rk_conf.metadata_refresh_fast_interval_ms * 1000) {
                rd_kafka_dbg(rk, METADATA | RD_KAFKA_DBG_TOPIC, "FASTQUERY",
                             "Starting fast leader query");
                rd_kafka_timer_start(
                    &rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr,
                    rk->rk_conf.metadata_refresh_fast_interval_ms * 1000,
                    rd_kafka_metadata_leader_query_tmr_cb, NULL);
        }
}

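/*
 * Timing sketch for the fast leader query above (assuming the default
 * topic.metadata.refresh.fast.interval.ms of 250ms): queries fire at
 * roughly 250ms, 500ms, 1s, ... doubling until the interval reaches
 * topic.metadata.refresh.interval.ms, at which point the timer is
 * stopped and the regular intervalled refresh takes over.
 */
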
/**
 * @brief Create mock Metadata (for testing) based on the provided topics.
 *
 * @param topics elements are checked for .topic and .partition_cnt
 * @param topic_cnt is the number of topic elements in \p topics.
 *
 * @returns a newly allocated metadata object that must be freed with
 *          rd_kafka_metadata_destroy().
 *
 * @sa rd_kafka_metadata_copy()
 */
rd_kafka_metadata_t *
rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
                                 size_t topic_cnt) {
        rd_kafka_metadata_t *md;
        rd_tmpabuf_t tbuf;
        size_t topic_names_size = 0;
        int total_partition_cnt = 0;
        size_t i;

        /* Calculate total partition count and topic names size before
         * allocating memory. */
        for (i = 0; i < topic_cnt; i++) {
                topic_names_size += 1 + strlen(topics[i].topic);
                total_partition_cnt += topics[i].partition_cnt;
        }

        /* Allocate contiguous buffer which will back all the memory
         * needed by the final metadata_t object */
        rd_tmpabuf_new(
            &tbuf,
            sizeof(*md) + (sizeof(*md->topics) * topic_cnt) +
                topic_names_size + (64 /*topic name size..*/ * topic_cnt) +
                (sizeof(*md->topics[0].partitions) * total_partition_cnt),
            1 /*assert on fail*/);

        md = rd_tmpabuf_alloc(&tbuf, sizeof(*md));
        memset(md, 0, sizeof(*md));

        md->topic_cnt = (int)topic_cnt;
        md->topics =
            rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics));

        for (i = 0; i < (size_t)md->topic_cnt; i++) {
                int j;

                md->topics[i].topic =
                    rd_tmpabuf_write_str(&tbuf, topics[i].topic);
                md->topics[i].partition_cnt = topics[i].partition_cnt;
                md->topics[i].err           = RD_KAFKA_RESP_ERR_NO_ERROR;

                md->topics[i].partitions = rd_tmpabuf_alloc(
                    &tbuf, md->topics[i].partition_cnt *
                               sizeof(*md->topics[i].partitions));

                for (j = 0; j < md->topics[i].partition_cnt; j++) {
                        memset(&md->topics[i].partitions[j], 0,
                               sizeof(md->topics[i].partitions[j]));
                        md->topics[i].partitions[j].id = j;
                }
        }

        /* Check for tmpabuf errors */
        if (rd_tmpabuf_failed(&tbuf))
                rd_assert(!*"metadata mock failed");

        /* Not destroying the tmpabuf since we return
         * its allocated memory. */
        return md;
}

/**
 * @brief Create mock Metadata (for testing) based on the
 *        var-arg tuples of (const char *topic, int partition_cnt).
 *
 * @param topic_cnt is the number of topic,partition_cnt tuples.
 *
 * @returns a newly allocated metadata object that must be freed with
 *          rd_kafka_metadata_destroy().
 *
 * @sa rd_kafka_metadata_new_topic_mock()
 */
rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv(size_t topic_cnt, ...) {
        rd_kafka_metadata_topic_t *topics;
        va_list ap;
        size_t i;

        topics = rd_alloca(sizeof(*topics) * topic_cnt);

        va_start(ap, topic_cnt);
        for (i = 0; i < topic_cnt; i++) {
                topics[i].topic         = va_arg(ap, char *);
                topics[i].partition_cnt = va_arg(ap, int);
        }
        va_end(ap);

        return rd_kafka_metadata_new_topic_mock(topics, topic_cnt);
}

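/*
 * Example (tests only; a sketch): create mock metadata for two topics
 * with 3 and 1 partitions respectively:
 *
 *   rd_kafka_metadata_t *md =
 *       rd_kafka_metadata_new_topic_mockv(2, "topicA", 3, "topicB", 1);
 *   ...use md...
 *   rd_kafka_metadata_destroy(md);
 */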