Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h | 1171
1 file changed, 1171 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h
new file mode 100644
index 000000000..0d50f5870
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h
@@ -0,0 +1,1171 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2016 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_QUEUE_H_
+#define _RDKAFKA_QUEUE_H_
+
+#include "rdkafka_op.h"
+#include "rdkafka_int.h"
+
+#ifdef _WIN32
+#include <io.h> /* for _write() */
+#endif
+
+/** @brief Queueing strategy */
+#define RD_KAFKA_QUEUE_FIFO 0
+#define RD_KAFKA_QUEUE_LIFO 1
+
+TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s);
+
+/**
+ * @struct Queue for rd_kafka_op_t*.
+ *
+ * @remark All readers of the queue must call rd_kafka_q_mark_served()
+ * after reading the queue (while still holding the queue lock) to
+ * clear the wakeup-sent flag.
+ */
+struct rd_kafka_q_s {
+ mtx_t rkq_lock;
+ cnd_t rkq_cond;
+ struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue.
+ * Used in place of this queue
+ * for all operations. */
+
+ struct rd_kafka_op_tailq rkq_q; /* TAILQ_HEAD(, rd_kafka_op_s) */
+ int rkq_qlen; /* Number of entries in queue */
+ int64_t rkq_qsize; /* Size of all entries in queue */
+ int rkq_refcnt;
+ int rkq_flags;
+#define RD_KAFKA_Q_F_ALLOCATED 0x1 /* Allocated: rd_free on destroy */
+#define RD_KAFKA_Q_F_READY \
+ 0x2 /* Queue is ready to be used. \
+ * Flag is cleared on destroy */
+#define RD_KAFKA_Q_F_FWD_APP \
+ 0x4 /* Queue is being forwarded by a call \
+ * to rd_kafka_queue_forward. */
+#define RD_KAFKA_Q_F_YIELD \
+ 0x8 /* Have waiters return even if \
+ * no rko was enqueued. \
+ * This is used to wake up a waiter \
+ * by triggering the cond-var \
+ * but without having to enqueue \
+ * an op. */
+
+ rd_kafka_t *rkq_rk;
+ struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
+
+ /* Op serve callback (optional).
+ * Mainly used for forwarded queues to use the original queue's
+ * serve function from the forwarded position.
+ * Shall return 1 if op was handled, else 0. */
+ rd_kafka_q_serve_cb_t *rkq_serve;
+ void *rkq_opaque;
+
+#if ENABLE_DEVEL
+ char rkq_name[64]; /* Debugging: queue name (FUNC:LINE) */
+#else
+ const char *rkq_name; /* Debugging: queue name (FUNC) */
+#endif
+};
+
+
+/* Application signalling state holder. */
+struct rd_kafka_q_io {
+ /* For FD-based signalling */
+ rd_socket_t fd;
+ void *payload;
+ size_t size;
+ rd_bool_t sent; /**< Wake-up has been sent.
+ * This field is reset to false by the queue
+ * reader, allowing a new wake-up to be sent by a
+ * subsequent writer. */
+ /* For callback-based signalling */
+ void (*event_cb)(rd_kafka_t *rk, void *opaque);
+ void *event_cb_opaque;
+};
+
+
+
+/**
+ * @return true if queue is ready/enabled, else false.
+ * @remark queue lock must be held by caller (if applicable)
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_q_ready(rd_kafka_q_t *rkq) {
+ return rkq->rkq_flags & RD_KAFKA_Q_F_READY;
+}
+
+
+
+void rd_kafka_q_init0(rd_kafka_q_t *rkq,
+ rd_kafka_t *rk,
+ const char *func,
+ int line);
+#define rd_kafka_q_init(rkq, rk) \
+ rd_kafka_q_init0(rkq, rk, __FUNCTION__, __LINE__)
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line);
+#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, __FUNCTION__, __LINE__)
+void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq);
+
+#define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock)
+#define rd_kafka_q_unlock(rkqu) mtx_unlock(&(rkqu)->rkq_lock)
+
+static RD_INLINE RD_UNUSED rd_kafka_q_t *rd_kafka_q_keep(rd_kafka_q_t *rkq) {
+ mtx_lock(&rkq->rkq_lock);
+ rkq->rkq_refcnt++;
+ mtx_unlock(&rkq->rkq_lock);
+ return rkq;
+}
+
+static RD_INLINE RD_UNUSED rd_kafka_q_t *
+rd_kafka_q_keep_nolock(rd_kafka_q_t *rkq) {
+ rkq->rkq_refcnt++;
+ return rkq;
+}
+
+
+/**
+ * @returns the queue's name (used for debugging)
+ */
+static RD_INLINE RD_UNUSED const char *rd_kafka_q_name(rd_kafka_q_t *rkq) {
+ return rkq->rkq_name;
+}
+
+/**
+ * @returns the final destination queue name (after forwarding)
+ * @remark rkq MUST NOT be locked
+ */
+static RD_INLINE RD_UNUSED const char *rd_kafka_q_dest_name(rd_kafka_q_t *rkq) {
+ const char *ret;
+ mtx_lock(&rkq->rkq_lock);
+ if (rkq->rkq_fwdq)
+ ret = rd_kafka_q_dest_name(rkq->rkq_fwdq);
+ else
+ ret = rd_kafka_q_name(rkq);
+ mtx_unlock(&rkq->rkq_lock);
+ return ret;
+}
+
+/**
+ * @brief Disable a queue.
+ * Attempting to enqueue ops to the queue will destroy the ops.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_disable0(rd_kafka_q_t *rkq,
+ int do_lock) {
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+ rkq->rkq_flags &= ~RD_KAFKA_Q_F_READY;
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+}
+#define rd_kafka_q_disable(rkq) rd_kafka_q_disable0(rkq, 1 /*lock*/)
+
+int rd_kafka_q_purge0(rd_kafka_q_t *rkq, int do_lock);
+#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1 /*lock*/)
+void rd_kafka_q_purge_toppar_version(rd_kafka_q_t *rkq,
+ rd_kafka_toppar_t *rktp,
+ int version);
+
+/**
+ * @brief Lose a reference to the queue; when the refcount reaches 0 the
+ *        queue will be destroyed.
+ *
+ * @param disable Also disable the queue, to be used by owner of the queue.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_destroy0(rd_kafka_q_t *rkq,
+ int disable) {
+ int do_delete = 0;
+
+ if (disable) {
+ /* To avoid recursive locking (from ops being purged
+ * that reference this queue somehow),
+ * we disable the queue and purge it with individual
+ * locking. */
+ rd_kafka_q_disable0(rkq, 1 /*lock*/);
+ rd_kafka_q_purge0(rkq, 1 /*lock*/);
+ }
+
+ mtx_lock(&rkq->rkq_lock);
+ rd_kafka_assert(NULL, rkq->rkq_refcnt > 0);
+ do_delete = !--rkq->rkq_refcnt;
+ mtx_unlock(&rkq->rkq_lock);
+
+ if (unlikely(do_delete))
+ rd_kafka_q_destroy_final(rkq);
+}
+
+#define rd_kafka_q_destroy(rkq) rd_kafka_q_destroy0(rkq, 0 /*dont-disable*/)
+
+/**
+ * @brief Queue destroy method to be used by the owner (poller) of
+ *        the queue. The only difference from q_destroy() is that this
+ *        method also disables the queue so that any q_enq() operations
+ *        will fail.
+ *        Failing to disable the queue when the poller destroys its queue
+ *        reference results in ops being enqueued on a queue that no one
+ *        is left to poll, possibly causing a hang on termination due to
+ *        refcounts held by those ops.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_destroy_owner(rd_kafka_q_t *rkq) {
+ rd_kafka_q_destroy0(rkq, 1 /*disable*/);
+}
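+
+/**
+ * Illustrative (non-normative) sketch of the reference-counting pattern
+ * around these helpers: a non-owner that wants to keep a queue alive takes
+ * its own reference with rd_kafka_q_keep() and releases it with
+ * rd_kafka_q_destroy(), while the owning poller thread tears the queue
+ * down with rd_kafka_q_destroy_owner() so it is also disabled and purged.
+ *
+ * @code
+ *   // Non-owner: borrow the queue beyond the caller's scope.
+ *   rd_kafka_q_t *myq = rd_kafka_q_keep(rkq);
+ *   // ... use myq ...
+ *   rd_kafka_q_destroy(myq);        // drop the borrowed reference
+ *
+ *   // Owner/poller: disable, purge and drop the owner reference so that
+ *   // subsequent rd_kafka_q_enq() calls fail instead of accumulating ops.
+ *   rd_kafka_q_destroy_owner(rkq);
+ * @endcode
+ */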
+
+
+/**
+ * Reset a queue.
+ * WARNING: All messages will be lost and leaked.
+ * NOTE: No locking is performed.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_reset(rd_kafka_q_t *rkq) {
+ TAILQ_INIT(&rkq->rkq_q);
+ rd_dassert(TAILQ_EMPTY(&rkq->rkq_q));
+ rkq->rkq_qlen = 0;
+ rkq->rkq_qsize = 0;
+}
+
+
+
+/**
+ * Forward 'srcq' to 'destq'
+ */
+void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
+ rd_kafka_q_t *destq,
+ int do_lock,
+ int fwd_app);
+#define rd_kafka_q_fwd_set(S, D) \
+ rd_kafka_q_fwd_set0(S, D, 1 /*lock*/, 0 /*no fwd_app*/)
+
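+/**
+ * Illustrative (non-normative) sketch of queue forwarding: once \c srcq is
+ * forwarded, enqueues on (and reads from) \c srcq are routed to \c destq.
+ * Passing NULL as the destination is assumed here to remove an existing
+ * forwarding again (see rd_kafka_q_fwd_set0() in rdkafka_queue.c).
+ *
+ * @code
+ *   rd_kafka_q_t *srcq  = rd_kafka_q_new(rk);
+ *   rd_kafka_q_t *destq = rd_kafka_q_new(rk);
+ *
+ *   rd_kafka_q_fwd_set(srcq, destq); // ops enqueued on srcq end up on destq
+ *   // ...
+ *   rd_kafka_q_fwd_set(srcq, NULL);  // stop forwarding
+ *
+ *   rd_kafka_q_destroy(destq);
+ *   rd_kafka_q_destroy(srcq);
+ * @endcode
+ */
+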
+/**
+ * @returns the forward queue (if any) with its refcount increased.
+ * @locks rd_kafka_q_lock(rkq) == !do_lock
+ */
+static RD_INLINE RD_UNUSED rd_kafka_q_t *rd_kafka_q_fwd_get(rd_kafka_q_t *rkq,
+ int do_lock) {
+ rd_kafka_q_t *fwdq;
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+
+ if ((fwdq = rkq->rkq_fwdq))
+ rd_kafka_q_keep(fwdq);
+
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+
+ return fwdq;
+}
+
+
+/**
+ * @returns true if queue is forwarded, else false.
+ *
+ * @remark Thread-safe.
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded(rd_kafka_q_t *rkq) {
+ int r;
+ mtx_lock(&rkq->rkq_lock);
+ r = rkq->rkq_fwdq ? 1 : 0;
+ mtx_unlock(&rkq->rkq_lock);
+ return r;
+}
+
+
+
+/**
+ * @brief Trigger an IO event for this queue.
+ *
+ * @remark Queue MUST be locked
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_io_event(rd_kafka_q_t *rkq) {
+
+ if (likely(!rkq->rkq_qio))
+ return;
+
+ if (rkq->rkq_qio->event_cb) {
+ rkq->rkq_qio->event_cb(rkq->rkq_rk,
+ rkq->rkq_qio->event_cb_opaque);
+ return;
+ }
+
+
+ /* Only one wake-up event should be sent per non-polling period.
+         * When the queue reader polls/reads the channel it calls
+         * rd_kafka_q_mark_served() to reset the wakeup-sent flag, allowing
+ * further wakeups in the next non-polling period. */
+ if (rkq->rkq_qio->sent)
+ return; /* Wake-up event already written */
+
+ rkq->rkq_qio->sent = rd_true;
+
+ /* Write wake-up event to socket.
+ * Ignore errors, not much to do anyway. */
+ if (rd_socket_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
+ (int)rkq->rkq_qio->size) == -1)
+ ;
+}
+
+
+/**
+ * @brief rko->rko_prio comparator
+ * @remark Descending order: higher priority takes precedence.
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_op_cmp_prio(const void *_a,
+ const void *_b) {
+ const rd_kafka_op_t *a = _a, *b = _b;
+
+ return RD_CMP(b->rko_prio, a->rko_prio);
+}
+
+
+/**
+ * @brief Wake up waiters without enqueuing an op.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_yield(rd_kafka_q_t *rkq) {
+ rd_kafka_q_t *fwdq;
+
+ mtx_lock(&rkq->rkq_lock);
+
+ rd_dassert(rkq->rkq_refcnt > 0);
+
+ if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
+ /* Queue has been disabled */
+ mtx_unlock(&rkq->rkq_lock);
+ return;
+ }
+
+ if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
+ cnd_broadcast(&rkq->rkq_cond);
+ if (rkq->rkq_qlen == 0)
+ rd_kafka_q_io_event(rkq);
+
+ mtx_unlock(&rkq->rkq_lock);
+ } else {
+ mtx_unlock(&rkq->rkq_lock);
+ rd_kafka_q_yield(fwdq);
+ rd_kafka_q_destroy(fwdq);
+ }
+}
+
+/**
+ * @brief Low-level unprotected enqueue that only performs
+ * the actual queue enqueue and counter updates.
+ * @remark Will not perform locking, signaling, fwdq, READY checking, etc.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_q_enq0(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) {
+ if (likely(!rko->rko_prio))
+ TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
+ else if (at_head)
+ TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link);
+ else
+ TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *, rko_link,
+ rd_kafka_op_cmp_prio);
+ rkq->rkq_qlen++;
+ rkq->rkq_qsize += rko->rko_len;
+}
+
+
+/**
+ * @brief Enqueue \p rko either at head or tail of \p rkq.
+ *
+ * The provided \p rko is either enqueued or destroyed.
+ *
+ * \p orig_destq is the original (outermost) dest queue for which
+ * this op was enqueued, before any queue forwarding has kicked in.
+ * The rko_serve callback from the orig_destq will be set on the rko
+ * if there is no rko_serve callback already set, and the \p rko isn't
+ * failed because the final queue is disabled.
+ *
+ * @returns 1 if op was enqueued or 0 if queue is disabled and
+ * there was no replyq to enqueue on, in which case the rko is destroyed.
+ *
+ * @locality any thread.
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_q_enq1(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_t *orig_destq,
+ int at_head,
+ int do_lock) {
+ rd_kafka_q_t *fwdq;
+
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+
+ rd_dassert(rkq->rkq_refcnt > 0);
+
+ if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
+ /* Queue has been disabled, reply to and fail the rko. */
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+
+ return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY);
+ }
+
+ if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ if (!rko->rko_serve && orig_destq->rkq_serve) {
+ /* Store original queue's serve callback and opaque
+ * prior to forwarding. */
+ rko->rko_serve = orig_destq->rkq_serve;
+ rko->rko_serve_opaque = orig_destq->rkq_opaque;
+ }
+
+ rd_kafka_q_enq0(rkq, rko, at_head);
+ cnd_signal(&rkq->rkq_cond);
+ if (rkq->rkq_qlen == 1)
+ rd_kafka_q_io_event(rkq);
+
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+ } else {
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+ rd_kafka_q_enq1(fwdq, rko, orig_destq, at_head, 1 /*do lock*/);
+ rd_kafka_q_destroy(fwdq);
+ }
+
+ return 1;
+}
+
+/**
+ * @brief Enqueue the 'rko' op at the tail of the queue 'rkq'.
+ *
+ * The provided 'rko' is either enqueued or destroyed.
+ *
+ * @returns 1 if op was enqueued or 0 if queue is disabled and
+ * there was no replyq to enqueue on, in which case the rko is destroyed.
+ *
+ * @locality any thread.
+ * @locks rkq MUST NOT be locked
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_q_enq(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ return rd_kafka_q_enq1(rkq, rko, rkq, 0 /*at tail*/, 1 /*do lock*/);
+}
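+
+/**
+ * Illustrative (non-normative) sketch of handing an op to a queue, assuming
+ * the instance's main ops queue \c rk->rk_ops and the RD_KAFKA_OP_TERMINATE
+ * op type purely as an example: ownership of the op passes to the queue,
+ * so the op must not be touched after rd_kafka_q_enq() returns (it may
+ * already have been destroyed if the queue was disabled).
+ *
+ * @code
+ *   rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_TERMINATE);
+ *   rd_kafka_q_enq(rk->rk_ops, rko);
+ *   // Do not dereference rko here: it is now owned by the queue, or has
+ *   // been destroyed/replied to if the queue was disabled.
+ * @endcode
+ */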
+
+
+/**
+ * @brief Re-enqueue rko at head of rkq.
+ *
+ * The provided 'rko' is either enqueued or destroyed.
+ *
+ * @returns 1 if op was enqueued or 0 if queue is disabled and
+ * there was no replyq to enqueue on, in which case the rko is destroyed.
+ *
+ * @locality any thread
+ * @locks rkq MUST BE locked
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_q_reenq(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ return rd_kafka_q_enq1(rkq, rko, rkq, 1 /*at head*/, 0 /*don't lock*/);
+}
+
+
+/**
+ * Dequeue 'rko' from queue 'rkq'.
+ *
+ * NOTE: rkq_lock MUST be held
+ * Locality: any thread
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_deq0(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ rd_dassert(rkq->rkq_qlen > 0 &&
+ rkq->rkq_qsize >= (int64_t)rko->rko_len);
+
+ TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
+ rkq->rkq_qlen--;
+ rkq->rkq_qsize -= rko->rko_len;
+}
+
+
+/**
+ * @brief Mark queue as served / read.
+ *
+ * This is currently used by the queue reader side to reset the io-event
+ * wakeup flag.
+ *
+ * Should be called by all queue readers.
+ *
+ * @locks_required rkq must be locked.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_q_mark_served(rd_kafka_q_t *rkq) {
+ if (rkq->rkq_qio)
+ rkq->rkq_qio->sent = rd_false;
+}
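+
+/**
+ * Illustrative (non-normative) sketch of a reader honouring the wake-up
+ * protocol: the queue is drained under the lock and rd_kafka_q_mark_served()
+ * is called before unlocking, so that the next writer may emit a new
+ * IO/event wake-up. (A real reader would also follow rkq_fwdq and handle
+ * ops outside the lock; both are omitted here for brevity.)
+ *
+ * @code
+ *   rd_kafka_op_t *rko;
+ *
+ *   mtx_lock(&rkq->rkq_lock);
+ *   while ((rko = TAILQ_FIRST(&rkq->rkq_q))) {
+ *           rd_kafka_q_deq0(rkq, rko);
+ *           // ... handle rko ...
+ *   }
+ *   rd_kafka_q_mark_served(rkq);    // re-arm wake-ups for the next writer
+ *   mtx_unlock(&rkq->rkq_lock);
+ * @endcode
+ */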
+
+
+/**
+ * Concat all elements of 'srcq' onto tail of 'rkq'.
+ * 'rkq' will be locked (if 'do_lock'==1), but 'srcq' will not.
+ * NOTE: 'srcq' will be reset.
+ *
+ * Locality: any thread.
+ *
+ * @returns 0 if operation was performed or -1 if rkq is disabled.
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_q_concat0(rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
+ int r = 0;
+
+ while (srcq->rkq_fwdq) /* Resolve source queue */
+ srcq = srcq->rkq_fwdq;
+ if (unlikely(srcq->rkq_qlen == 0))
+ return 0; /* Don't do anything if source queue is empty */
+
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+ if (!rkq->rkq_fwdq) {
+ rd_kafka_op_t *rko;
+
+ rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) || srcq->rkq_qlen > 0);
+ if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+ return -1;
+ }
+ /* First insert any prioritized ops from srcq
+ * in the right position in rkq. */
+ while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) {
+ TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
+ TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *,
+ rko_link, rd_kafka_op_cmp_prio);
+ }
+
+ TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
+ if (rkq->rkq_qlen == 0)
+ rd_kafka_q_io_event(rkq);
+ rkq->rkq_qlen += srcq->rkq_qlen;
+ rkq->rkq_qsize += srcq->rkq_qsize;
+ cnd_signal(&rkq->rkq_cond);
+
+ rd_kafka_q_mark_served(srcq);
+ rd_kafka_q_reset(srcq);
+ } else
+ r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
+ srcq, rkq->rkq_fwdq ? do_lock : 0);
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+
+ return r;
+}
+
+#define rd_kafka_q_concat(dstq, srcq) rd_kafka_q_concat0(dstq, srcq, 1 /*lock*/)
+
+
+/**
+ * @brief Prepend all elements of 'srcq' onto head of 'rkq'.
+ *        'rkq' will be locked (if 'do_lock'==1), but 'srcq' will not.
+ * 'srcq' will be reset.
+ *
+ * @remark Will not respect priority of ops, srcq will be prepended in its
+ * original form to rkq.
+ *
+ * @locality any thread.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_q_prepend0(rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+ if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) {
+ /* FIXME: prio-aware */
+ /* Concat rkq on srcq */
+ TAILQ_CONCAT(&srcq->rkq_q, &rkq->rkq_q, rko_link);
+ /* Move srcq to rkq */
+ TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
+ if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
+ rd_kafka_q_io_event(rkq);
+ rkq->rkq_qlen += srcq->rkq_qlen;
+ rkq->rkq_qsize += srcq->rkq_qsize;
+
+ rd_kafka_q_mark_served(srcq);
+ rd_kafka_q_reset(srcq);
+ } else
+ rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
+ srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq,
+ rkq->rkq_fwdq ? do_lock : 0);
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+}
+
+#define rd_kafka_q_prepend(dstq, srcq) \
+ rd_kafka_q_prepend0(dstq, srcq, 1 /*lock*/)
+
+
+/* Returns the number of elements in the queue */
+static RD_INLINE RD_UNUSED int rd_kafka_q_len(rd_kafka_q_t *rkq) {
+ int qlen;
+ rd_kafka_q_t *fwdq;
+ mtx_lock(&rkq->rkq_lock);
+ if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ qlen = rkq->rkq_qlen;
+ mtx_unlock(&rkq->rkq_lock);
+ } else {
+ mtx_unlock(&rkq->rkq_lock);
+ qlen = rd_kafka_q_len(fwdq);
+ rd_kafka_q_destroy(fwdq);
+ }
+ return qlen;
+}
+
+/* Returns the total size of elements in the queue */
+static RD_INLINE RD_UNUSED uint64_t rd_kafka_q_size(rd_kafka_q_t *rkq) {
+ uint64_t sz;
+ rd_kafka_q_t *fwdq;
+ mtx_lock(&rkq->rkq_lock);
+ if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ sz = rkq->rkq_qsize;
+ mtx_unlock(&rkq->rkq_lock);
+ } else {
+ mtx_unlock(&rkq->rkq_lock);
+ sz = rd_kafka_q_size(fwdq);
+ rd_kafka_q_destroy(fwdq);
+ }
+ return sz;
+}
+
+/**
+ * @brief Construct a temporary on-stack replyq with increased
+ * \p rkq refcount (unless NULL), version, and debug id.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_replyq_t
+rd_kafka_replyq_make(rd_kafka_q_t *rkq, int version, const char *id) {
+ rd_kafka_replyq_t replyq = RD_ZERO_INIT;
+
+ if (rkq) {
+ replyq.q = rd_kafka_q_keep(rkq);
+ replyq.version = version;
+#if ENABLE_DEVEL
+ replyq._id = rd_strdup(id);
+#endif
+ }
+
+ return replyq;
+}
+
+/* Construct temporary on-stack replyq with increased Q refcount and
+ * optional VERSION. */
+#define RD_KAFKA_REPLYQ(Q, VERSION) \
+ rd_kafka_replyq_make(Q, VERSION, __FUNCTION__)
+
+/* Construct temporary on-stack replyq for indicating no replyq. */
+#if ENABLE_DEVEL
+#define RD_KAFKA_NO_REPLYQ \
+ (rd_kafka_replyq_t) { \
+ NULL, 0, NULL \
+ }
+#else
+#define RD_KAFKA_NO_REPLYQ \
+ (rd_kafka_replyq_t) { \
+ NULL, 0 \
+ }
+#endif
+
+
+/**
+ * @returns true if the replyq is valid, else false.
+ */
+static RD_INLINE RD_UNUSED rd_bool_t
+rd_kafka_replyq_is_valid(rd_kafka_replyq_t *replyq) {
+ rd_bool_t valid = rd_true;
+
+ if (!replyq->q)
+ return rd_false;
+
+ rd_kafka_q_lock(replyq->q);
+ valid = rd_kafka_q_ready(replyq->q);
+ rd_kafka_q_unlock(replyq->q);
+
+ return valid;
+}
+
+
+
+/**
+ * Set up replyq.
+ * Q refcnt is increased.
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_set_replyq(rd_kafka_replyq_t *replyq,
+ rd_kafka_q_t *rkq,
+ int32_t version) {
+ replyq->q = rkq ? rd_kafka_q_keep(rkq) : NULL;
+ replyq->version = version;
+#if ENABLE_DEVEL
+ replyq->_id = rd_strdup(__FUNCTION__);
+#endif
+}
+
+/**
+ * Set rko's replyq with an optional version (versionptr != NULL).
+ * Q refcnt is increased.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_op_set_replyq(rd_kafka_op_t *rko,
+ rd_kafka_q_t *rkq,
+ rd_atomic32_t *versionptr) {
+ rd_kafka_set_replyq(&rko->rko_replyq, rkq,
+ versionptr ? rd_atomic32_get(versionptr) : 0);
+}
+
+/* Set reply rko's version from replyq's version */
+#define rd_kafka_op_get_reply_version(REPLY_RKO, ORIG_RKO) \
+ do { \
+ (REPLY_RKO)->rko_version = (ORIG_RKO)->rko_replyq.version; \
+ } while (0)
+
+
+/* Clear replyq holder without decreasing any .q references. */
+static RD_INLINE RD_UNUSED void
+rd_kafka_replyq_clear(rd_kafka_replyq_t *replyq) {
+ memset(replyq, 0, sizeof(*replyq));
+}
+
+/**
+ * @brief Make a copy of \p src in \p dst, with its own queue reference
+ */
+static RD_INLINE RD_UNUSED void rd_kafka_replyq_copy(rd_kafka_replyq_t *dst,
+ rd_kafka_replyq_t *src) {
+ dst->version = src->version;
+ dst->q = src->q;
+ if (dst->q)
+ rd_kafka_q_keep(dst->q);
+#if ENABLE_DEVEL
+ if (src->_id)
+ dst->_id = rd_strdup(src->_id);
+ else
+ dst->_id = NULL;
+#endif
+}
+
+
+/**
+ * Clear replyq holder and destroy any .q references.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_replyq_destroy(rd_kafka_replyq_t *replyq) {
+ if (replyq->q)
+ rd_kafka_q_destroy(replyq->q);
+#if ENABLE_DEVEL
+ if (replyq->_id) {
+ rd_free(replyq->_id);
+ replyq->_id = NULL;
+ }
+#endif
+ rd_kafka_replyq_clear(replyq);
+}
+
+
+/**
+ * @brief Wrapper for rd_kafka_q_enq() that takes a replyq,
+ * steals its queue reference, enqueues the op with the replyq version,
+ * and then destroys the queue reference.
+ *
+ * If \p version is non-zero it is used as the op's version,
+ * else replyq->version is used.
+ *
+ * @returns Same as rd_kafka_q_enq()
+ */
+static RD_INLINE RD_UNUSED int rd_kafka_replyq_enq(rd_kafka_replyq_t *replyq,
+ rd_kafka_op_t *rko,
+ int version) {
+ rd_kafka_q_t *rkq = replyq->q;
+ int r;
+
+ if (version)
+ rko->rko_version = version;
+ else
+ rko->rko_version = replyq->version;
+
+        /* The replyq's queue reference is released after the rko has been
+         * enqueued, so detach it from the replyq here. */
+ replyq->q = NULL; /* destroyed separately below */
+
+#if ENABLE_DEVEL
+ if (replyq->_id) {
+ rd_free(replyq->_id);
+ replyq->_id = NULL;
+ }
+#endif
+
+ /* Retain replyq->version since it is used by buf_callback
+ * when dispatching the callback. */
+
+ r = rd_kafka_q_enq(rkq, rko);
+
+ rd_kafka_q_destroy(rkq);
+
+ return r;
+}
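+
+/**
+ * Illustrative (non-normative) sketch of the request/reply pattern these
+ * helpers support, assuming a caller-owned temporary queue; the op type and
+ * reply constructor shown are assumptions used purely for illustration.
+ *
+ * @code
+ *   // Requester side: attach a replyq and wait for the reply.
+ *   rd_kafka_q_t *tmpq = rd_kafka_q_new(rk);
+ *   rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_NAME);
+ *   rko->rko_replyq    = RD_KAFKA_REPLYQ(tmpq, 0);
+ *   rd_kafka_q_enq(rk->rk_ops, rko);
+ *
+ *   rd_kafka_op_t *reply = rd_kafka_q_pop(tmpq, RD_POLL_INFINITE, 0);
+ *   // ... inspect reply, then destroy it and tmpq ...
+ *
+ *   // Serving side (in the op handler): construct and enqueue a reply.
+ *   rd_kafka_op_t *reply_rko =
+ *           rd_kafka_op_new_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
+ *   rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0);
+ * @endcode
+ */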
+
+
+
+rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq,
+ rd_ts_t timeout_us,
+ int32_t version,
+ rd_kafka_q_cb_type_t cb_type,
+ rd_kafka_q_serve_cb_t *callback,
+ void *opaque);
+rd_kafka_op_t *
+rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version);
+int rd_kafka_q_serve(rd_kafka_q_t *rkq,
+ int timeout_ms,
+ int max_cnt,
+ rd_kafka_q_cb_type_t cb_type,
+ rd_kafka_q_serve_cb_t *callback,
+ void *opaque);
+
+
+int rd_kafka_q_move_cnt(rd_kafka_q_t *dstq,
+ rd_kafka_q_t *srcq,
+ int cnt,
+ int do_locks);
+
+int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size);
+rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms);
+
+int rd_kafka_q_apply(rd_kafka_q_t *rkq,
+ int (*callback)(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ void *opaque),
+ void *opaque);
+
+void rd_kafka_q_fix_offsets(rd_kafka_q_t *rkq,
+ int64_t min_offset,
+ int64_t base_offset);
+
+/**
+ * @returns the last op in the queue matching \p op_type and \p allow_err (bool)
+ * @remark The \p rkq must be properly locked before this call, the returned rko
+ *         is not removed from the queue and must thus not be used for longer
+ *         than the lock is held.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_op_t *
+rd_kafka_q_last(rd_kafka_q_t *rkq, rd_kafka_op_type_t op_type, int allow_err) {
+ rd_kafka_op_t *rko;
+ TAILQ_FOREACH_REVERSE(rko, &rkq->rkq_q, rd_kafka_op_tailq, rko_link) {
+ if (rko->rko_type == op_type && (allow_err || !rko->rko_err))
+ return rko;
+ }
+
+ return NULL;
+}
+
+void rd_kafka_q_io_event_enable(rd_kafka_q_t *rkq,
+ rd_socket_t fd,
+ const void *payload,
+ size_t size);
+
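+/**
+ * Illustrative (non-normative) sketch of enabling FD-based wake-ups,
+ * assuming a POSIX pipe purely for illustration: the write end and a
+ * payload byte are registered, the queue writes the payload whenever it
+ * goes from empty to non-empty, and the application poll()s the read end
+ * before serving the queue (which resets the wake-up flag via
+ * rd_kafka_q_mark_served()).
+ *
+ * @code
+ *   int pipefd[2];
+ *
+ *   if (pipe(pipefd) == 0)
+ *           rd_kafka_q_io_event_enable(rkq, pipefd[1], "1", 1);
+ *   // The application poll()s/reads pipefd[0] in its event loop and then
+ *   // serves rkq.
+ * @endcode
+ */
+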
+/* Public interface */
+struct rd_kafka_queue_s {
+ rd_kafka_q_t *rkqu_q;
+ rd_kafka_t *rkqu_rk;
+ int rkqu_is_owner; /**< Is owner/creator of rkqu_q */
+};
+
+
+rd_kafka_queue_t *rd_kafka_queue_new0(rd_kafka_t *rk, rd_kafka_q_t *rkq);
+
+void rd_kafka_q_dump(FILE *fp, rd_kafka_q_t *rkq);
+
+extern int RD_TLS rd_kafka_yield_thread;
+
+
+
+/**
+ * @name Enqueue op once
+ * @{
+ */
+
+/**
+ * @brief Minimal rd_kafka_op_t wrapper that ensures that
+ * the op is only enqueued on the provided queue once.
+ *
+ * Typical use-case is for an op to be triggered from multiple sources,
+ * but at most once, such as from a timer and some other source.
+ */
+typedef struct rd_kafka_enq_once_s {
+ mtx_t lock;
+ int refcnt;
+ rd_kafka_op_t *rko;
+ rd_kafka_replyq_t replyq;
+} rd_kafka_enq_once_t;
+
+
+/**
+ * @brief Allocate and set up a new eonce and set the initial refcount to 1.
+ * @remark This is to be called by the owner of the rko.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_enq_once_t *
+rd_kafka_enq_once_new(rd_kafka_op_t *rko, rd_kafka_replyq_t replyq) {
+ rd_kafka_enq_once_t *eonce = rd_calloc(1, sizeof(*eonce));
+ mtx_init(&eonce->lock, mtx_plain);
+ eonce->rko = rko;
+ eonce->replyq = replyq; /* struct copy */
+ eonce->refcnt = 1;
+ return eonce;
+}
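+
+/**
+ * Illustrative (non-normative) lifecycle sketch, assuming an op that may be
+ * triggered by either of two sources (e.g. a timer and a broker response)
+ * but must be enqueued at most once:
+ *
+ * @code
+ *   rd_kafka_enq_once_t *eonce =
+ *           rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
+ *
+ *   // Each additional source takes its own reference:
+ *   rd_kafka_enq_once_add_source(eonce, "timeout timer");
+ *   rd_kafka_enq_once_add_source(eonce, "broker response");
+ *
+ *   // Whichever source fires first enqueues the op; later triggers only
+ *   // drop their reference:
+ *   rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
+ *                             "timeout timer");
+ *   // ... the remaining source eventually calls _trigger() or
+ *   // _del_source() to drop its reference ...
+ *
+ *   // The owner drops its initial reference when it is done:
+ *   rd_kafka_enq_once_destroy(eonce);
+ * @endcode
+ */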
+
+/**
+ * @brief Re-enable triggering of an eonce even after it has been triggered
+ * once.
+ *
+ * @remark This is to be called by the owner.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_reenable(rd_kafka_enq_once_t *eonce,
+ rd_kafka_op_t *rko,
+ rd_kafka_replyq_t replyq) {
+ mtx_lock(&eonce->lock);
+ eonce->rko = rko;
+ rd_kafka_replyq_destroy(&eonce->replyq);
+ eonce->replyq = replyq; /* struct copy */
+ mtx_unlock(&eonce->lock);
+}
+
+
+/**
+ * @brief Free eonce and its resources. Must only be called with refcnt==0
+ * and eonce->lock NOT held.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_destroy0(rd_kafka_enq_once_t *eonce) {
+ /* This must not be called with the rko or replyq still set, which would
+ * indicate that no enqueueing was performed and that the owner
+ * did not clean up, which is a bug. */
+ rd_assert(!eonce->rko);
+ rd_assert(!eonce->replyq.q);
+#if ENABLE_DEVEL
+ rd_assert(!eonce->replyq._id);
+#endif
+ rd_assert(eonce->refcnt == 0);
+
+ mtx_destroy(&eonce->lock);
+ rd_free(eonce);
+}
+
+
+/**
+ * @brief Increment refcount for source (non-owner), such as a timer.
+ *
+ * @param srcdesc a human-readable descriptive string of the source.
+ * May be used for future debugging.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_add_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) {
+ mtx_lock(&eonce->lock);
+ eonce->refcnt++;
+ mtx_unlock(&eonce->lock);
+}
+
+
+/**
+ * @brief Decrement refcount for source (non-owner), such as a timer.
+ *
+ * @param srcdesc a human-readable descriptive string of the source.
+ * May be used for future debugging.
+ *
+ * @remark Must only be called from the owner with the owner
+ * still holding its own refcount.
+ * This API is used to undo an add_source() from the
+ * same code.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_del_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) {
+ int do_destroy;
+
+ mtx_lock(&eonce->lock);
+ rd_assert(eonce->refcnt > 0);
+ eonce->refcnt--;
+ do_destroy = eonce->refcnt == 0;
+ mtx_unlock(&eonce->lock);
+
+ if (do_destroy) {
+ /* We're the last refcount holder, clean up eonce. */
+ rd_kafka_enq_once_destroy0(eonce);
+ }
+}
+
+/**
+ * @brief Trigger a source's reference where the eonce resides on
+ * an rd_list_t. This is typically used as a free_cb for
+ * rd_list_destroy() and the trigger error code is
+ * always RD_KAFKA_RESP_ERR__DESTROY.
+ */
+void rd_kafka_enq_once_trigger_destroy(void *ptr);
+
+
+/**
+ * @brief Decrement refcount for source (non-owner) and return the rko
+ * if still set.
+ *
+ * @remark Must only be called by sources (non-owner) but only on the
+ *         owner's thread to make sure the rko is not freed.
+ *
+ * @remark The rko remains set on the eonce.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_op_t *
+rd_kafka_enq_once_del_source_return(rd_kafka_enq_once_t *eonce,
+ const char *srcdesc) {
+ rd_bool_t do_destroy;
+ rd_kafka_op_t *rko;
+
+ mtx_lock(&eonce->lock);
+
+ rd_assert(eonce->refcnt > 0);
+        /* Owner must still hold an eonce reference, or the eonce must
+ * have been disabled by the owner (no rko) */
+ rd_assert(eonce->refcnt > 1 || !eonce->rko);
+ eonce->refcnt--;
+ do_destroy = eonce->refcnt == 0;
+
+ rko = eonce->rko;
+ mtx_unlock(&eonce->lock);
+
+ if (do_destroy) {
+ /* We're the last refcount holder, clean up eonce. */
+ rd_kafka_enq_once_destroy0(eonce);
+ }
+
+ return rko;
+}
+
+/**
+ * @brief Trigger enqueuing of the rko (unless already enqueued)
+ *        and drop the source's refcount.
+ *
+ * @remark Must only be called by sources (non-owner).
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_trigger(rd_kafka_enq_once_t *eonce,
+ rd_kafka_resp_err_t err,
+ const char *srcdesc) {
+ int do_destroy;
+ rd_kafka_op_t *rko = NULL;
+ rd_kafka_replyq_t replyq = RD_ZERO_INIT;
+
+ mtx_lock(&eonce->lock);
+
+ rd_assert(eonce->refcnt > 0);
+ eonce->refcnt--;
+ do_destroy = eonce->refcnt == 0;
+
+ if (eonce->rko) {
+ /* Not already enqueued, do it.
+ * Detach the rko and replyq from the eonce and unlock the eonce
+                 * before enqueuing the rko on the replyq to avoid recursive
+                 * locks if the replyq has been disabled and the op's
+ * destructor is called (which might then access the eonce
+ * to clean up). */
+ rko = eonce->rko;
+ replyq = eonce->replyq;
+
+ eonce->rko = NULL;
+ rd_kafka_replyq_clear(&eonce->replyq);
+
+ /* Reply is enqueued at the end of this function */
+ }
+ mtx_unlock(&eonce->lock);
+
+ if (do_destroy) {
+ /* We're the last refcount holder, clean up eonce. */
+ rd_kafka_enq_once_destroy0(eonce);
+ }
+
+ if (rko) {
+ rko->rko_err = err;
+ rd_kafka_replyq_enq(&replyq, rko, replyq.version);
+ rd_kafka_replyq_destroy(&replyq);
+ }
+}
+
+/**
+ * @brief Destroy eonce, must only be called by the owner.
+ *        There may be outstanding refcounts held by non-owners after this
+ *        call.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_enq_once_destroy(rd_kafka_enq_once_t *eonce) {
+ int do_destroy;
+
+ mtx_lock(&eonce->lock);
+ rd_assert(eonce->refcnt > 0);
+ eonce->refcnt--;
+ do_destroy = eonce->refcnt == 0;
+
+ eonce->rko = NULL;
+ rd_kafka_replyq_destroy(&eonce->replyq);
+
+ mtx_unlock(&eonce->lock);
+
+ if (do_destroy) {
+ /* We're the last refcount holder, clean up eonce. */
+ rd_kafka_enq_once_destroy0(eonce);
+ }
+}
+
+
+/**
+ * @brief Disable the owner's eonce, extracting, resetting and returning
+ * the \c rko object.
+ *
+ * This is the same as rd_kafka_enq_once_destroy() but returning
+ * the rko.
+ *
+ * Use this for owner-thread triggering where the enqueuing of the
+ * rko on the replyq is not necessary.
+ *
+ * @returns the eonce's rko object, if still available, else NULL.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_op_t *
+rd_kafka_enq_once_disable(rd_kafka_enq_once_t *eonce) {
+ int do_destroy;
+ rd_kafka_op_t *rko;
+
+ mtx_lock(&eonce->lock);
+ rd_assert(eonce->refcnt > 0);
+ eonce->refcnt--;
+ do_destroy = eonce->refcnt == 0;
+
+ /* May be NULL */
+ rko = eonce->rko;
+ eonce->rko = NULL;
+ rd_kafka_replyq_destroy(&eonce->replyq);
+
+ mtx_unlock(&eonce->lock);
+
+ if (do_destroy) {
+ /* We're the last refcount holder, clean up eonce. */
+ rd_kafka_enq_once_destroy0(eonce);
+ }
+
+ return rko;
+}
+
+
+/**@}*/
+
+
+#endif /* _RDKAFKA_QUEUE_H_ */