diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h | 311 |
1 files changed, 0 insertions, 311 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h deleted file mode 100644 index cbed9308a..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h +++ /dev/null @@ -1,311 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012,2013 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _RDKAFKA_TOPIC_H_ -#define _RDKAFKA_TOPIC_H_ - -#include "rdlist.h" - -extern const char *rd_kafka_topic_state_names[]; - - -/** - * @struct Light-weight topic object which only contains the topic name. - * - * For use in outgoing APIs (like rd_kafka_message_t) when there is - * no proper topic object available. - * - * @remark lrkt_magic[4] MUST be the first field and be set to "LRKT". - */ -struct rd_kafka_lwtopic_s { - char lrkt_magic[4]; /**< "LRKT" */ - rd_kafka_t *lrkt_rk; /**< Pointer to the client instance. */ - rd_refcnt_t lrkt_refcnt; /**< Refcount */ - char *lrkt_topic; /**< Points past this struct, allocated - * along with the struct. */ -}; - -/** Casts a topic_t to a light-weight lwtopic_t */ -#define rd_kafka_rkt_lw(rkt) ((rd_kafka_lwtopic_t *)rkt) - -#define rd_kafka_rkt_lw_const(rkt) ((const rd_kafka_lwtopic_t *)rkt) - -/** - * @returns true if the topic object is a light-weight topic, else false. - */ -static RD_UNUSED RD_INLINE rd_bool_t -rd_kafka_rkt_is_lw(const rd_kafka_topic_t *app_rkt) { - const rd_kafka_lwtopic_t *lrkt = rd_kafka_rkt_lw_const(app_rkt); - return !memcmp(lrkt->lrkt_magic, "LRKT", 4); -} - -/** @returns the lwtopic_t if \p rkt is a light-weight topic, else NULL. */ -static RD_UNUSED RD_INLINE rd_kafka_lwtopic_t * -rd_kafka_rkt_get_lw(rd_kafka_topic_t *rkt) { - if (rd_kafka_rkt_is_lw(rkt)) - return rd_kafka_rkt_lw(rkt); - return NULL; -} - -void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt); -rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic); - -static RD_UNUSED RD_INLINE void -rd_kafka_lwtopic_keep(rd_kafka_lwtopic_t *lrkt) { - rd_refcnt_add(&lrkt->lrkt_refcnt); -} - - - -/** - * @struct Holds partition + transactional PID + base sequence msgid. - * - * Used in rkt_saved_partmsgids to restore transactional/idempotency state - * for a partition that is lost from metadata for some time and then returns. - */ -typedef struct rd_kafka_partition_msgid_s { - TAILQ_ENTRY(rd_kafka_partition_msgid_s) link; - int32_t partition; - rd_kafka_pid_t pid; - uint64_t msgid; - uint64_t epoch_base_msgid; - rd_ts_t ts; -} rd_kafka_partition_msgid_t; - - -/** - * @struct Aux struct that holds a partition id and a leader epoch. - * Used as temporary holding space for per-partition leader epochs - * while parsing MetadataResponse. - */ -typedef struct rd_kafka_partition_leader_epoch_s { - int32_t partition_id; - int32_t leader_epoch; -} rd_kafka_partition_leader_epoch_t; - - -/* - * @struct Internal representation of a topic. - * - * @remark rkt_magic[4] MUST be the first field and be set to "IRKT". - */ -struct rd_kafka_topic_s { - char rkt_magic[4]; /**< "IRKT" */ - - TAILQ_ENTRY(rd_kafka_topic_s) rkt_link; - - rd_refcnt_t rkt_refcnt; - - rwlock_t rkt_lock; - rd_kafkap_str_t *rkt_topic; - - rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */ - rd_kafka_toppar_t **rkt_p; /**< Partition array */ - int32_t rkt_partition_cnt; - - int32_t rkt_sticky_partition; /**< Current sticky partition. - * @locks rkt_lock */ - rd_interval_t rkt_sticky_intvl; /**< Interval to assign new - * sticky partition. */ - - rd_list_t rkt_desp; /* Desired partitions - * that are not yet seen - * in the cluster. */ - rd_interval_t rkt_desp_refresh_intvl; /**< Rate-limiter for - * desired partition - * metadata refresh. */ - - rd_ts_t rkt_ts_create; /**< Topic object creation time. */ - rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata - * update for this topic. */ - - rd_refcnt_t rkt_app_refcnt; /**< Number of active rkt's new()ed - * by application. */ - - enum { RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */ - RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */ - RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */ - RD_KAFKA_TOPIC_S_ERROR, /* Topic exists but is in an errored - * state, such as auth failure. */ - } rkt_state; - - int rkt_flags; -#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL \ - 0x1 /* Leader lost/unavailable \ - * for at least one partition. */ - - rd_kafka_resp_err_t rkt_err; /**< Permanent error. */ - - rd_kafka_t *rkt_rk; - - rd_avg_t rkt_avg_batchsize; /**< Average batch size */ - rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */ - - rd_kafka_topic_conf_t rkt_conf; - - /** Idempotent/Txn producer: - * The PID,Epoch,base Msgid state for removed partitions. */ - TAILQ_HEAD(, rd_kafka_partition_msgid_s) rkt_saved_partmsgids; -}; - -#define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock) -#define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock) -#define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock) -#define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock) - - - -/** - * @brief Increase refcount and return topic object. - */ -static RD_INLINE RD_UNUSED rd_kafka_topic_t * -rd_kafka_topic_keep(rd_kafka_topic_t *rkt) { - rd_kafka_lwtopic_t *lrkt; - if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) - rd_kafka_lwtopic_keep(lrkt); - else - rd_refcnt_add(&rkt->rkt_refcnt); - return rkt; -} - -void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt); - -rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt); - - - -/** - * @brief Loose reference to topic object as increased by ..topic_keep(). - */ -static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy0(rd_kafka_topic_t *rkt) { - rd_kafka_lwtopic_t *lrkt; - if (unlikely((lrkt = rd_kafka_rkt_get_lw(rkt)) != NULL)) - rd_kafka_lwtopic_destroy(lrkt); - else if (unlikely(rd_refcnt_sub(&rkt->rkt_refcnt) == 0)) - rd_kafka_topic_destroy_final(rkt); -} - - -rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, - const char *topic, - rd_kafka_topic_conf_t *conf, - int *existing, - int do_lock); - -rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func, - int line, - rd_kafka_t *rk, - const char *topic, - int do_lock); -rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func, - int line, - rd_kafka_t *rk, - const rd_kafkap_str_t *topic); -#define rd_kafka_topic_find(rk, topic, do_lock) \ - rd_kafka_topic_find_fl(__FUNCTION__, __LINE__, rk, topic, do_lock) -#define rd_kafka_topic_find0(rk, topic) \ - rd_kafka_topic_find0_fl(__FUNCTION__, __LINE__, rk, topic) -int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b); - -void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt); - -rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, - rd_kafka_resp_err_t err); -rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, - rd_kafka_resp_err_t err); - -/** - * @returns the topic's permanent error, if any. - * - * @locality any - * @locks_acquired rd_kafka_topic_rdlock(rkt) - */ -static RD_INLINE RD_UNUSED rd_kafka_resp_err_t -rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) { - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - rd_kafka_topic_rdlock(rkt); - if (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR) - err = rkt->rkt_err; - rd_kafka_topic_rdunlock(rkt); - return err; -} - -int rd_kafka_topic_metadata_update2( - rd_kafka_broker_t *rkb, - const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs); - -void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now); - - -typedef struct rd_kafka_topic_info_s { - const char *topic; /**< Allocated along with struct */ - int partition_cnt; -} rd_kafka_topic_info_t; - -int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b); -int rd_kafka_topic_info_cmp(const void *_a, const void *_b); -rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic, - int partition_cnt); -void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti); - -int rd_kafka_topic_match(rd_kafka_t *rk, - const char *pattern, - const char *topic); - -int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp, - int32_t broker_id, - rd_kafka_broker_t *rkb, - const char *reason); - -int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp); - -rd_kafka_resp_err_t rd_kafka_topics_leader_query_sync(rd_kafka_t *rk, - int all_topics, - const rd_list_t *topics, - int timeout_ms); -void rd_kafka_topic_leader_query0(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - int do_rk_lock, - rd_bool_t force); -#define rd_kafka_topic_leader_query(rk, rkt) \ - rd_kafka_topic_leader_query0(rk, rkt, 1 /*lock*/, \ - rd_false /*dont force*/) - -#define rd_kafka_topic_fast_leader_query(rk) \ - rd_kafka_metadata_fast_leader_query(rk) - -void rd_kafka_local_topics_to_list(rd_kafka_t *rk, - rd_list_t *topics, - int *cache_cntp); - -void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, - int partition_cnt, - int32_t leader_id); - -#endif /* _RDKAFKA_TOPIC_H_ */ |