diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h new file mode 100644 index 00000000..cbed9308 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h @@ -0,0 +1,311 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012,2013 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKA_TOPIC_H_ +#define _RDKAFKA_TOPIC_H_ + +#include "rdlist.h" + +extern const char *rd_kafka_topic_state_names[]; + + +/** + * @struct Light-weight topic object which only contains the topic name. + * + * For use in outgoing APIs (like rd_kafka_message_t) when there is + * no proper topic object available. + * + * @remark lrkt_magic[4] MUST be the first field and be set to "LRKT". + */ +struct rd_kafka_lwtopic_s { + char lrkt_magic[4]; /**< "LRKT" */ + rd_kafka_t *lrkt_rk; /**< Pointer to the client instance. */ + rd_refcnt_t lrkt_refcnt; /**< Refcount */ + char *lrkt_topic; /**< Points past this struct, allocated + * along with the struct. */ +}; + +/** Casts a topic_t to a light-weight lwtopic_t */ +#define rd_kafka_rkt_lw(rkt) ((rd_kafka_lwtopic_t *)rkt) + +#define rd_kafka_rkt_lw_const(rkt) ((const rd_kafka_lwtopic_t *)rkt) + +/** + * @returns true if the topic object is a light-weight topic, else false. + */ +static RD_UNUSED RD_INLINE rd_bool_t +rd_kafka_rkt_is_lw(const rd_kafka_topic_t *app_rkt) { + const rd_kafka_lwtopic_t *lrkt = rd_kafka_rkt_lw_const(app_rkt); + return !memcmp(lrkt->lrkt_magic, "LRKT", 4); +} + +/** @returns the lwtopic_t if \p rkt is a light-weight topic, else NULL. */ +static RD_UNUSED RD_INLINE rd_kafka_lwtopic_t * +rd_kafka_rkt_get_lw(rd_kafka_topic_t *rkt) { + if (rd_kafka_rkt_is_lw(rkt)) + return rd_kafka_rkt_lw(rkt); + return NULL; +} + +void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt); +rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic); + +static RD_UNUSED RD_INLINE void +rd_kafka_lwtopic_keep(rd_kafka_lwtopic_t *lrkt) { + rd_refcnt_add(&lrkt->lrkt_refcnt); +} + + + +/** + * @struct Holds partition + transactional PID + base sequence msgid. + * + * Used in rkt_saved_partmsgids to restore transactional/idempotency state + * for a partition that is lost from metadata for some time and then returns. + */ +typedef struct rd_kafka_partition_msgid_s { + TAILQ_ENTRY(rd_kafka_partition_msgid_s) link; + int32_t partition; + rd_kafka_pid_t pid; + uint64_t msgid; + uint64_t epoch_base_msgid; + rd_ts_t ts; +} rd_kafka_partition_msgid_t; + + +/** + * @struct Aux struct that holds a partition id and a leader epoch. + * Used as temporary holding space for per-partition leader epochs + * while parsing MetadataResponse. + */ +typedef struct rd_kafka_partition_leader_epoch_s { + int32_t partition_id; + int32_t leader_epoch; +} rd_kafka_partition_leader_epoch_t; + + +/* + * @struct Internal representation of a topic. + * + * @remark rkt_magic[4] MUST be the first field and be set to "IRKT". + */ +struct rd_kafka_topic_s { + char rkt_magic[4]; /**< "IRKT" */ + + TAILQ_ENTRY(rd_kafka_topic_s) rkt_link; + + rd_refcnt_t rkt_refcnt; + + rwlock_t rkt_lock; + rd_kafkap_str_t *rkt_topic; + + rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */ + rd_kafka_toppar_t **rkt_p; /**< Partition array */ + int32_t rkt_partition_cnt; + + int32_t rkt_sticky_partition; /**< Current sticky partition. + * @locks rkt_lock */ + rd_interval_t rkt_sticky_intvl; /**< Interval to assign new + * sticky partition. */ + + rd_list_t rkt_desp; /* Desired partitions + * that are not yet seen + * in the cluster. */ + rd_interval_t rkt_desp_refresh_intvl; /**< Rate-limiter for + * desired partition + * metadata refresh. */ + + rd_ts_t rkt_ts_create; /**< Topic object creation time. */ + rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata + * update for this topic. */ + + rd_refcnt_t rkt_app_refcnt; /**< Number of active rkt's new()ed + * by application. */ + + enum { RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */ + RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */ + RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */ + RD_KAFKA_TOPIC_S_ERROR, /* Topic exists but is in an errored + * state, such as auth failure. */ + } rkt_state; + + int rkt_flags; +#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL \ + 0x1 /* Leader lost/unavailable \ + * for at least one partition. */ + + rd_kafka_resp_err_t rkt_err; /**< Permanent error. */ + + rd_kafka_t *rkt_rk; + + rd_avg_t rkt_avg_batchsize; /**< Average batch size */ + rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */ + + rd_kafka_topic_conf_t rkt_conf; + + /** Idempotent/Txn producer: + * The PID,Epoch,base Msgid state for removed partitions. */ + TAILQ_HEAD(, rd_kafka_partition_msgid_s) rkt_saved_partmsgids; +}; + +#define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock) +#define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock) +#define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock) +#define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock) + + + +/** + * @brief Increase refcount and return topic object. + */ +static RD_INLINE RD_UNUSED rd_kafka_topic_t * +rd_kafka_topic_keep(rd_kafka_topic_t *rkt) { + rd_kafka_lwtopic_t *lrkt; + if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) + rd_kafka_lwtopic_keep(lrkt); + else + rd_refcnt_add(&rkt->rkt_refcnt); + return rkt; +} + +void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt); + +rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt); + + + +/** + * @brief Loose reference to topic object as increased by ..topic_keep(). + */ +static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy0(rd_kafka_topic_t *rkt) { + rd_kafka_lwtopic_t *lrkt; + if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) + rd_kafka_lwtopic_destroy(lrkt); + else if (unlikely(rd_refcnt_sub(&rkt->rkt_refcnt) == 0)) + rd_kafka_topic_destroy_final(rkt); +} + + +rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, + const char *topic, + rd_kafka_topic_conf_t *conf, + int *existing, + int do_lock); + +rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func, + int line, + rd_kafka_t *rk, + const char *topic, + int do_lock); +rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func, + int line, + rd_kafka_t *rk, + const rd_kafkap_str_t *topic); +#define rd_kafka_topic_find(rk, topic, do_lock) \ + rd_kafka_topic_find_fl(__FUNCTION__, __LINE__, rk, topic, do_lock) +#define rd_kafka_topic_find0(rk, topic) \ + rd_kafka_topic_find0_fl(__FUNCTION__, __LINE__, rk, topic) +int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b); + +void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt); + +rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err); +rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, + rd_kafka_resp_err_t err); + +/** + * @returns the topic's permanent error, if any. + * + * @locality any + * @locks_acquired rd_kafka_topic_rdlock(rkt) + */ +static RD_INLINE RD_UNUSED rd_kafka_resp_err_t +rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) { + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_topic_rdlock(rkt); + if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) + err = rkt->rkt_err; + rd_kafka_topic_rdunlock(rkt); + return err; +} + +int rd_kafka_topic_metadata_update2( + rd_kafka_broker_t *rkb, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_partition_leader_epoch_t *leader_epochs); + +void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now); + + +typedef struct rd_kafka_topic_info_s { + const char *topic; /**< Allocated along with struct */ + int partition_cnt; +} rd_kafka_topic_info_t; + +int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b); +int rd_kafka_topic_info_cmp(const void *_a, const void *_b); +rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic, + int partition_cnt); +void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti); + +int rd_kafka_topic_match(rd_kafka_t *rk, + const char *pattern, + const char *topic); + +int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp, + int32_t broker_id, + rd_kafka_broker_t *rkb, + const char *reason); + +int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp); + +rd_kafka_resp_err_t rd_kafka_topics_leader_query_sync(rd_kafka_t *rk, + int all_topics, + const rd_list_t *topics, + int timeout_ms); +void rd_kafka_topic_leader_query0(rd_kafka_t *rk, + rd_kafka_topic_t *rkt, + int do_rk_lock, + rd_bool_t force); +#define rd_kafka_topic_leader_query(rk, rkt) \ + rd_kafka_topic_leader_query0(rk, rkt, 1 /*lock*/, \ + rd_false /*dont force*/) + +#define rd_kafka_topic_fast_leader_query(rk) \ + rd_kafka_metadata_fast_leader_query(rk) + +void rd_kafka_local_topics_to_list(rd_kafka_t *rk, + rd_list_t *topics, + int *cache_cntp); + +void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, + int partition_cnt, + int32_t leader_id); + +#endif /* _RDKAFKA_TOPIC_H_ */ |