/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012,2013 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _RDKAFKA_TOPIC_H_ #define _RDKAFKA_TOPIC_H_ #include "rdlist.h" extern const char *rd_kafka_topic_state_names[]; /** * @struct Light-weight topic object which only contains the topic name. * * For use in outgoing APIs (like rd_kafka_message_t) when there is * no proper topic object available. * * @remark lrkt_magic[4] MUST be the first field and be set to "LRKT". */ struct rd_kafka_lwtopic_s { char lrkt_magic[4]; /**< "LRKT" */ rd_kafka_t *lrkt_rk; /**< Pointer to the client instance. */ rd_refcnt_t lrkt_refcnt; /**< Refcount */ char *lrkt_topic; /**< Points past this struct, allocated * along with the struct. */ }; /** Casts a topic_t to a light-weight lwtopic_t */ #define rd_kafka_rkt_lw(rkt) ((rd_kafka_lwtopic_t *)rkt) #define rd_kafka_rkt_lw_const(rkt) ((const rd_kafka_lwtopic_t *)rkt) /** * @returns true if the topic object is a light-weight topic, else false. */ static RD_UNUSED RD_INLINE rd_bool_t rd_kafka_rkt_is_lw(const rd_kafka_topic_t *app_rkt) { const rd_kafka_lwtopic_t *lrkt = rd_kafka_rkt_lw_const(app_rkt); return !memcmp(lrkt->lrkt_magic, "LRKT", 4); } /** @returns the lwtopic_t if \p rkt is a light-weight topic, else NULL. */ static RD_UNUSED RD_INLINE rd_kafka_lwtopic_t * rd_kafka_rkt_get_lw(rd_kafka_topic_t *rkt) { if (rd_kafka_rkt_is_lw(rkt)) return rd_kafka_rkt_lw(rkt); return NULL; } void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt); rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic); static RD_UNUSED RD_INLINE void rd_kafka_lwtopic_keep(rd_kafka_lwtopic_t *lrkt) { rd_refcnt_add(&lrkt->lrkt_refcnt); } /** * @struct Holds partition + transactional PID + base sequence msgid. * * Used in rkt_saved_partmsgids to restore transactional/idempotency state * for a partition that is lost from metadata for some time and then returns. */ typedef struct rd_kafka_partition_msgid_s { TAILQ_ENTRY(rd_kafka_partition_msgid_s) link; int32_t partition; rd_kafka_pid_t pid; uint64_t msgid; uint64_t epoch_base_msgid; rd_ts_t ts; } rd_kafka_partition_msgid_t; /** * @struct Aux struct that holds a partition id and a leader epoch. * Used as temporary holding space for per-partition leader epochs * while parsing MetadataResponse. */ typedef struct rd_kafka_partition_leader_epoch_s { int32_t partition_id; int32_t leader_epoch; } rd_kafka_partition_leader_epoch_t; /* * @struct Internal representation of a topic. * * @remark rkt_magic[4] MUST be the first field and be set to "IRKT". */ struct rd_kafka_topic_s { char rkt_magic[4]; /**< "IRKT" */ TAILQ_ENTRY(rd_kafka_topic_s) rkt_link; rd_refcnt_t rkt_refcnt; rwlock_t rkt_lock; rd_kafkap_str_t *rkt_topic; rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */ rd_kafka_toppar_t **rkt_p; /**< Partition array */ int32_t rkt_partition_cnt; int32_t rkt_sticky_partition; /**< Current sticky partition. * @locks rkt_lock */ rd_interval_t rkt_sticky_intvl; /**< Interval to assign new * sticky partition. */ rd_list_t rkt_desp; /* Desired partitions * that are not yet seen * in the cluster. */ rd_interval_t rkt_desp_refresh_intvl; /**< Rate-limiter for * desired partition * metadata refresh. */ rd_ts_t rkt_ts_create; /**< Topic object creation time. */ rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata * update for this topic. */ rd_refcnt_t rkt_app_refcnt; /**< Number of active rkt's new()ed * by application. */ enum { RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */ RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */ RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */ RD_KAFKA_TOPIC_S_ERROR, /* Topic exists but is in an errored * state, such as auth failure. */ } rkt_state; int rkt_flags; #define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL \ 0x1 /* Leader lost/unavailable \ * for at least one partition. */ rd_kafka_resp_err_t rkt_err; /**< Permanent error. */ rd_kafka_t *rkt_rk; rd_avg_t rkt_avg_batchsize; /**< Average batch size */ rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */ rd_kafka_topic_conf_t rkt_conf; /** Idempotent/Txn producer: * The PID,Epoch,base Msgid state for removed partitions. */ TAILQ_HEAD(, rd_kafka_partition_msgid_s) rkt_saved_partmsgids; }; #define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock) #define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock) #define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock) #define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock) /** * @brief Increase refcount and return topic object. */ static RD_INLINE RD_UNUSED rd_kafka_topic_t * rd_kafka_topic_keep(rd_kafka_topic_t *rkt) { rd_kafka_lwtopic_t *lrkt; if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) rd_kafka_lwtopic_keep(lrkt); else rd_refcnt_add(&rkt->rkt_refcnt); return rkt; } void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt); rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt); /** * @brief Loose reference to topic object as increased by ..topic_keep(). */ static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy0(rd_kafka_topic_t *rkt) { rd_kafka_lwtopic_t *lrkt; if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) rd_kafka_lwtopic_destroy(lrkt); else if (unlikely(rd_refcnt_sub(&rkt->rkt_refcnt) == 0)) rd_kafka_topic_destroy_final(rkt); } rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf, int *existing, int do_lock); rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func, int line, rd_kafka_t *rk, const char *topic, int do_lock); rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func, int line, rd_kafka_t *rk, const rd_kafkap_str_t *topic); #define rd_kafka_topic_find(rk, topic, do_lock) \ rd_kafka_topic_find_fl(__FUNCTION__, __LINE__, rk, topic, do_lock) #define rd_kafka_topic_find0(rk, topic) \ rd_kafka_topic_find0_fl(__FUNCTION__, __LINE__, rk, topic) int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b); void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt); rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, rd_kafka_resp_err_t err); rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, rd_kafka_resp_err_t err); /** * @returns the topic's permanent error, if any. * * @locality any * @locks_acquired rd_kafka_topic_rdlock(rkt) */ static RD_INLINE RD_UNUSED rd_kafka_resp_err_t rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) { rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_topic_rdlock(rkt); if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) err = rkt->rkt_err; rd_kafka_topic_rdunlock(rkt); return err; } int rd_kafka_topic_metadata_update2( rd_kafka_broker_t *rkb, const struct rd_kafka_metadata_topic *mdt, const rd_kafka_partition_leader_epoch_t *leader_epochs); void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now); typedef struct rd_kafka_topic_info_s { const char *topic; /**< Allocated along with struct */ int partition_cnt; } rd_kafka_topic_info_t; int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b); int rd_kafka_topic_info_cmp(const void *_a, const void *_b); rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic, int partition_cnt); void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti); int rd_kafka_topic_match(rd_kafka_t *rk, const char *pattern, const char *topic); int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp, int32_t broker_id, rd_kafka_broker_t *rkb, const char *reason); int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp); rd_kafka_resp_err_t rd_kafka_topics_leader_query_sync(rd_kafka_t *rk, int all_topics, const rd_list_t *topics, int timeout_ms); void rd_kafka_topic_leader_query0(rd_kafka_t *rk, rd_kafka_topic_t *rkt, int do_rk_lock, rd_bool_t force); #define rd_kafka_topic_leader_query(rk, rkt) \ rd_kafka_topic_leader_query0(rk, rkt, 1 /*lock*/, \ rd_false /*dont force*/) #define rd_kafka_topic_fast_leader_query(rk) \ rd_kafka_metadata_fast_leader_query(rk) void rd_kafka_local_topics_to_list(rd_kafka_t *rk, rd_list_t *topics, int *cache_cntp); void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, int partition_cnt, int32_t leader_id); #endif /* _RDKAFKA_TOPIC_H_ */