Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_partition.h')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_partition.h  1058
1 file changed, 1058 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_partition.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_partition.h
new file mode 100644
index 000000000..a1f1f47cd
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_partition.h
@@ -0,0 +1,1058 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _RDKAFKA_PARTITION_H_
+#define _RDKAFKA_PARTITION_H_
+
+#include "rdkafka_topic.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_broker.h"
+
+extern const char *rd_kafka_fetch_states[];
+
+
+/**
+ * @brief Offset statistics
+ */
+struct offset_stats {
+ rd_kafka_fetch_pos_t fetch_pos; /**< Next offset to fetch */
+ int64_t eof_offset; /**< Last offset we reported EOF for */
+};
+
+/**
+ * @brief Reset offset_stats struct to default values
+ */
+static RD_UNUSED void rd_kafka_offset_stats_reset(struct offset_stats *offs) {
+ offs->fetch_pos.offset = 0;
+ offs->fetch_pos.leader_epoch = -1;
+ offs->eof_offset = RD_KAFKA_OFFSET_INVALID;
+}
+
+
+/**
+ * @brief Store information about a partition error for future use.
+ */
+struct rd_kafka_toppar_err {
+ rd_kafka_resp_err_t err; /**< Error code */
+ int actions; /**< Request actions */
+ rd_ts_t ts; /**< Timestamp */
+ uint64_t base_msgid; /**< First msg msgid */
+ int32_t base_seq; /**< Idempotent Producer:
+ * first msg sequence */
+ int32_t last_seq; /**< Idempotent Producer:
+ * last msg sequence */
+};
+
+
+
+/**
+ * @brief Fetch position comparator; leader epoch takes precedence.
+ */
+static RD_UNUSED RD_INLINE int
+rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
+ const rd_kafka_fetch_pos_t *b) {
+ if (a->leader_epoch < b->leader_epoch)
+ return -1;
+ else if (a->leader_epoch > b->leader_epoch)
+ return 1;
+ else if (a->offset < b->offset)
+ return -1;
+ else if (a->offset > b->offset)
+ return 1;
+ else
+ return 0;
+}
+
+
+static RD_UNUSED RD_INLINE void
+rd_kafka_fetch_pos_init(rd_kafka_fetch_pos_t *fetchpos) {
+ fetchpos->offset = RD_KAFKA_OFFSET_INVALID;
+ fetchpos->leader_epoch = -1;
+}
+
+const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos);
+
+static RD_UNUSED RD_INLINE rd_kafka_fetch_pos_t
+rd_kafka_fetch_pos_make(int64_t offset,
+ int32_t leader_epoch,
+ rd_bool_t validated) {
+ rd_kafka_fetch_pos_t fetchpos = {offset, leader_epoch, validated};
+ return fetchpos;
+}
+
+#ifdef RD_HAS_STATEMENT_EXPRESSIONS
+#define RD_KAFKA_FETCH_POS0(offset, leader_epoch, validated) \
+ ({ \
+ rd_kafka_fetch_pos_t _fetchpos = {offset, leader_epoch, \
+ validated}; \
+ _fetchpos; \
+ })
+#else
+#define RD_KAFKA_FETCH_POS0(offset, leader_epoch, validated) \
+ rd_kafka_fetch_pos_make(offset, leader_epoch, validated)
+#endif
+
+#define RD_KAFKA_FETCH_POS(offset, leader_epoch) \
+ RD_KAFKA_FETCH_POS0(offset, leader_epoch, rd_false)
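+
+/*
+ * Illustrative use (a sketch, not part of the original source):
+ * construct fetch positions and compare them, leader epoch taking
+ * precedence over offset:
+ * @code
+ *   rd_kafka_fetch_pos_t a = RD_KAFKA_FETCH_POS(100, 3);
+ *   rd_kafka_fetch_pos_t b = RD_KAFKA_FETCH_POS(50, 4);
+ *   // rd_kafka_fetch_pos_cmp(&a, &b) < 0: b's epoch is newer,
+ *   // so b is considered ahead despite its lower offset.
+ * @endcode
+ */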
+
+
+
+typedef TAILQ_HEAD(rd_kafka_toppar_tqhead_s,
+ rd_kafka_toppar_s) rd_kafka_toppar_tqhead_t;
+
+/**
+ * Topic + Partition combination
+ */
+struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink; /* rd_kafka_t link */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/
+ CIRCLEQ_ENTRY(rd_kafka_toppar_s)
+ rktp_activelink; /* rkb_active_toppars */
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_topic_t link*/
+ TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink; /* rd_kafka_cgrp_t link */
+ TAILQ_ENTRY(rd_kafka_toppar_s)
+ rktp_txnlink; /**< rd_kafka_t.rk_eos.
+ * txn_pend_rktps
+ * or txn_rktps */
+ rd_kafka_topic_t *rktp_rkt; /**< This toppar's topic object */
+ int32_t rktp_partition;
+ // LOCK: toppar_lock() + topic_wrlock()
+ // LOCK: .. in partition_available()
+ int32_t rktp_leader_id; /**< Current leader id.
+ * This is updated directly
+ * from metadata. */
+ int32_t rktp_broker_id; /**< Current broker id. */
+ rd_kafka_broker_t *rktp_leader; /**< Current leader broker.
+ * This is updated simultaneously
+ * with rktp_leader_id. */
+ rd_kafka_broker_t *rktp_broker; /**< Current preferred broker
+ * (usually the leader).
+ * This is updated asynchronously
+ * by issuing a JOIN op to the
+ * broker thread, so be careful
+ * when using it since it
+ * may lag. */
+ rd_kafka_broker_t *rktp_next_broker; /**< Next preferred broker after
+ * async migration op. */
+ rd_refcnt_t rktp_refcnt;
+ mtx_t rktp_lock;
+
+ // LOCK: toppar_lock. toppar_insert_msg(), concat_msgq()
+ // LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), toppar_retry_msgq()
+ rd_kafka_q_t *rktp_msgq_wakeup_q; /**< Wake-up queue */
+ rd_kafka_msgq_t rktp_msgq; /* application->rdkafka queue.
+ * protected by rktp_lock */
+ rd_kafka_msgq_t rktp_xmit_msgq; /* internal broker xmit queue.
+ * local to broker thread. */
+
+ int rktp_fetch; /* On rkb_active_toppars list */
+
+ /* Consumer */
+ rd_kafka_q_t *rktp_fetchq; /* Queue of fetched messages
+ * from broker.
+ * Broker thread -> App */
+ rd_kafka_q_t *rktp_ops; /* * -> Main thread */
+
+ rd_atomic32_t rktp_msgs_inflight; /**< Current number of
+ * messages in-flight to/from
+ * the broker. */
+
+ uint64_t rktp_msgid; /**< Current/last message id.
+ * Each message enqueued on a
+ * non-UA partition will get a
+ * partition-unique sequential
+ * number assigned.
+ * This number is used to
+ * re-enqueue the message
+ * on resends while making sure
+ * the input ordering is still
+ * maintained, and is used by
+ * the idempotent producer.
+ * Starts at 1.
+ * Protected by toppar_lock */
+ struct {
+ rd_kafka_pid_t pid; /**< Partition's last known
+ * Producer Id and epoch.
+ * Protected by toppar lock.
+ * Only updated in toppar
+ * handler thread. */
+ uint64_t acked_msgid; /**< Highest acknowledged message.
+ * Protected by toppar lock. */
+ uint64_t epoch_base_msgid; /**< This Producer epoch's
+ * base msgid.
+ * When a new epoch is
+ * acquired, or on transaction
+ * abort, the base_seq is set to
+ * the current rktp_msgid so that
+ * subsequent produce
+ * requests will have
+ * a sequence number series
+ * starting at 0.
+ * Protected by toppar_lock */
+ int32_t next_ack_seq; /**< Next expected ack sequence.
+ * Protected by toppar lock. */
+ int32_t next_err_seq; /**< Next expected error sequence.
+ * Used when draining outstanding
+ * issues.
+ * This value will be the same
+ * as next_ack_seq until a
+ * drainable error occurs,
+ * in which case it
+ * will advance past next_ack_seq.
+ * next_ack_seq can never be larger
+ * than next_err_seq.
+ * Protected by toppar lock. */
+ rd_bool_t wait_drain; /**< All inflight requests must
+ * be drained/finished before
+ * resuming producing.
+ * This is set to true
+ * when a leader change
+ * happens so that the
+ * in-flight messages for the
+ * old brokers finish before
+ * the new broker starts sending.
+ * This is a step to ensure
+ * consistency.
+ * Only accessed from toppar
+ * handler thread. */
+ } rktp_eos;
+
+ /**
+ * rktp version barriers
+ *
+ * rktp_version is the application/controller side's
+ * authoritative version; it reflects the most up-to-date state.
+ * This is what q_filter() matches an rko_version to.
+ *
+ * rktp_op_version is the last/current received state handled
+ * by the toppar in the broker thread. It is updated to rktp_version
+ * when receiving a new op.
+ *
+ * rktp_fetch_version is the current fetcher decision version.
+ * It is used in fetch_decide() to see if the fetch decision
+ * needs to be updated by comparing to rktp_op_version.
+ *
+ * Example:
+ * App thread : Send OP_START (v1 bump): rktp_version=1
+ * Broker thread: Recv OP_START (v1): rktp_op_version=1
+ * Broker thread: fetch_decide() detects that
+ * rktp_op_version != rktp_fetch_version and
+ * sets rktp_fetch_version=1.
+ * Broker thread: next Fetch request has its tver state set to
+ * rktp_fetch_version (v1).
+ *
+ * App thread : Send OP_SEEK (v2 bump): rktp_version=2
+ * Broker thread: Recv OP_SEEK (v2): rktp_op_version=2
+ * Broker thread: Recv IO FetchResponse with tver=1,
+ * when enqueued on rktp_fetchq they're discarded
+ * due to old version (tver<rktp_version).
+ * Broker thread: fetch_decide() detects version change and
+ * sets rktp_fetch_version=2.
+ * Broker thread: next Fetch request has tver=2
+ * Broker thread: Recv IO FetchResponse with tver=2 which
+ * is same as rktp_version so message is forwarded
+ * to app.
+ */
+ rd_atomic32_t rktp_version; /* Latest op version.
+ * Authoritative (app thread)*/
+ int32_t rktp_op_version; /* Op version of the current
+ * command state.
+ * (broker thread) */
+ int32_t rktp_fetch_version; /* Op version of curr fetch.
+ (broker thread) */
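+
+ /* Illustrative sketch (not the actual fetch_decide() code) of the
+ * version-barrier check performed in the broker thread:
+ *
+ * if (rktp->rktp_op_version > rktp->rktp_fetch_version)
+ * rktp->rktp_fetch_version = rktp->rktp_op_version;
+ * // ..and (re)start the fetcher at rktp_next_fetch_start.
+ */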
+
+ enum { RD_KAFKA_TOPPAR_FETCH_NONE = 0,
+ RD_KAFKA_TOPPAR_FETCH_STOPPING,
+ RD_KAFKA_TOPPAR_FETCH_STOPPED,
+ RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
+ RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
+ RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
+ RD_KAFKA_TOPPAR_FETCH_ACTIVE,
+ } rktp_fetch_state; /* Broker thread's state */
+
+#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
+ ((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
+
+ int32_t rktp_leader_epoch; /**< Last known partition leader epoch,
+ * or -1. */
+
+ int32_t rktp_fetch_msg_max_bytes; /* Max number of bytes to
+ * fetch.
+ * Locality: broker thread
+ */
+
+ rd_ts_t rktp_ts_fetch_backoff; /* Back off fetcher for
+ * this partition until this
+ * absolute timestamp
+ * expires. */
+
+ /** Offset to query broker for. */
+ rd_kafka_fetch_pos_t rktp_query_pos;
+
+ /** Next fetch start position.
+ * This is set by start, seek, resume, etc., to tell
+ * the fetcher where to start fetching.
+ * It is not updated for each fetch, see
+ * rktp_offsets.fetch_pos for that.
+ * @locality toppar thread */
+ rd_kafka_fetch_pos_t rktp_next_fetch_start;
+
+ /** The previous next fetch position.
+ * @locality toppar thread */
+ rd_kafka_fetch_pos_t rktp_last_next_fetch_start;
+
+ /** Application's position.
+ * This is the latest offset delivered to application + 1.
+ * It is reset to INVALID_OFFSET when partition is
+ * unassigned/stopped/seeked. */
+ rd_kafka_fetch_pos_t rktp_app_pos;
+
+ /** Last stored offset, but maybe not yet committed. */
+ rd_kafka_fetch_pos_t rktp_stored_pos;
+
+ /** Offset currently being committed */
+ rd_kafka_fetch_pos_t rktp_committing_pos;
+
+ /** Last (known) committed offset */
+ rd_kafka_fetch_pos_t rktp_committed_pos;
+
+ rd_ts_t rktp_ts_committed_offset; /**< Timestamp of last commit */
+
+ struct offset_stats rktp_offsets; /* Current offsets.
+ * Locality: broker thread*/
+ struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
+ * Updated periodically
+ * by broker thread.
+ * Locks: toppar_lock */
+
+ int64_t rktp_ls_offset; /**< Current last stable offset
+ * Locks: toppar_lock */
+ int64_t rktp_hi_offset; /* Current high watermark offset.
+ * Locks: toppar_lock */
+ int64_t rktp_lo_offset; /* Current broker low offset.
+ * This is outside of the stats
+ * struct due to this field
+ * being populated by the
+ * toppar thread rather than
+ * the broker thread.
+ * Locality: toppar thread
+ * Locks: toppar_lock */
+
+ rd_ts_t rktp_ts_offset_lag;
+
+ char *rktp_offset_path; /* Path to offset file */
+ FILE *rktp_offset_fp; /* Offset file pointer */
+
+ rd_kafka_resp_err_t rktp_last_error; /**< Last Fetch error.
+ * Used for suppressing
+ * recurring errors.
+ * @locality broker thread */
+
+ rd_kafka_cgrp_t *rktp_cgrp; /* Belongs to this cgrp */
+
+ rd_bool_t rktp_started; /**< Fetcher is instructed to
+ * start.
+ * This is used by cgrp to keep
+ * track of whether the toppar has
+ * been started or not. */
+
+ rd_kafka_replyq_t rktp_replyq; /* Current replyq+version
+ * for propagating
+ * major operations, e.g.,
+ * FETCH_STOP. */
+ // LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_DESIRED
+ // LOCK: toppar_lock(). RD_KAFKA_TOPPAR_F_UNKNOWN
+ int rktp_flags;
+#define RD_KAFKA_TOPPAR_F_DESIRED \
+ 0x1 /* This partition is desired \
+ * by a consumer. */
+#define RD_KAFKA_TOPPAR_F_UNKNOWN \
+ 0x2 /* Topic is not yet or no longer \
+ * seen on a broker. */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE 0x4 /* Offset store is active */
+#define RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING \
+ 0x8 /* Offset store stopping \
+ */
+#define RD_KAFKA_TOPPAR_F_APP_PAUSE 0x10 /* App pause()d consumption */
+#define RD_KAFKA_TOPPAR_F_LIB_PAUSE 0x20 /* librdkafka paused consumption */
+#define RD_KAFKA_TOPPAR_F_REMOVE 0x40 /* partition removed from cluster */
+#define RD_KAFKA_TOPPAR_F_LEADER_ERR \
+ 0x80 /* Operation failed: \
+ * leader might be missing. \
+ * Typically set from \
+ * ProduceResponse failure. */
+#define RD_KAFKA_TOPPAR_F_PEND_TXN \
+ 0x100 /* Partition is pending being added \
+ * to a producer transaction. */
+#define RD_KAFKA_TOPPAR_F_IN_TXN \
+ 0x200 /* Partition is part of \
+ * a producer transaction. */
+#define RD_KAFKA_TOPPAR_F_ON_DESP 0x400 /**< On rkt_desp list */
+#define RD_KAFKA_TOPPAR_F_ON_CGRP 0x800 /**< On rkcg_toppars list */
+#define RD_KAFKA_TOPPAR_F_ON_RKB 0x1000 /**< On rkb_toppars list */
+#define RD_KAFKA_TOPPAR_F_ASSIGNED \
+ 0x2000 /**< Toppar is part of the consumer \
+ * assignment. */
+
+ /*
+ * Timers
+ */
+ rd_kafka_timer_t rktp_offset_query_tmr; /* Offset query timer */
+ rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
+ rd_kafka_timer_t rktp_offset_sync_tmr; /* Offset file sync timer */
+ rd_kafka_timer_t rktp_consumer_lag_tmr; /* Consumer lag monitoring
+ * timer */
+ rd_kafka_timer_t rktp_validate_tmr; /**< Offset and epoch
+ * validation retry timer */
+
+ rd_interval_t rktp_lease_intvl; /**< Preferred replica lease
+ * period */
+ rd_interval_t rktp_new_lease_intvl; /**< Controls max frequency
+ * at which a new preferred
+ * replica lease can be
+ * created for a toppar.
+ */
+ rd_interval_t rktp_new_lease_log_intvl; /**< .. and how often
+ * we log about it. */
+ rd_interval_t rktp_metadata_intvl; /**< Controls max frequency
+ * of metadata requests
+ * in preferred replica
+ * handler.
+ */
+
+ int rktp_wait_consumer_lag_resp; /* Waiting for consumer lag
+ * response. */
+
+ struct rd_kafka_toppar_err rktp_last_err; /**< Last produce error */
+
+
+ struct {
+ rd_atomic64_t tx_msgs; /**< Producer: sent messages */
+ rd_atomic64_t tx_msg_bytes; /**< .. bytes */
+ rd_atomic64_t rx_msgs; /**< Consumer: received messages */
+ rd_atomic64_t rx_msg_bytes; /**< .. bytes */
+ rd_atomic64_t producer_enq_msgs; /**< Producer: enqueued msgs */
+ rd_atomic64_t rx_ver_drops; /**< Consumer: outdated message
+ * drops. */
+ } rktp_c;
+};
+
+/**
+ * @struct This is a separately allocated glue object used in
+ * rd_kafka_topic_partition_t._private to allow referencing
+ * an rktp and/or a leader epoch. Both are optional.
+ * The rktp, if non-NULL, owns a refcount.
+ *
+ * This glue object is not always set in ._private, but allocated on demand
+ * as necessary.
+ */
+typedef struct rd_kafka_topic_partition_private_s {
+ /** Reference to a toppar. Optional, may be NULL. */
+ rd_kafka_toppar_t *rktp;
+ /** Current Leader epoch, if known, else -1.
+ * This is set when the API needs to send the last epoch known
+ * by the client. */
+ int32_t current_leader_epoch;
+ /** Leader epoch if known, else -1. */
+ int32_t leader_epoch;
+} rd_kafka_topic_partition_private_t;
+
+
+/**
+ * Check if toppar is paused (consumer).
+ * Locks: toppar_lock() MUST be held.
+ */
+#define RD_KAFKA_TOPPAR_IS_PAUSED(rktp) \
+ ((rktp)->rktp_flags & \
+ (RD_KAFKA_TOPPAR_F_APP_PAUSE | RD_KAFKA_TOPPAR_F_LIB_PAUSE))
+
+
+
+/**
+ * @brief Increase refcount and return rktp object.
+ */
+#define rd_kafka_toppar_keep(RKTP) \
+ rd_kafka_toppar_keep0(__FUNCTION__, __LINE__, RKTP)
+
+#define rd_kafka_toppar_keep_fl(FUNC, LINE, RKTP) \
+ rd_kafka_toppar_keep0(FUNC, LINE, RKTP)
+
+static RD_UNUSED RD_INLINE rd_kafka_toppar_t *
+rd_kafka_toppar_keep0(const char *func, int line, rd_kafka_toppar_t *rktp) {
+ rd_refcnt_add_fl(func, line, &rktp->rktp_refcnt);
+ return rktp;
+}
+
+void rd_kafka_toppar_destroy_final(rd_kafka_toppar_t *rktp);
+
+#define rd_kafka_toppar_destroy(RKTP) \
+ do { \
+ rd_kafka_toppar_t *_RKTP = (RKTP); \
+ if (unlikely(rd_refcnt_sub(&_RKTP->rktp_refcnt) == 0)) \
+ rd_kafka_toppar_destroy_final(_RKTP); \
+ } while (0)
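+
+/*
+ * Illustrative use (a sketch): every rd_kafka_toppar_keep() must be
+ * paired with exactly one rd_kafka_toppar_destroy():
+ * @code
+ *   rd_kafka_toppar_t *my_ref = rd_kafka_toppar_keep(rktp);
+ *   // ...use my_ref...
+ *   rd_kafka_toppar_destroy(my_ref); // frees the toppar on last ref
+ * @endcode
+ */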
+
+
+
+#define rd_kafka_toppar_lock(rktp) mtx_lock(&(rktp)->rktp_lock)
+#define rd_kafka_toppar_unlock(rktp) mtx_unlock(&(rktp)->rktp_lock)
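+
+/*
+ * Illustrative use (a sketch): fields marked "Locks: toppar_lock" must
+ * only be accessed with the lock held, e.g.:
+ * @code
+ *   int64_t hi = RD_KAFKA_OFFSET_INVALID;
+ *   rd_kafka_toppar_lock(rktp);
+ *   if (!RD_KAFKA_TOPPAR_IS_PAUSED(rktp))
+ *           hi = rktp->rktp_hi_offset;
+ *   rd_kafka_toppar_unlock(rktp);
+ * @endcode
+ */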
+
+static const char *
+rd_kafka_toppar_name(const rd_kafka_toppar_t *rktp) RD_UNUSED;
+static const char *rd_kafka_toppar_name(const rd_kafka_toppar_t *rktp) {
+ static RD_TLS char ret[256];
+
+ rd_snprintf(ret, sizeof(ret), "%.*s [%" PRId32 "]",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition);
+
+ return ret;
+}
+rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
+ int32_t partition,
+ const char *func,
+ int line);
+#define rd_kafka_toppar_new(rkt, partition) \
+ rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__)
+void rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t *rktp, int fetch_state);
+void rd_kafka_toppar_insert_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
+void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp,
+ rd_kafka_msg_t *rkm,
+ rd_ts_t now);
+int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
+ rd_kafka_msgq_t *srcq,
+ int incr_retry,
+ int max_retries,
+ rd_ts_t backoff,
+ rd_kafka_msg_status_t status,
+ int (*cmp)(const void *a, const void *b));
+void rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t *destq,
+ rd_kafka_msgq_t *srcq,
+ int (*cmp)(const void *a, const void *b));
+int rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq,
+ int incr_retry,
+ rd_kafka_msg_status_t status);
+void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq);
+void rd_kafka_toppar_enq_error(rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err,
+ const char *reason);
+rd_kafka_toppar_t *rd_kafka_toppar_get0(const char *func,
+ int line,
+ const rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int ua_on_miss);
+#define rd_kafka_toppar_get(rkt, partition, ua_on_miss) \
+ rd_kafka_toppar_get0(__FUNCTION__, __LINE__, rkt, partition, ua_on_miss)
+rd_kafka_toppar_t *rd_kafka_toppar_get2(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int ua_on_miss,
+ int create_on_miss);
+rd_kafka_toppar_t *rd_kafka_toppar_get_avail(const rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int ua_on_miss,
+ rd_kafka_resp_err_t *errp);
+
+rd_kafka_toppar_t *rd_kafka_toppar_desired_get(rd_kafka_topic_t *rkt,
+ int32_t partition);
+void rd_kafka_toppar_desired_add0(rd_kafka_toppar_t *rktp);
+rd_kafka_toppar_t *rd_kafka_toppar_desired_add(rd_kafka_topic_t *rkt,
+ int32_t partition);
+void rd_kafka_toppar_desired_link(rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_unlink(rd_kafka_toppar_t *rktp);
+void rd_kafka_toppar_desired_del(rd_kafka_toppar_t *rktp);
+
+void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t next_pos);
+
+void rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t *rktp,
+ rd_kafka_broker_t *rkb);
+
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t pos,
+ rd_kafka_q_t *fwdq,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop(rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_seek(rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t pos,
+ rd_kafka_replyq_t replyq);
+
+rd_kafka_resp_err_t
+rd_kafka_toppar_op_pause(rd_kafka_toppar_t *rktp, int pause, int flag);
+
+void rd_kafka_toppar_fetch_stopped(rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err);
+
+
+
+rd_ts_t rd_kafka_broker_consumer_toppar_serve(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp);
+
+
+void rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq);
+
+void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t query_pos,
+ int backoff_ms);
+
+int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
+ int purge_flags,
+ rd_bool_t include_xmit_msgq);
+
+rd_kafka_broker_t *rd_kafka_toppar_broker(rd_kafka_toppar_t *rktp,
+ int proper_broker);
+void rd_kafka_toppar_leader_unavailable(rd_kafka_toppar_t *rktp,
+ const char *reason,
+ rd_kafka_resp_err_t err);
+
+void rd_kafka_toppar_pause(rd_kafka_toppar_t *rktp, int flag);
+void rd_kafka_toppar_resume(rd_kafka_toppar_t *rktp, int flag);
+
+rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
+ int pause,
+ int flag,
+ rd_kafka_replyq_t replyq);
+rd_kafka_resp_err_t
+rd_kafka_toppars_pause_resume(rd_kafka_t *rk,
+ rd_bool_t pause,
+ rd_async_t async,
+ int flag,
+ rd_kafka_topic_partition_list_t *partitions);
+
+
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_new(const char *topic,
+ int32_t partition);
+void rd_kafka_topic_partition_destroy_free(void *ptr);
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_copy(const rd_kafka_topic_partition_t *src);
+void *rd_kafka_topic_partition_copy_void(const void *src);
+void rd_kafka_topic_partition_destroy_free(void *ptr);
+rd_kafka_topic_partition_t *
+rd_kafka_topic_partition_new_from_rktp(rd_kafka_toppar_t *rktp);
+
+void rd_kafka_topic_partition_list_init(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ int size);
+void rd_kafka_topic_partition_list_destroy_free(void *ptr);
+
+void rd_kafka_topic_partition_list_clear(
+ rd_kafka_topic_partition_list_t *rktparlist);
+
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add0(
+ const char *func,
+ int line,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic,
+ int32_t partition,
+ rd_kafka_toppar_t *rktp,
+ const rd_kafka_topic_partition_private_t *parpriv);
+
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic,
+ int32_t partition);
+
+void rd_kafka_topic_partition_list_add_copy(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ const rd_kafka_topic_partition_t *rktpar);
+
+
+void rd_kafka_topic_partition_list_add_list(
+ rd_kafka_topic_partition_list_t *dst,
+ const rd_kafka_topic_partition_list_t *src);
+
+/**
+ * Traverse rd_kafka_topic_partition_list_t.
+ *
+ * @warning \p TPLIST modifications are not allowed.
+ */
+#define RD_KAFKA_TPLIST_FOREACH(RKTPAR, TPLIST) \
+ for (RKTPAR = &(TPLIST)->elems[0]; \
+ (RKTPAR) < &(TPLIST)->elems[(TPLIST)->cnt]; RKTPAR++)
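+
+/*
+ * Illustrative use (a sketch, \c part_list is a caller-provided list):
+ * @code
+ *   const rd_kafka_topic_partition_t *rktpar;
+ *   RD_KAFKA_TPLIST_FOREACH(rktpar, part_list)
+ *           printf("%s [%"PRId32"] offset %"PRId64"\n",
+ *                  rktpar->topic, rktpar->partition, rktpar->offset);
+ * @endcode
+ */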
+
+/**
+ * Traverse rd_kafka_topic_partition_list_t.
+ *
+ * @warning \p TPLIST modifications are not allowed, but removal of the
+ * current \p RKTPAR element is allowed.
+ */
+#define RD_KAFKA_TPLIST_FOREACH_REVERSE(RKTPAR, TPLIST) \
+ for (RKTPAR = &(TPLIST)->elems[(TPLIST)->cnt - 1]; \
+ (RKTPAR) >= &(TPLIST)->elems[0]; RKTPAR--)
+
+int rd_kafka_topic_partition_match(rd_kafka_t *rk,
+ const rd_kafka_group_member_t *rkgm,
+ const rd_kafka_topic_partition_t *rktpar,
+ const char *topic,
+ int *matched_by_regex);
+
+
+int rd_kafka_topic_partition_cmp(const void *_a, const void *_b);
+unsigned int rd_kafka_topic_partition_hash(const void *a);
+
+int rd_kafka_topic_partition_list_find_idx(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic,
+ int32_t partition);
+rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ const char *topic);
+
+void rd_kafka_topic_partition_list_sort_by_topic(
+ rd_kafka_topic_partition_list_t *rktparlist);
+
+void rd_kafka_topic_partition_list_reset_offsets(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ int64_t offset);
+
+int rd_kafka_topic_partition_list_set_offsets(
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ int from_rktp,
+ int64_t def_value,
+ int is_commit);
+
+int rd_kafka_topic_partition_list_count_abs_offsets(
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+int rd_kafka_topic_partition_list_cmp(const void *_a,
+ const void *_b,
+ int (*cmp)(const void *, const void *));
+
+/**
+ * @returns (and creates if necessary) the ._private glue object.
+ */
+static RD_UNUSED RD_INLINE rd_kafka_topic_partition_private_t *
+rd_kafka_topic_partition_get_private(rd_kafka_topic_partition_t *rktpar) {
+ rd_kafka_topic_partition_private_t *parpriv;
+
+ if (!(parpriv = rktpar->_private)) {
+ parpriv = rd_calloc(1, sizeof(*parpriv));
+ parpriv->leader_epoch = -1;
+ rktpar->_private = parpriv;
+ }
+
+ return parpriv;
+}
+
+
+/**
+ * @returns the partition leader current epoch, if relevant and known,
+ * else -1.
+ *
+ * @param rktpar Partition object.
+ *
+ * @remark See KIP-320 for more information.
+ */
+int32_t rd_kafka_topic_partition_get_current_leader_epoch(
+ const rd_kafka_topic_partition_t *rktpar);
+
+
+/**
+ * @brief Sets the partition leader current epoch (use -1 to clear).
+ *
+ * @param rktpar Partition object.
+ * @param leader_epoch Partition leader current epoch, use -1 to reset.
+ *
+ * @remark See KIP-320 for more information.
+ */
+void rd_kafka_topic_partition_set_current_leader_epoch(
+ rd_kafka_topic_partition_t *rktpar,
+ int32_t leader_epoch);
+
+
+/**
+ * @returns the partition's rktp if set (no refcnt increase), else NULL.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_toppar_t *
+rd_kafka_topic_partition_toppar(rd_kafka_t *rk,
+ const rd_kafka_topic_partition_t *rktpar) {
+ const rd_kafka_topic_partition_private_t *parpriv;
+
+ if ((parpriv = rktpar->_private))
+ return parpriv->rktp;
+
+ return NULL;
+}
+
+rd_kafka_toppar_t *
+rd_kafka_topic_partition_ensure_toppar(rd_kafka_t *rk,
+ rd_kafka_topic_partition_t *rktpar,
+ rd_bool_t create_on_miss);
+
+/**
+ * @returns (and creates if necessary) the \p rktpar's toppar object.
+ * @remark a new reference is returned.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_toppar_t *
+rd_kafka_topic_partition_get_toppar(rd_kafka_t *rk,
+ rd_kafka_topic_partition_t *rktpar,
+ rd_bool_t create_on_miss) {
+ rd_kafka_toppar_t *rktp;
+
+ rktp =
+ rd_kafka_topic_partition_ensure_toppar(rk, rktpar, create_on_miss);
+
+ if (rktp)
+ rd_kafka_toppar_keep(rktp);
+
+ return rktp;
+}
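+
+/*
+ * Illustrative use (a sketch): the reference returned by
+ * rd_kafka_topic_partition_get_toppar() must be released by the caller:
+ * @code
+ *   rd_kafka_toppar_t *rktp =
+ *       rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false);
+ *   if (rktp) {
+ *           // ...use rktp...
+ *           rd_kafka_toppar_destroy(rktp);
+ *   }
+ * @endcode
+ */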
+
+
+
+void rd_kafka_topic_partition_list_update_toppars(
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_bool_t create_on_miss);
+
+
+void rd_kafka_topic_partition_list_query_leaders_async(
+ rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ int timeout_ms,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_op_cb_t *cb,
+ void *opaque);
+
+rd_kafka_resp_err_t rd_kafka_topic_partition_list_query_leaders(
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *leaders,
+ int timeout_ms);
+
+int rd_kafka_topic_partition_list_get_topics(
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *rkts);
+
+int rd_kafka_topic_partition_list_get_topic_names(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ rd_list_t *topics,
+ int include_regex);
+
+void rd_kafka_topic_partition_list_log(
+ rd_kafka_t *rk,
+ const char *fac,
+ int dbg,
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+#define RD_KAFKA_FMT_F_OFFSET 0x1 /* Print offset */
+#define RD_KAFKA_FMT_F_ONLY_ERR 0x2 /* Only include errored entries */
+#define RD_KAFKA_FMT_F_NO_ERR 0x4 /* Don't print error string */
+const char *rd_kafka_topic_partition_list_str(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ char *dest,
+ size_t dest_size,
+ int fmt_flags);
+
+void rd_kafka_topic_partition_list_update(
+ rd_kafka_topic_partition_list_t *dst,
+ const rd_kafka_topic_partition_list_t *src);
+
+int rd_kafka_topic_partition_leader_cmp(const void *_a, const void *_b);
+
+void rd_kafka_topic_partition_set_from_fetch_pos(
+ rd_kafka_topic_partition_t *rktpar,
+ const rd_kafka_fetch_pos_t fetchpos);
+
+static RD_UNUSED rd_kafka_fetch_pos_t rd_kafka_topic_partition_get_fetch_pos(
+ const rd_kafka_topic_partition_t *rktpar) {
+ rd_kafka_fetch_pos_t fetchpos = {
+ rktpar->offset, rd_kafka_topic_partition_get_leader_epoch(rktpar)};
+
+ return fetchpos;
+}
+
+
+/**
+ * @brief Match function that returns true if partition has a valid offset.
+ */
+static RD_UNUSED int
+rd_kafka_topic_partition_match_valid_offset(const void *elem,
+ const void *opaque) {
+ const rd_kafka_topic_partition_t *rktpar = elem;
+ return rktpar->offset >= 0;
+}
+
+rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ int (*match)(const void *elem, const void *opaque),
+ void *opaque);
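+
+/*
+ * Illustrative use (a sketch): filter a list down to the entries with
+ * valid offsets using the matcher above:
+ * @code
+ *   rd_kafka_topic_partition_list_t *valid =
+ *       rd_kafka_topic_partition_list_match(
+ *           part_list, rd_kafka_topic_partition_match_valid_offset, NULL);
+ *   // ...use valid...
+ *   rd_kafka_topic_partition_list_destroy(valid);
+ * @endcode
+ */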
+
+size_t rd_kafka_topic_partition_list_sum(
+ const rd_kafka_topic_partition_list_t *rktparlist,
+ size_t (*cb)(const rd_kafka_topic_partition_t *rktpar, void *opaque),
+ void *opaque);
+
+rd_bool_t rd_kafka_topic_partition_list_has_duplicates(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_bool_t ignore_partition);
+
+void rd_kafka_topic_partition_list_set_err(
+ rd_kafka_topic_partition_list_t *rktparlist,
+ rd_kafka_resp_err_t err);
+
+rd_kafka_resp_err_t rd_kafka_topic_partition_list_get_err(
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+int rd_kafka_topic_partition_list_regex_cnt(
+ const rd_kafka_topic_partition_list_t *rktparlist);
+
+void *rd_kafka_topic_partition_list_copy_opaque(const void *src, void *opaque);
+
+/**
+ * @brief Toppar + Op version tuple used for mapping Fetched partitions
+ * back to their fetch versions.
+ */
+struct rd_kafka_toppar_ver {
+ rd_kafka_toppar_t *rktp;
+ int32_t version;
+};
+
+
+/**
+ * @brief Toppar + Op version comparator.
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_toppar_ver_cmp(const void *_a,
+ const void *_b) {
+ const struct rd_kafka_toppar_ver *a = _a, *b = _b;
+ const rd_kafka_toppar_t *rktp_a = a->rktp;
+ const rd_kafka_toppar_t *rktp_b = b->rktp;
+ int r;
+
+ if (rktp_a->rktp_rkt != rktp_b->rktp_rkt &&
+ (r = rd_kafkap_str_cmp(rktp_a->rktp_rkt->rkt_topic,
+ rktp_b->rktp_rkt->rkt_topic)))
+ return r;
+
+ return RD_CMP(rktp_a->rktp_partition, rktp_b->rktp_partition);
+}
+
+/**
+ * @brief Frees up resources for \p tver but not the \p tver itself.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_toppar_ver_destroy(struct rd_kafka_toppar_ver *tver) {
+ rd_kafka_toppar_destroy(tver->rktp);
+}
+
+
+/**
+ * @returns 1 if rko version is outdated, else 0.
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_op_version_outdated(rd_kafka_op_t *rko,
+ int version) {
+ if (!rko->rko_version)
+ return 0;
+
+ if (version)
+ return rko->rko_version < version;
+
+ if (rko->rko_rktp)
+ return rko->rko_version <
+ rd_atomic32_get(&rko->rko_rktp->rktp_version);
+ return 0;
+}
+
+void rd_kafka_toppar_offset_commit_result(
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets);
+
+void rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t *rktp);
+
+
+/**
+ * @brief Represents a leader and the partitions it is leader for.
+ */
+struct rd_kafka_partition_leader {
+ rd_kafka_broker_t *rkb;
+ rd_kafka_topic_partition_list_t *partitions;
+};
+
+static RD_UNUSED void
+rd_kafka_partition_leader_destroy(struct rd_kafka_partition_leader *leader) {
+ rd_kafka_broker_destroy(leader->rkb);
+ rd_kafka_topic_partition_list_destroy(leader->partitions);
+ rd_free(leader);
+}
+
+void rd_kafka_partition_leader_destroy_free(void *ptr);
+
+static RD_UNUSED struct rd_kafka_partition_leader *
+rd_kafka_partition_leader_new(rd_kafka_broker_t *rkb) {
+ struct rd_kafka_partition_leader *leader = rd_malloc(sizeof(*leader));
+ leader->rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+ leader->partitions = rd_kafka_topic_partition_list_new(0);
+ return leader;
+}
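+
+/*
+ * Illustrative use (a sketch, \c "mytopic" is a hypothetical topic):
+ * a leader object owns a broker refcount and a partition list, both
+ * released by the destroy function:
+ * @code
+ *   struct rd_kafka_partition_leader *leader =
+ *       rd_kafka_partition_leader_new(rkb);
+ *   rd_kafka_topic_partition_list_add(leader->partitions, "mytopic", 0);
+ *   // ...
+ *   rd_kafka_partition_leader_destroy(leader);
+ * @endcode
+ */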
+
+static RD_UNUSED int rd_kafka_partition_leader_cmp(const void *_a,
+ const void *_b) {
+ const struct rd_kafka_partition_leader *a = _a, *b = _b;
+ return rd_kafka_broker_cmp(a->rkb, b->rkb);
+}
+
+
+int rd_kafka_toppar_pid_change(rd_kafka_toppar_t *rktp,
+ rd_kafka_pid_t pid,
+ uint64_t base_msgid);
+
+int rd_kafka_toppar_handle_purge_queues(rd_kafka_toppar_t *rktp,
+ rd_kafka_broker_t *rkb,
+ int purge_flags);
+void rd_kafka_purge_ua_toppar_queues(rd_kafka_t *rk);
+
+static RD_UNUSED int rd_kafka_toppar_topic_cmp(const void *_a, const void *_b) {
+ const rd_kafka_toppar_t *a = _a, *b = _b;
+ return strcmp(a->rktp_rkt->rkt_topic->str, b->rktp_rkt->rkt_topic->str);
+}
+
+
+/**
+ * @brief Sets the partition's next fetch position, i.e., the next offset
+ * to start fetching from.
+ *
+ * @locks_required rd_kafka_toppar_lock(rktp) MUST be held.
+ */
+static RD_UNUSED RD_INLINE void
+rd_kafka_toppar_set_next_fetch_position(rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t next_pos) {
+ rktp->rktp_next_fetch_start = next_pos;
+}
+
+#endif /* _RDKAFKA_PARTITION_H_ */