diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h | 583 |
1 file changed, 583 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h new file mode 100644 index 00000000..877fac15 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h @@ -0,0 +1,583 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012,2013 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * PRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKA_MSG_H_ +#define _RDKAFKA_MSG_H_ + +#include "rdsysqueue.h" + +#include "rdkafka_proto.h" +#include "rdkafka_header.h" + + +/** + * @brief Internal RD_KAFKA_MSG_F_.. flags + */ +#define RD_KAFKA_MSG_F_RKT_RDLOCKED 0x100000 /* rkt is rdlock():ed */ + + +/** + * @brief Message.MsgAttributes for MsgVersion v0..v1, + * also used for MessageSet.Attributes for MsgVersion v2. 
+ */ +#define RD_KAFKA_MSG_ATTR_GZIP (1 << 0) +#define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1) +#define RD_KAFKA_MSG_ATTR_LZ4 (3) +#define RD_KAFKA_MSG_ATTR_ZSTD (4) +#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7 +#define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3) +#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3) + +/** + * @brief MessageSet.Attributes for MsgVersion v2 + * + * Attributes: + * ------------------------------------------------------------------------------------------------- + * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | + * Compression Type (0-2) | + * ------------------------------------------------------------------------------------------------- + */ +/* Compression types same as MsgVersion 0 above */ +/* Timestamp type same as MsgVersion 0 above */ +#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4) +#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5) + + +typedef struct rd_kafka_msg_s { + rd_kafka_message_t rkm_rkmessage; /* MUST be first field */ +#define rkm_len rkm_rkmessage.len +#define rkm_payload rkm_rkmessage.payload +#define rkm_opaque rkm_rkmessage._private +#define rkm_partition rkm_rkmessage.partition +#define rkm_offset rkm_rkmessage.offset +#define rkm_key rkm_rkmessage.key +#define rkm_key_len rkm_rkmessage.key_len +#define rkm_err rkm_rkmessage.err + + TAILQ_ENTRY(rd_kafka_msg_s) rkm_link; + + int rkm_flags; + /* @remark These additional flags must not collide with + * the RD_KAFKA_MSG_F_* flags in rdkafka.h */ +#define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */ +#define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */ +#define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */ +#define RD_KAFKA_MSG_F_CONTROL 0x80000 /* Control message */ + + rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */ + int64_t rkm_timestamp; /* Message format V1. + * Meaning of timestamp depends on + * message Attribute LogAppendtime (broker) + * or CreateTime (producer). 
+ * Unit is milliseconds since epoch (UTC).*/ + + + rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */ + + rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in + * the ProduceResponse handler: + * this value is always up to date. + */ + int32_t rkm_broker_id; /**< Broker message was produced to + * or fetched from. */ + + union { + struct { + rd_ts_t ts_timeout; /* Message timeout */ + rd_ts_t ts_enq; /* Enqueue/Produce time */ + rd_ts_t ts_backoff; /* Backoff next Produce until + * this time. */ + uint64_t msgid; /**< Message sequencial id, + * used to maintain ordering. + * Starts at 1. */ + uint64_t last_msgid; /**< On retry this is set + * on the first message + * in a batch to point + * out the last message + * of the batch so that + * the batch can be + * identically reconstructed. + */ + int retries; /* Number of retries so far */ + } producer; +#define rkm_ts_timeout rkm_u.producer.ts_timeout +#define rkm_ts_enq rkm_u.producer.ts_enq +#define rkm_msgid rkm_u.producer.msgid + + struct { + rd_kafkap_bytes_t binhdrs; /**< Unparsed + * binary headers in + * protocol msg */ + int32_t leader_epoch; /**< Leader epoch at the time + * the message was fetched. */ + } consumer; + } rkm_u; +} rd_kafka_msg_t; + +TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s); + + +/** @returns the absolute time a message was enqueued (producer) */ +#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq) + +/** + * @returns the message's total maximum on-wire size. + * @remark Depending on message version (MagicByte) the actual size + * may be smaller. 
+ */ +static RD_INLINE RD_UNUSED size_t +rd_kafka_msg_wire_size(const rd_kafka_msg_t *rkm, int MsgVersion) { + static const size_t overheads[] = { + [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD, + [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD, + [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD}; + size_t size; + rd_dassert(MsgVersion >= 0 && MsgVersion <= 2); + + size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len; + if (MsgVersion == 2 && rkm->rkm_headers) + size += rd_kafka_headers_serialized_size(rkm->rkm_headers); + + return size; +} + + +/** + * @returns the maximum total on-wire message size regardless of MsgVersion. + * + * @remark This does not account for the ProduceRequest, et.al, just the + * per-message overhead. + */ +static RD_INLINE RD_UNUSED size_t rd_kafka_msg_max_wire_size(size_t keylen, + size_t valuelen, + size_t hdrslen) { + return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD + keylen + valuelen + hdrslen; +} + +/** + * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t + * wrapped rd_kafka_message_t. + */ +static RD_INLINE RD_UNUSED rd_kafka_msg_t * +rd_kafka_message2msg(rd_kafka_message_t *rkmessage) { + return (rd_kafka_msg_t *)rkmessage; +} + + + +/** + * @brief Message queue with message and byte counters. + */ +TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s); +typedef struct rd_kafka_msgq_s { + struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */ + int32_t rkmq_msg_cnt; + int64_t rkmq_msg_bytes; + struct { + rd_ts_t abstime; /**< Allow wake-ups after this point in time.*/ + int32_t msg_cnt; /**< Signal wake-up when this message count + * is reached. */ + int64_t msg_bytes; /**< .. or when this byte count is + * reached. */ + rd_bool_t on_first; /**< Wake-up on first message enqueued + * regardless of .abstime. */ + rd_bool_t signalled; /**< Wake-up (already) signalled. 
*/ + } rkmq_wakeup; +} rd_kafka_msgq_t; + +#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \ + { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) } + +#define RD_KAFKA_MSGQ_FOREACH(elm, head) \ + TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link) + +/* @brief Check if queue is empty. Proper locks must be held. */ +#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs) + +/** + * Returns the number of messages in the specified queue. + */ +static RD_INLINE RD_UNUSED int rd_kafka_msgq_len(const rd_kafka_msgq_t *rkmq) { + return (int)rkmq->rkmq_msg_cnt; +} + +/** + * Returns the total number of bytes in the specified queue. + */ +static RD_INLINE RD_UNUSED size_t +rd_kafka_msgq_size(const rd_kafka_msgq_t *rkmq) { + return (size_t)rkmq->rkmq_msg_bytes; +} + + +void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm); + +int rd_kafka_msg_new(rd_kafka_topic_t *rkt, + int32_t force_partition, + int msgflags, + char *payload, + size_t len, + const void *keydata, + size_t keylen, + void *msg_opaque); + +static RD_INLINE RD_UNUSED void rd_kafka_msgq_init(rd_kafka_msgq_t *rkmq) { + TAILQ_INIT(&rkmq->rkmq_msgs); + rkmq->rkmq_msg_cnt = 0; + rkmq->rkmq_msg_bytes = 0; +} + +#if ENABLE_DEVEL +#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \ + rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, rktp, rkmq, \ + exp_first_msgid, gapless) +#else +#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \ + do { \ + } while (0) +#endif + +void rd_kafka_msgq_verify_order0(const char *function, + int line, + const struct rd_kafka_toppar_s *rktp, + const rd_kafka_msgq_t *rkmq, + uint64_t exp_first_msgid, + rd_bool_t gapless); + + +/** + * Concat all elements of 'src' onto tail of 'dst'. + * 'src' will be cleared. + * Proper locks for 'src' and 'dst' must be held. 
+ */ +static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat(rd_kafka_msgq_t *dst, + rd_kafka_msgq_t *src) { + TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); + dst->rkmq_msg_cnt += src->rkmq_msg_cnt; + dst->rkmq_msg_bytes += src->rkmq_msg_bytes; + rd_kafka_msgq_init(src); + rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); +} + +/** + * Move queue 'src' to 'dst' (overwrites dst) + * Source will be cleared. + */ +static RD_INLINE RD_UNUSED void rd_kafka_msgq_move(rd_kafka_msgq_t *dst, + rd_kafka_msgq_t *src) { + TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); + dst->rkmq_msg_cnt = src->rkmq_msg_cnt; + dst->rkmq_msg_bytes = src->rkmq_msg_bytes; + rd_kafka_msgq_init(src); + rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); +} + + +/** + * @brief Prepend all elements of \ src onto head of \p dst. + * \p src will be cleared/re-initialized. + * + * @locks proper locks for \p src and \p dst MUST be held. + */ +static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend(rd_kafka_msgq_t *dst, + rd_kafka_msgq_t *src) { + rd_kafka_msgq_concat(src, dst); + rd_kafka_msgq_move(dst, src); + rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); +} + + +/** + * rd_free all msgs in msgq and reinitialize the msgq. 
+ */ +static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge(rd_kafka_t *rk, + rd_kafka_msgq_t *rkmq) { + rd_kafka_msg_t *rkm, *next; + + next = TAILQ_FIRST(&rkmq->rkmq_msgs); + while (next) { + rkm = next; + next = TAILQ_NEXT(next, rkm_link); + + rd_kafka_msg_destroy(rk, rkm); + } + + rd_kafka_msgq_init(rkmq); +} + + +/** + * Remove message from message queue + */ +static RD_INLINE RD_UNUSED rd_kafka_msg_t * +rd_kafka_msgq_deq(rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int do_count) { + if (likely(do_count)) { + rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0); + rd_kafka_assert(NULL, + rkmq->rkmq_msg_bytes >= + (int64_t)(rkm->rkm_len + rkm->rkm_key_len)); + rkmq->rkmq_msg_cnt--; + rkmq->rkmq_msg_bytes -= rkm->rkm_len + rkm->rkm_key_len; + } + + TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link); + + return rkm; +} + +static RD_INLINE RD_UNUSED rd_kafka_msg_t * +rd_kafka_msgq_pop(rd_kafka_msgq_t *rkmq) { + rd_kafka_msg_t *rkm; + + if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)))) + rd_kafka_msgq_deq(rkmq, rkm, 1); + + return rkm; +} + + +/** + * @returns the first message in the queue, or NULL if empty. + * + * @locks caller's responsibility + */ +static RD_INLINE RD_UNUSED rd_kafka_msg_t * +rd_kafka_msgq_first(const rd_kafka_msgq_t *rkmq) { + return TAILQ_FIRST(&rkmq->rkmq_msgs); +} + +/** + * @returns the last message in the queue, or NULL if empty. + * + * @locks caller's responsibility + */ +static RD_INLINE RD_UNUSED rd_kafka_msg_t * +rd_kafka_msgq_last(const rd_kafka_msgq_t *rkmq) { + return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s); +} + + +/** + * @returns the MsgId of the first message in the queue, or 0 if empty. 
+ * + * @locks caller's responsibility + */ +static RD_INLINE RD_UNUSED uint64_t +rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t *rkmq) { + const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs); + if (rkm) + return rkm->rkm_u.producer.msgid; + else + return 0; +} + + + +rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq, + const rd_kafka_msgq_t *dest_rkmq, + rd_ts_t *next_wakeup, + rd_ts_t now, + rd_ts_t linger_us, + int32_t batch_msg_cnt, + int64_t batch_msg_bytes); + +/** + * @returns true if msgq may be awoken. + */ + +static RD_INLINE RD_UNUSED rd_bool_t +rd_kafka_msgq_may_wakeup(const rd_kafka_msgq_t *rkmq, rd_ts_t now) { + /* No: Wakeup already signalled */ + if (rkmq->rkmq_wakeup.signalled) + return rd_false; + + /* Yes: Wakeup linger time has expired */ + if (now >= rkmq->rkmq_wakeup.abstime) + return rd_true; + + /* Yes: First message enqueued may trigger wakeup */ + if (rkmq->rkmq_msg_cnt == 1 && rkmq->rkmq_wakeup.on_first) + return rd_true; + + /* Yes: batch.size or batch.num.messages exceeded */ + if (rkmq->rkmq_msg_cnt >= rkmq->rkmq_wakeup.msg_cnt || + rkmq->rkmq_msg_bytes > rkmq->rkmq_wakeup.msg_bytes) + return rd_true; + + /* No */ + return rd_false; +} + + +/** + * @brief Message ordering comparator using the message id + * number to order messages in ascending order (FIFO). + */ +static RD_INLINE int rd_kafka_msg_cmp_msgid(const void *_a, const void *_b) { + const rd_kafka_msg_t *a = _a, *b = _b; + + rd_dassert(a->rkm_u.producer.msgid); + + return RD_CMP(a->rkm_u.producer.msgid, b->rkm_u.producer.msgid); +} + +/** + * @brief Message ordering comparator using the message id + * number to order messages in descending order (LIFO). 
+ */ +static RD_INLINE int rd_kafka_msg_cmp_msgid_lifo(const void *_a, + const void *_b) { + const rd_kafka_msg_t *a = _a, *b = _b; + + rd_dassert(a->rkm_u.producer.msgid); + + return RD_CMP(b->rkm_u.producer.msgid, a->rkm_u.producer.msgid); +} + + +/** + * @brief Insert message at its sorted position using the msgid. + * @remark This is an O(n) operation. + * @warning The message must have a msgid set. + * @returns the message count of the queue after enqueuing the message. + */ +int rd_kafka_msgq_enq_sorted0(rd_kafka_msgq_t *rkmq, + rd_kafka_msg_t *rkm, + int (*order_cmp)(const void *, const void *)); + +/** + * @brief Insert message at its sorted position using the msgid. + * @remark This is an O(n) operation. + * @warning The message must have a msgid set. + * @returns the message count of the queue after enqueuing the message. + */ +int rd_kafka_msgq_enq_sorted(const rd_kafka_topic_t *rkt, + rd_kafka_msgq_t *rkmq, + rd_kafka_msg_t *rkm); + +/** + * Insert message at head of message queue. + */ +static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert(rd_kafka_msgq_t *rkmq, + rd_kafka_msg_t *rkm) { + TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link); + rkmq->rkmq_msg_cnt++; + rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len; +} + +/** + * Append message to tail of message queue. + */ +static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq(rd_kafka_msgq_t *rkmq, + rd_kafka_msg_t *rkm) { + TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link); + rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len; + return (int)++rkmq->rkmq_msg_cnt; +} + + +/** + * @returns true if the MsgId extents (first, last) in the two queues overlap. 
+ */ +static RD_INLINE RD_UNUSED rd_bool_t +rd_kafka_msgq_overlap(const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) { + const rd_kafka_msg_t *fa, *la, *fb, *lb; + + if (RD_KAFKA_MSGQ_EMPTY(a) || RD_KAFKA_MSGQ_EMPTY(b)) + return rd_false; + + fa = rd_kafka_msgq_first(a); + fb = rd_kafka_msgq_first(b); + la = rd_kafka_msgq_last(a); + lb = rd_kafka_msgq_last(b); + + return (rd_bool_t)( + fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid && + fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid); +} + +/** + * Scans a message queue for timed out messages and removes them from + * 'rkmq' and adds them to 'timedout', returning the number of timed out + * messages. + * 'timedout' must be initialized. + */ +int rd_kafka_msgq_age_scan(struct rd_kafka_toppar_s *rktp, + rd_kafka_msgq_t *rkmq, + rd_kafka_msgq_t *timedout, + rd_ts_t now, + rd_ts_t *abs_next_timeout); + +void rd_kafka_msgq_split(rd_kafka_msgq_t *leftq, + rd_kafka_msgq_t *rightq, + rd_kafka_msg_t *first_right, + int cnt, + int64_t bytes); + +rd_kafka_msg_t *rd_kafka_msgq_find_pos(const rd_kafka_msgq_t *rkmq, + const rd_kafka_msg_t *start_pos, + const rd_kafka_msg_t *rkm, + int (*cmp)(const void *, const void *), + int *cntp, + int64_t *bytesp); + +void rd_kafka_msgq_set_metadata(rd_kafka_msgq_t *rkmq, + int32_t broker_id, + int64_t base_offset, + int64_t timestamp, + rd_kafka_msg_status_t status); + +void rd_kafka_msgq_move_acked(rd_kafka_msgq_t *dest, + rd_kafka_msgq_t *src, + uint64_t last_msgid, + rd_kafka_msg_status_t status); + +int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt, + rd_kafka_msg_t *rkm, + rd_dolock_t do_lock); + + +rd_kafka_message_t *rd_kafka_message_get(struct rd_kafka_op_s *rko); +rd_kafka_message_t *rd_kafka_message_get_from_rkm(struct rd_kafka_op_s *rko, + rd_kafka_msg_t *rkm); +rd_kafka_message_t *rd_kafka_message_new(void); + + +/** + * @returns a (possibly) wrapped Kafka protocol message sequence counter + * for the non-overflowing \p seq. 
+ */ +static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap(int64_t seq) { + return (int32_t)(seq & (int64_t)INT32_MAX); +} + +void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq); + +rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize); +void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq); +int unittest_msg(void); + +#endif /* _RDKAFKA_MSG_H_ */ |