diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h | 583 |
1 files changed, 0 insertions, 583 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h deleted file mode 100644 index 877fac15..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h +++ /dev/null @@ -1,583 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012,2013 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * PRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _RDKAFKA_MSG_H_ -#define _RDKAFKA_MSG_H_ - -#include "rdsysqueue.h" - -#include "rdkafka_proto.h" -#include "rdkafka_header.h" - - -/** - * @brief Internal RD_KAFKA_MSG_F_.. flags - */ -#define RD_KAFKA_MSG_F_RKT_RDLOCKED 0x100000 /* rkt is rdlock():ed */ - - -/** - * @brief Message.MsgAttributes for MsgVersion v0..v1, - * also used for MessageSet.Attributes for MsgVersion v2. - */ -#define RD_KAFKA_MSG_ATTR_GZIP (1 << 0) -#define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1) -#define RD_KAFKA_MSG_ATTR_LZ4 (3) -#define RD_KAFKA_MSG_ATTR_ZSTD (4) -#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7 -#define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3) -#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3) - -/** - * @brief MessageSet.Attributes for MsgVersion v2 - * - * Attributes: - * ------------------------------------------------------------------------------------------------- - * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | - * Compression Type (0-2) | - * ------------------------------------------------------------------------------------------------- - */ -/* Compression types same as MsgVersion 0 above */ -/* Timestamp type same as MsgVersion 0 above */ -#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4) -#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5) - - -typedef struct rd_kafka_msg_s { - rd_kafka_message_t rkm_rkmessage; /* MUST be first field */ -#define rkm_len rkm_rkmessage.len -#define rkm_payload rkm_rkmessage.payload -#define rkm_opaque rkm_rkmessage._private -#define rkm_partition rkm_rkmessage.partition -#define rkm_offset rkm_rkmessage.offset -#define rkm_key rkm_rkmessage.key -#define rkm_key_len rkm_rkmessage.key_len -#define rkm_err rkm_rkmessage.err - - TAILQ_ENTRY(rd_kafka_msg_s) rkm_link; - - int rkm_flags; - /* @remark These additional flags must not collide with - * the RD_KAFKA_MSG_F_* flags in rdkafka.h */ -#define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */ -#define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */ -#define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */ -#define RD_KAFKA_MSG_F_CONTROL 0x80000 /* Control message */ - - rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */ - int64_t rkm_timestamp; /* Message format V1. - * Meaning of timestamp depends on - * message Attribute LogAppendtime (broker) - * or CreateTime (producer). - * Unit is milliseconds since epoch (UTC).*/ - - - rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */ - - rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in - * the ProduceResponse handler: - * this value is always up to date. - */ - int32_t rkm_broker_id; /**< Broker message was produced to - * or fetched from. */ - - union { - struct { - rd_ts_t ts_timeout; /* Message timeout */ - rd_ts_t ts_enq; /* Enqueue/Produce time */ - rd_ts_t ts_backoff; /* Backoff next Produce until - * this time. */ - uint64_t msgid; /**< Message sequencial id, - * used to maintain ordering. - * Starts at 1. */ - uint64_t last_msgid; /**< On retry this is set - * on the first message - * in a batch to point - * out the last message - * of the batch so that - * the batch can be - * identically reconstructed. - */ - int retries; /* Number of retries so far */ - } producer; -#define rkm_ts_timeout rkm_u.producer.ts_timeout -#define rkm_ts_enq rkm_u.producer.ts_enq -#define rkm_msgid rkm_u.producer.msgid - - struct { - rd_kafkap_bytes_t binhdrs; /**< Unparsed - * binary headers in - * protocol msg */ - int32_t leader_epoch; /**< Leader epoch at the time - * the message was fetched. */ - } consumer; - } rkm_u; -} rd_kafka_msg_t; - -TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s); - - -/** @returns the absolute time a message was enqueued (producer) */ -#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq) - -/** - * @returns the message's total maximum on-wire size. - * @remark Depending on message version (MagicByte) the actual size - * may be smaller. - */ -static RD_INLINE RD_UNUSED size_t -rd_kafka_msg_wire_size(const rd_kafka_msg_t *rkm, int MsgVersion) { - static const size_t overheads[] = { - [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD, - [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD, - [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD}; - size_t size; - rd_dassert(MsgVersion >= 0 && MsgVersion <= 2); - - size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len; - if (MsgVersion == 2 && rkm->rkm_headers) - size += rd_kafka_headers_serialized_size(rkm->rkm_headers); - - return size; -} - - -/** - * @returns the maximum total on-wire message size regardless of MsgVersion. - * - * @remark This does not account for the ProduceRequest, et.al, just the - * per-message overhead. - */ -static RD_INLINE RD_UNUSED size_t rd_kafka_msg_max_wire_size(size_t keylen, - size_t valuelen, - size_t hdrslen) { - return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD + keylen + valuelen + hdrslen; -} - -/** - * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t - * wrapped rd_kafka_message_t. - */ -static RD_INLINE RD_UNUSED rd_kafka_msg_t * -rd_kafka_message2msg(rd_kafka_message_t *rkmessage) { - return (rd_kafka_msg_t *)rkmessage; -} - - - -/** - * @brief Message queue with message and byte counters. - */ -TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s); -typedef struct rd_kafka_msgq_s { - struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */ - int32_t rkmq_msg_cnt; - int64_t rkmq_msg_bytes; - struct { - rd_ts_t abstime; /**< Allow wake-ups after this point in time.*/ - int32_t msg_cnt; /**< Signal wake-up when this message count - * is reached. */ - int64_t msg_bytes; /**< .. or when this byte count is - * reached. */ - rd_bool_t on_first; /**< Wake-up on first message enqueued - * regardless of .abstime. */ - rd_bool_t signalled; /**< Wake-up (already) signalled. */ - } rkmq_wakeup; -} rd_kafka_msgq_t; - -#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \ - { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) } - -#define RD_KAFKA_MSGQ_FOREACH(elm, head) \ - TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link) - -/* @brief Check if queue is empty. Proper locks must be held. */ -#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs) - -/** - * Returns the number of messages in the specified queue. - */ -static RD_INLINE RD_UNUSED int rd_kafka_msgq_len(const rd_kafka_msgq_t *rkmq) { - return (int)rkmq->rkmq_msg_cnt; -} - -/** - * Returns the total number of bytes in the specified queue. - */ -static RD_INLINE RD_UNUSED size_t -rd_kafka_msgq_size(const rd_kafka_msgq_t *rkmq) { - return (size_t)rkmq->rkmq_msg_bytes; -} - - -void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm); - -int rd_kafka_msg_new(rd_kafka_topic_t *rkt, - int32_t force_partition, - int msgflags, - char *payload, - size_t len, - const void *keydata, - size_t keylen, - void *msg_opaque); - -static RD_INLINE RD_UNUSED void rd_kafka_msgq_init(rd_kafka_msgq_t *rkmq) { - TAILQ_INIT(&rkmq->rkmq_msgs); - rkmq->rkmq_msg_cnt = 0; - rkmq->rkmq_msg_bytes = 0; -} - -#if ENABLE_DEVEL -#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \ - rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, rktp, rkmq, \ - exp_first_msgid, gapless) -#else -#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \ - do { \ - } while (0) -#endif - -void rd_kafka_msgq_verify_order0(const char *function, - int line, - const struct rd_kafka_toppar_s *rktp, - const rd_kafka_msgq_t *rkmq, - uint64_t exp_first_msgid, - rd_bool_t gapless); - - -/** - * Concat all elements of 'src' onto tail of 'dst'. - * 'src' will be cleared. - * Proper locks for 'src' and 'dst' must be held. - */ -static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat(rd_kafka_msgq_t *dst, - rd_kafka_msgq_t *src) { - TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); - dst->rkmq_msg_cnt += src->rkmq_msg_cnt; - dst->rkmq_msg_bytes += src->rkmq_msg_bytes; - rd_kafka_msgq_init(src); - rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); -} - -/** - * Move queue 'src' to 'dst' (overwrites dst) - * Source will be cleared. - */ -static RD_INLINE RD_UNUSED void rd_kafka_msgq_move(rd_kafka_msgq_t *dst, - rd_kafka_msgq_t *src) { - TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); - dst->rkmq_msg_cnt = src->rkmq_msg_cnt; - dst->rkmq_msg_bytes = src->rkmq_msg_bytes; - rd_kafka_msgq_init(src); - rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); -} - - -/** - * @brief Prepend all elements of \ src onto head of \p dst. - * \p src will be cleared/re-initialized. - * - * @locks proper locks for \p src and \p dst MUST be held. - */ -static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend(rd_kafka_msgq_t *dst, - rd_kafka_msgq_t *src) { - rd_kafka_msgq_concat(src, dst); - rd_kafka_msgq_move(dst, src); - rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); -} - - -/** - * rd_free all msgs in msgq and reinitialize the msgq. - */ -static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge(rd_kafka_t *rk, - rd_kafka_msgq_t *rkmq) { - rd_kafka_msg_t *rkm, *next; - - next = TAILQ_FIRST(&rkmq->rkmq_msgs); - while (next) { - rkm = next; - next = TAILQ_NEXT(next, rkm_link); - - rd_kafka_msg_destroy(rk, rkm); - } - - rd_kafka_msgq_init(rkmq); -} - - -/** - * Remove message from message queue - */ -static RD_INLINE RD_UNUSED rd_kafka_msg_t * -rd_kafka_msgq_deq(rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int do_count) { - if (likely(do_count)) { - rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0); - rd_kafka_assert(NULL, - rkmq->rkmq_msg_bytes >= - (int64_t)(rkm->rkm_len + rkm->rkm_key_len)); - rkmq->rkmq_msg_cnt--; - rkmq->rkmq_msg_bytes -= rkm->rkm_len + rkm->rkm_key_len; - } - - TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link); - - return rkm; -} - -static RD_INLINE RD_UNUSED rd_kafka_msg_t * -rd_kafka_msgq_pop(rd_kafka_msgq_t *rkmq) { - rd_kafka_msg_t *rkm; - - if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)))) - rd_kafka_msgq_deq(rkmq, rkm, 1); - - return rkm; -} - - -/** - * @returns the first message in the queue, or NULL if empty. - * - * @locks caller's responsibility - */ -static RD_INLINE RD_UNUSED rd_kafka_msg_t * -rd_kafka_msgq_first(const rd_kafka_msgq_t *rkmq) { - return TAILQ_FIRST(&rkmq->rkmq_msgs); -} - -/** - * @returns the last message in the queue, or NULL if empty. - * - * @locks caller's responsibility - */ -static RD_INLINE RD_UNUSED rd_kafka_msg_t * -rd_kafka_msgq_last(const rd_kafka_msgq_t *rkmq) { - return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s); -} - - -/** - * @returns the MsgId of the first message in the queue, or 0 if empty. - * - * @locks caller's responsibility - */ -static RD_INLINE RD_UNUSED uint64_t -rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t *rkmq) { - const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs); - if (rkm) - return rkm->rkm_u.producer.msgid; - else - return 0; -} - - - -rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq, - const rd_kafka_msgq_t *dest_rkmq, - rd_ts_t *next_wakeup, - rd_ts_t now, - rd_ts_t linger_us, - int32_t batch_msg_cnt, - int64_t batch_msg_bytes); - -/** - * @returns true if msgq may be awoken. - */ - -static RD_INLINE RD_UNUSED rd_bool_t -rd_kafka_msgq_may_wakeup(const rd_kafka_msgq_t *rkmq, rd_ts_t now) { - /* No: Wakeup already signalled */ - if (rkmq->rkmq_wakeup.signalled) - return rd_false; - - /* Yes: Wakeup linger time has expired */ - if (now >= rkmq->rkmq_wakeup.abstime) - return rd_true; - - /* Yes: First message enqueued may trigger wakeup */ - if (rkmq->rkmq_msg_cnt == 1 && rkmq->rkmq_wakeup.on_first) - return rd_true; - - /* Yes: batch.size or batch.num.messages exceeded */ - if (rkmq->rkmq_msg_cnt >= rkmq->rkmq_wakeup.msg_cnt || - rkmq->rkmq_msg_bytes > rkmq->rkmq_wakeup.msg_bytes) - return rd_true; - - /* No */ - return rd_false; -} - - -/** - * @brief Message ordering comparator using the message id - * number to order messages in ascending order (FIFO). - */ -static RD_INLINE int rd_kafka_msg_cmp_msgid(const void *_a, const void *_b) { - const rd_kafka_msg_t *a = _a, *b = _b; - - rd_dassert(a->rkm_u.producer.msgid); - - return RD_CMP(a->rkm_u.producer.msgid, b->rkm_u.producer.msgid); -} - -/** - * @brief Message ordering comparator using the message id - * number to order messages in descending order (LIFO). - */ -static RD_INLINE int rd_kafka_msg_cmp_msgid_lifo(const void *_a, - const void *_b) { - const rd_kafka_msg_t *a = _a, *b = _b; - - rd_dassert(a->rkm_u.producer.msgid); - - return RD_CMP(b->rkm_u.producer.msgid, a->rkm_u.producer.msgid); -} - - -/** - * @brief Insert message at its sorted position using the msgid. - * @remark This is an O(n) operation. - * @warning The message must have a msgid set. - * @returns the message count of the queue after enqueuing the message. - */ -int rd_kafka_msgq_enq_sorted0(rd_kafka_msgq_t *rkmq, - rd_kafka_msg_t *rkm, - int (*order_cmp)(const void *, const void *)); - -/** - * @brief Insert message at its sorted position using the msgid. - * @remark This is an O(n) operation. - * @warning The message must have a msgid set. - * @returns the message count of the queue after enqueuing the message. - */ -int rd_kafka_msgq_enq_sorted(const rd_kafka_topic_t *rkt, - rd_kafka_msgq_t *rkmq, - rd_kafka_msg_t *rkm); - -/** - * Insert message at head of message queue. - */ -static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert(rd_kafka_msgq_t *rkmq, - rd_kafka_msg_t *rkm) { - TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link); - rkmq->rkmq_msg_cnt++; - rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len; -} - -/** - * Append message to tail of message queue. - */ -static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq(rd_kafka_msgq_t *rkmq, - rd_kafka_msg_t *rkm) { - TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link); - rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len; - return (int)++rkmq->rkmq_msg_cnt; -} - - -/** - * @returns true if the MsgId extents (first, last) in the two queues overlap. - */ -static RD_INLINE RD_UNUSED rd_bool_t -rd_kafka_msgq_overlap(const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) { - const rd_kafka_msg_t *fa, *la, *fb, *lb; - - if (RD_KAFKA_MSGQ_EMPTY(a) || RD_KAFKA_MSGQ_EMPTY(b)) - return rd_false; - - fa = rd_kafka_msgq_first(a); - fb = rd_kafka_msgq_first(b); - la = rd_kafka_msgq_last(a); - lb = rd_kafka_msgq_last(b); - - return (rd_bool_t)( - fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid && - fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid); -} - -/** - * Scans a message queue for timed out messages and removes them from - * 'rkmq' and adds them to 'timedout', returning the number of timed out - * messages. - * 'timedout' must be initialized. - */ -int rd_kafka_msgq_age_scan(struct rd_kafka_toppar_s *rktp, - rd_kafka_msgq_t *rkmq, - rd_kafka_msgq_t *timedout, - rd_ts_t now, - rd_ts_t *abs_next_timeout); - -void rd_kafka_msgq_split(rd_kafka_msgq_t *leftq, - rd_kafka_msgq_t *rightq, - rd_kafka_msg_t *first_right, - int cnt, - int64_t bytes); - -rd_kafka_msg_t *rd_kafka_msgq_find_pos(const rd_kafka_msgq_t *rkmq, - const rd_kafka_msg_t *start_pos, - const rd_kafka_msg_t *rkm, - int (*cmp)(const void *, const void *), - int *cntp, - int64_t *bytesp); - -void rd_kafka_msgq_set_metadata(rd_kafka_msgq_t *rkmq, - int32_t broker_id, - int64_t base_offset, - int64_t timestamp, - rd_kafka_msg_status_t status); - -void rd_kafka_msgq_move_acked(rd_kafka_msgq_t *dest, - rd_kafka_msgq_t *src, - uint64_t last_msgid, - rd_kafka_msg_status_t status); - -int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt, - rd_kafka_msg_t *rkm, - rd_dolock_t do_lock); - - -rd_kafka_message_t *rd_kafka_message_get(struct rd_kafka_op_s *rko); -rd_kafka_message_t *rd_kafka_message_get_from_rkm(struct rd_kafka_op_s *rko, - rd_kafka_msg_t *rkm); -rd_kafka_message_t *rd_kafka_message_new(void); - - -/** - * @returns a (possibly) wrapped Kafka protocol message sequence counter - * for the non-overflowing \p seq. - */ -static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap(int64_t seq) { - return (int32_t)(seq & (int64_t)INT32_MAX); -} - -void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq); - -rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize); -void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq); -int unittest_msg(void); - -#endif /* _RDKAFKA_MSG_H_ */ |