summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h583
1 files changed, 0 insertions, 583 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h
deleted file mode 100644
index 877fac15..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.h
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * PRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef _RDKAFKA_MSG_H_
-#define _RDKAFKA_MSG_H_
-
-#include "rdsysqueue.h"
-
-#include "rdkafka_proto.h"
-#include "rdkafka_header.h"
-
-
-/**
- * @brief Internal RD_KAFKA_MSG_F_.. flags
- */
-#define RD_KAFKA_MSG_F_RKT_RDLOCKED 0x100000 /* rkt is rdlock():ed */
-
-
-/**
- * @brief Message.MsgAttributes for MsgVersion v0..v1,
- * also used for MessageSet.Attributes for MsgVersion v2.
- */
-#define RD_KAFKA_MSG_ATTR_GZIP (1 << 0)
-#define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1)
-#define RD_KAFKA_MSG_ATTR_LZ4 (3)
-#define RD_KAFKA_MSG_ATTR_ZSTD (4)
-#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
-#define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3)
-#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3)
-
-/**
- * @brief MessageSet.Attributes for MsgVersion v2
- *
- * Attributes:
- * -------------------------------------------------------------------------------------------------
- * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) |
- * Compression Type (0-2) |
- * -------------------------------------------------------------------------------------------------
- */
-/* Compression types same as MsgVersion 0 above */
-/* Timestamp type same as MsgVersion 0 above */
-#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
-#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5)
-
-
-typedef struct rd_kafka_msg_s {
- rd_kafka_message_t rkm_rkmessage; /* MUST be first field */
-#define rkm_len rkm_rkmessage.len
-#define rkm_payload rkm_rkmessage.payload
-#define rkm_opaque rkm_rkmessage._private
-#define rkm_partition rkm_rkmessage.partition
-#define rkm_offset rkm_rkmessage.offset
-#define rkm_key rkm_rkmessage.key
-#define rkm_key_len rkm_rkmessage.key_len
-#define rkm_err rkm_rkmessage.err
-
- TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;
-
- int rkm_flags;
- /* @remark These additional flags must not collide with
- * the RD_KAFKA_MSG_F_* flags in rdkafka.h */
-#define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */
-#define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */
-#define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */
-#define RD_KAFKA_MSG_F_CONTROL 0x80000 /* Control message */
-
- rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
- int64_t rkm_timestamp; /* Message format V1.
- * Meaning of timestamp depends on
- * message Attribute LogAppendtime (broker)
- * or CreateTime (producer).
- * Unit is milliseconds since epoch (UTC).*/
-
-
- rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */
-
- rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in
- * the ProduceResponse handler:
- * this value is always up to date.
- */
- int32_t rkm_broker_id; /**< Broker message was produced to
- * or fetched from. */
-
- union {
- struct {
- rd_ts_t ts_timeout; /* Message timeout */
- rd_ts_t ts_enq; /* Enqueue/Produce time */
- rd_ts_t ts_backoff; /* Backoff next Produce until
- * this time. */
- uint64_t msgid; /**< Message sequencial id,
- * used to maintain ordering.
- * Starts at 1. */
- uint64_t last_msgid; /**< On retry this is set
- * on the first message
- * in a batch to point
- * out the last message
- * of the batch so that
- * the batch can be
- * identically reconstructed.
- */
- int retries; /* Number of retries so far */
- } producer;
-#define rkm_ts_timeout rkm_u.producer.ts_timeout
-#define rkm_ts_enq rkm_u.producer.ts_enq
-#define rkm_msgid rkm_u.producer.msgid
-
- struct {
- rd_kafkap_bytes_t binhdrs; /**< Unparsed
- * binary headers in
- * protocol msg */
- int32_t leader_epoch; /**< Leader epoch at the time
- * the message was fetched. */
- } consumer;
- } rkm_u;
-} rd_kafka_msg_t;
-
-TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);
-
-
-/** @returns the absolute time a message was enqueued (producer) */
-#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)
-
-/**
- * @returns the message's total maximum on-wire size.
- * @remark Depending on message version (MagicByte) the actual size
- * may be smaller.
- */
-static RD_INLINE RD_UNUSED size_t
-rd_kafka_msg_wire_size(const rd_kafka_msg_t *rkm, int MsgVersion) {
- static const size_t overheads[] = {
- [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
- [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
- [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD};
- size_t size;
- rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);
-
- size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
- if (MsgVersion == 2 && rkm->rkm_headers)
- size += rd_kafka_headers_serialized_size(rkm->rkm_headers);
-
- return size;
-}
-
-
-/**
- * @returns the maximum total on-wire message size regardless of MsgVersion.
- *
- * @remark This does not account for the ProduceRequest, et.al, just the
- * per-message overhead.
- */
-static RD_INLINE RD_UNUSED size_t rd_kafka_msg_max_wire_size(size_t keylen,
- size_t valuelen,
- size_t hdrslen) {
- return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD + keylen + valuelen + hdrslen;
-}
-
-/**
- * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
- * wrapped rd_kafka_message_t.
- */
-static RD_INLINE RD_UNUSED rd_kafka_msg_t *
-rd_kafka_message2msg(rd_kafka_message_t *rkmessage) {
- return (rd_kafka_msg_t *)rkmessage;
-}
-
-
-
-/**
- * @brief Message queue with message and byte counters.
- */
-TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
-typedef struct rd_kafka_msgq_s {
- struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
- int32_t rkmq_msg_cnt;
- int64_t rkmq_msg_bytes;
- struct {
- rd_ts_t abstime; /**< Allow wake-ups after this point in time.*/
- int32_t msg_cnt; /**< Signal wake-up when this message count
- * is reached. */
- int64_t msg_bytes; /**< .. or when this byte count is
- * reached. */
- rd_bool_t on_first; /**< Wake-up on first message enqueued
- * regardless of .abstime. */
- rd_bool_t signalled; /**< Wake-up (already) signalled. */
- } rkmq_wakeup;
-} rd_kafka_msgq_t;
-
-#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
- { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }
-
-#define RD_KAFKA_MSGQ_FOREACH(elm, head) \
- TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)
-
-/* @brief Check if queue is empty. Proper locks must be held. */
-#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)
-
-/**
- * Returns the number of messages in the specified queue.
- */
-static RD_INLINE RD_UNUSED int rd_kafka_msgq_len(const rd_kafka_msgq_t *rkmq) {
- return (int)rkmq->rkmq_msg_cnt;
-}
-
-/**
- * Returns the total number of bytes in the specified queue.
- */
-static RD_INLINE RD_UNUSED size_t
-rd_kafka_msgq_size(const rd_kafka_msgq_t *rkmq) {
- return (size_t)rkmq->rkmq_msg_bytes;
-}
-
-
-void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm);
-
-int rd_kafka_msg_new(rd_kafka_topic_t *rkt,
- int32_t force_partition,
- int msgflags,
- char *payload,
- size_t len,
- const void *keydata,
- size_t keylen,
- void *msg_opaque);
-
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_init(rd_kafka_msgq_t *rkmq) {
- TAILQ_INIT(&rkmq->rkmq_msgs);
- rkmq->rkmq_msg_cnt = 0;
- rkmq->rkmq_msg_bytes = 0;
-}
-
-#if ENABLE_DEVEL
-#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \
- rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, rktp, rkmq, \
- exp_first_msgid, gapless)
-#else
-#define rd_kafka_msgq_verify_order(rktp, rkmq, exp_first_msgid, gapless) \
- do { \
- } while (0)
-#endif
-
-void rd_kafka_msgq_verify_order0(const char *function,
- int line,
- const struct rd_kafka_toppar_s *rktp,
- const rd_kafka_msgq_t *rkmq,
- uint64_t exp_first_msgid,
- rd_bool_t gapless);
-
-
-/**
- * Concat all elements of 'src' onto tail of 'dst'.
- * 'src' will be cleared.
- * Proper locks for 'src' and 'dst' must be held.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat(rd_kafka_msgq_t *dst,
- rd_kafka_msgq_t *src) {
- TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
- dst->rkmq_msg_cnt += src->rkmq_msg_cnt;
- dst->rkmq_msg_bytes += src->rkmq_msg_bytes;
- rd_kafka_msgq_init(src);
- rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
-}
-
-/**
- * Move queue 'src' to 'dst' (overwrites dst)
- * Source will be cleared.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_move(rd_kafka_msgq_t *dst,
- rd_kafka_msgq_t *src) {
- TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
- dst->rkmq_msg_cnt = src->rkmq_msg_cnt;
- dst->rkmq_msg_bytes = src->rkmq_msg_bytes;
- rd_kafka_msgq_init(src);
- rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
-}
-
-
-/**
- * @brief Prepend all elements of \ src onto head of \p dst.
- * \p src will be cleared/re-initialized.
- *
- * @locks proper locks for \p src and \p dst MUST be held.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend(rd_kafka_msgq_t *dst,
- rd_kafka_msgq_t *src) {
- rd_kafka_msgq_concat(src, dst);
- rd_kafka_msgq_move(dst, src);
- rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
-}
-
-
-/**
- * rd_free all msgs in msgq and reinitialize the msgq.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge(rd_kafka_t *rk,
- rd_kafka_msgq_t *rkmq) {
- rd_kafka_msg_t *rkm, *next;
-
- next = TAILQ_FIRST(&rkmq->rkmq_msgs);
- while (next) {
- rkm = next;
- next = TAILQ_NEXT(next, rkm_link);
-
- rd_kafka_msg_destroy(rk, rkm);
- }
-
- rd_kafka_msgq_init(rkmq);
-}
-
-
-/**
- * Remove message from message queue
- */
-static RD_INLINE RD_UNUSED rd_kafka_msg_t *
-rd_kafka_msgq_deq(rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int do_count) {
- if (likely(do_count)) {
- rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
- rd_kafka_assert(NULL,
- rkmq->rkmq_msg_bytes >=
- (int64_t)(rkm->rkm_len + rkm->rkm_key_len));
- rkmq->rkmq_msg_cnt--;
- rkmq->rkmq_msg_bytes -= rkm->rkm_len + rkm->rkm_key_len;
- }
-
- TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
-
- return rkm;
-}
-
-static RD_INLINE RD_UNUSED rd_kafka_msg_t *
-rd_kafka_msgq_pop(rd_kafka_msgq_t *rkmq) {
- rd_kafka_msg_t *rkm;
-
- if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
- rd_kafka_msgq_deq(rkmq, rkm, 1);
-
- return rkm;
-}
-
-
-/**
- * @returns the first message in the queue, or NULL if empty.
- *
- * @locks caller's responsibility
- */
-static RD_INLINE RD_UNUSED rd_kafka_msg_t *
-rd_kafka_msgq_first(const rd_kafka_msgq_t *rkmq) {
- return TAILQ_FIRST(&rkmq->rkmq_msgs);
-}
-
-/**
- * @returns the last message in the queue, or NULL if empty.
- *
- * @locks caller's responsibility
- */
-static RD_INLINE RD_UNUSED rd_kafka_msg_t *
-rd_kafka_msgq_last(const rd_kafka_msgq_t *rkmq) {
- return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s);
-}
-
-
-/**
- * @returns the MsgId of the first message in the queue, or 0 if empty.
- *
- * @locks caller's responsibility
- */
-static RD_INLINE RD_UNUSED uint64_t
-rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t *rkmq) {
- const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
- if (rkm)
- return rkm->rkm_u.producer.msgid;
- else
- return 0;
-}
-
-
-
-rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
- const rd_kafka_msgq_t *dest_rkmq,
- rd_ts_t *next_wakeup,
- rd_ts_t now,
- rd_ts_t linger_us,
- int32_t batch_msg_cnt,
- int64_t batch_msg_bytes);
-
-/**
- * @returns true if msgq may be awoken.
- */
-
-static RD_INLINE RD_UNUSED rd_bool_t
-rd_kafka_msgq_may_wakeup(const rd_kafka_msgq_t *rkmq, rd_ts_t now) {
- /* No: Wakeup already signalled */
- if (rkmq->rkmq_wakeup.signalled)
- return rd_false;
-
- /* Yes: Wakeup linger time has expired */
- if (now >= rkmq->rkmq_wakeup.abstime)
- return rd_true;
-
- /* Yes: First message enqueued may trigger wakeup */
- if (rkmq->rkmq_msg_cnt == 1 && rkmq->rkmq_wakeup.on_first)
- return rd_true;
-
- /* Yes: batch.size or batch.num.messages exceeded */
- if (rkmq->rkmq_msg_cnt >= rkmq->rkmq_wakeup.msg_cnt ||
- rkmq->rkmq_msg_bytes > rkmq->rkmq_wakeup.msg_bytes)
- return rd_true;
-
- /* No */
- return rd_false;
-}
-
-
-/**
- * @brief Message ordering comparator using the message id
- * number to order messages in ascending order (FIFO).
- */
-static RD_INLINE int rd_kafka_msg_cmp_msgid(const void *_a, const void *_b) {
- const rd_kafka_msg_t *a = _a, *b = _b;
-
- rd_dassert(a->rkm_u.producer.msgid);
-
- return RD_CMP(a->rkm_u.producer.msgid, b->rkm_u.producer.msgid);
-}
-
-/**
- * @brief Message ordering comparator using the message id
- * number to order messages in descending order (LIFO).
- */
-static RD_INLINE int rd_kafka_msg_cmp_msgid_lifo(const void *_a,
- const void *_b) {
- const rd_kafka_msg_t *a = _a, *b = _b;
-
- rd_dassert(a->rkm_u.producer.msgid);
-
- return RD_CMP(b->rkm_u.producer.msgid, a->rkm_u.producer.msgid);
-}
-
-
-/**
- * @brief Insert message at its sorted position using the msgid.
- * @remark This is an O(n) operation.
- * @warning The message must have a msgid set.
- * @returns the message count of the queue after enqueuing the message.
- */
-int rd_kafka_msgq_enq_sorted0(rd_kafka_msgq_t *rkmq,
- rd_kafka_msg_t *rkm,
- int (*order_cmp)(const void *, const void *));
-
-/**
- * @brief Insert message at its sorted position using the msgid.
- * @remark This is an O(n) operation.
- * @warning The message must have a msgid set.
- * @returns the message count of the queue after enqueuing the message.
- */
-int rd_kafka_msgq_enq_sorted(const rd_kafka_topic_t *rkt,
- rd_kafka_msgq_t *rkmq,
- rd_kafka_msg_t *rkm);
-
-/**
- * Insert message at head of message queue.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert(rd_kafka_msgq_t *rkmq,
- rd_kafka_msg_t *rkm) {
- TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
- rkmq->rkmq_msg_cnt++;
- rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
-}
-
-/**
- * Append message to tail of message queue.
- */
-static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq(rd_kafka_msgq_t *rkmq,
- rd_kafka_msg_t *rkm) {
- TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
- rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
- return (int)++rkmq->rkmq_msg_cnt;
-}
-
-
-/**
- * @returns true if the MsgId extents (first, last) in the two queues overlap.
- */
-static RD_INLINE RD_UNUSED rd_bool_t
-rd_kafka_msgq_overlap(const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) {
- const rd_kafka_msg_t *fa, *la, *fb, *lb;
-
- if (RD_KAFKA_MSGQ_EMPTY(a) || RD_KAFKA_MSGQ_EMPTY(b))
- return rd_false;
-
- fa = rd_kafka_msgq_first(a);
- fb = rd_kafka_msgq_first(b);
- la = rd_kafka_msgq_last(a);
- lb = rd_kafka_msgq_last(b);
-
- return (rd_bool_t)(
- fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid &&
- fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid);
-}
-
-/**
- * Scans a message queue for timed out messages and removes them from
- * 'rkmq' and adds them to 'timedout', returning the number of timed out
- * messages.
- * 'timedout' must be initialized.
- */
-int rd_kafka_msgq_age_scan(struct rd_kafka_toppar_s *rktp,
- rd_kafka_msgq_t *rkmq,
- rd_kafka_msgq_t *timedout,
- rd_ts_t now,
- rd_ts_t *abs_next_timeout);
-
-void rd_kafka_msgq_split(rd_kafka_msgq_t *leftq,
- rd_kafka_msgq_t *rightq,
- rd_kafka_msg_t *first_right,
- int cnt,
- int64_t bytes);
-
-rd_kafka_msg_t *rd_kafka_msgq_find_pos(const rd_kafka_msgq_t *rkmq,
- const rd_kafka_msg_t *start_pos,
- const rd_kafka_msg_t *rkm,
- int (*cmp)(const void *, const void *),
- int *cntp,
- int64_t *bytesp);
-
-void rd_kafka_msgq_set_metadata(rd_kafka_msgq_t *rkmq,
- int32_t broker_id,
- int64_t base_offset,
- int64_t timestamp,
- rd_kafka_msg_status_t status);
-
-void rd_kafka_msgq_move_acked(rd_kafka_msgq_t *dest,
- rd_kafka_msgq_t *src,
- uint64_t last_msgid,
- rd_kafka_msg_status_t status);
-
-int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt,
- rd_kafka_msg_t *rkm,
- rd_dolock_t do_lock);
-
-
-rd_kafka_message_t *rd_kafka_message_get(struct rd_kafka_op_s *rko);
-rd_kafka_message_t *rd_kafka_message_get_from_rkm(struct rd_kafka_op_s *rko,
- rd_kafka_msg_t *rkm);
-rd_kafka_message_t *rd_kafka_message_new(void);
-
-
-/**
- * @returns a (possibly) wrapped Kafka protocol message sequence counter
- * for the non-overflowing \p seq.
- */
-static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap(int64_t seq) {
- return (int32_t)(seq & (int64_t)INT32_MAX);
-}
-
-void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);
-
-rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize);
-void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq);
-int unittest_msg(void);
-
-#endif /* _RDKAFKA_MSG_H_ */