diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h | 1171 |
1 files changed, 1171 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h new file mode 100644 index 000000000..0d50f5870 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.h @@ -0,0 +1,1171 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2016 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKA_QUEUE_H_ +#define _RDKAFKA_QUEUE_H_ + +#include "rdkafka_op.h" +#include "rdkafka_int.h" + +#ifdef _WIN32 +#include <io.h> /* for _write() */ +#endif + +/** @brief Queueing strategy */ +#define RD_KAFKA_QUEUE_FIFO 0 +#define RD_KAFKA_QUEUE_LIFO 1 + +TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s); + +/** + * @struct Queue for rd_kafka_op_t*. + * + * @remark All readers of the queue must call rd_kafka_q_mark_served() + * after reading the queue (while still holding the queue lock) to + * clear the wakeup-sent flag. + */ +struct rd_kafka_q_s { + mtx_t rkq_lock; + cnd_t rkq_cond; + struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue. + * Used in place of this queue + * for all operations. */ + + struct rd_kafka_op_tailq rkq_q; /* TAILQ_HEAD(, rd_kafka_op_s) */ + int rkq_qlen; /* Number of entries in queue */ + int64_t rkq_qsize; /* Size of all entries in queue */ + int rkq_refcnt; + int rkq_flags; +#define RD_KAFKA_Q_F_ALLOCATED 0x1 /* Allocated: rd_free on destroy */ +#define RD_KAFKA_Q_F_READY \ + 0x2 /* Queue is ready to be used. \ + * Flag is cleared on destroy */ +#define RD_KAFKA_Q_F_FWD_APP \ + 0x4 /* Queue is being forwarded by a call \ + * to rd_kafka_queue_forward. */ +#define RD_KAFKA_Q_F_YIELD \ + 0x8 /* Have waiters return even if \ + * no rko was enqueued. \ + * This is used to wake up a waiter \ + * by triggering the cond-var \ + * but without having to enqueue \ + * an op. */ + + rd_kafka_t *rkq_rk; + struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */ + + /* Op serve callback (optional). + * Mainly used for forwarded queues to use the original queue's + * serve function from the forwarded position. + * Shall return 1 if op was handled, else 0. */ + rd_kafka_q_serve_cb_t *rkq_serve; + void *rkq_opaque; + +#if ENABLE_DEVEL + char rkq_name[64]; /* Debugging: queue name (FUNC:LINE) */ +#else + const char *rkq_name; /* Debugging: queue name (FUNC) */ +#endif +}; + + +/* Application signalling state holder. */ +struct rd_kafka_q_io { + /* For FD-based signalling */ + rd_socket_t fd; + void *payload; + size_t size; + rd_bool_t sent; /**< Wake-up has been sent. + * This field is reset to false by the queue + * reader, allowing a new wake-up to be sent by a + * subsequent writer. */ + /* For callback-based signalling */ + void (*event_cb)(rd_kafka_t *rk, void *opaque); + void *event_cb_opaque; +}; + + + +/** + * @return true if queue is ready/enabled, else false. + * @remark queue luck must be held by caller (if applicable) + */ +static RD_INLINE RD_UNUSED int rd_kafka_q_ready(rd_kafka_q_t *rkq) { + return rkq->rkq_flags & RD_KAFKA_Q_F_READY; +} + + + +void rd_kafka_q_init0(rd_kafka_q_t *rkq, + rd_kafka_t *rk, + const char *func, + int line); +#define rd_kafka_q_init(rkq, rk) \ + rd_kafka_q_init0(rkq, rk, __FUNCTION__, __LINE__) +rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line); +#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk, __FUNCTION__, __LINE__) +void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq); + +#define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock) +#define rd_kafka_q_unlock(rkqu) mtx_unlock(&(rkqu)->rkq_lock) + +static RD_INLINE RD_UNUSED rd_kafka_q_t *rd_kafka_q_keep(rd_kafka_q_t *rkq) { + mtx_lock(&rkq->rkq_lock); + rkq->rkq_refcnt++; + mtx_unlock(&rkq->rkq_lock); + return rkq; +} + +static RD_INLINE RD_UNUSED rd_kafka_q_t * +rd_kafka_q_keep_nolock(rd_kafka_q_t *rkq) { + rkq->rkq_refcnt++; + return rkq; +} + + +/** + * @returns the queue's name (used for debugging) + */ +static RD_INLINE RD_UNUSED const char *rd_kafka_q_name(rd_kafka_q_t *rkq) { + return rkq->rkq_name; +} + +/** + * @returns the final destination queue name (after forwarding) + * @remark rkq MUST NOT be locked + */ +static RD_INLINE RD_UNUSED const char *rd_kafka_q_dest_name(rd_kafka_q_t *rkq) { + const char *ret; + mtx_lock(&rkq->rkq_lock); + if (rkq->rkq_fwdq) + ret = rd_kafka_q_dest_name(rkq->rkq_fwdq); + else + ret = rd_kafka_q_name(rkq); + mtx_unlock(&rkq->rkq_lock); + return ret; +} + +/** + * @brief Disable a queue. + * Attempting to enqueue ops to the queue will destroy the ops. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_disable0(rd_kafka_q_t *rkq, + int do_lock) { + if (do_lock) + mtx_lock(&rkq->rkq_lock); + rkq->rkq_flags &= ~RD_KAFKA_Q_F_READY; + if (do_lock) + mtx_unlock(&rkq->rkq_lock); +} +#define rd_kafka_q_disable(rkq) rd_kafka_q_disable0(rkq, 1 /*lock*/) + +int rd_kafka_q_purge0(rd_kafka_q_t *rkq, int do_lock); +#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1 /*lock*/) +void rd_kafka_q_purge_toppar_version(rd_kafka_q_t *rkq, + rd_kafka_toppar_t *rktp, + int version); + +/** + * @brief Loose reference to queue, when refcount reaches 0 the queue + * will be destroyed. + * + * @param disable Also disable the queue, to be used by owner of the queue. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_destroy0(rd_kafka_q_t *rkq, + int disable) { + int do_delete = 0; + + if (disable) { + /* To avoid recursive locking (from ops being purged + * that reference this queue somehow), + * we disable the queue and purge it with individual + * locking. */ + rd_kafka_q_disable0(rkq, 1 /*lock*/); + rd_kafka_q_purge0(rkq, 1 /*lock*/); + } + + mtx_lock(&rkq->rkq_lock); + rd_kafka_assert(NULL, rkq->rkq_refcnt > 0); + do_delete = !--rkq->rkq_refcnt; + mtx_unlock(&rkq->rkq_lock); + + if (unlikely(do_delete)) + rd_kafka_q_destroy_final(rkq); +} + +#define rd_kafka_q_destroy(rkq) rd_kafka_q_destroy0(rkq, 0 /*dont-disable*/) + +/** + * @brief Queue destroy method to be used by the owner (poller) of + * the queue. The only difference to q_destroy() is that this + * method also disables the queue so that any q_enq() operations + * will fail. + * Failure to disable a queue on the poller when it destroys its + * queue reference results in ops being enqueued on the queue + * but there is noone left to poll it, possibly resulting in a + * hang on termination due to refcounts held by the op. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_destroy_owner(rd_kafka_q_t *rkq) { + rd_kafka_q_destroy0(rkq, 1 /*disable*/); +} + + +/** + * Reset a queue. + * WARNING: All messages will be lost and leaked. + * NOTE: No locking is performed. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_reset(rd_kafka_q_t *rkq) { + TAILQ_INIT(&rkq->rkq_q); + rd_dassert(TAILQ_EMPTY(&rkq->rkq_q)); + rkq->rkq_qlen = 0; + rkq->rkq_qsize = 0; +} + + + +/** + * Forward 'srcq' to 'destq' + */ +void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq, + rd_kafka_q_t *destq, + int do_lock, + int fwd_app); +#define rd_kafka_q_fwd_set(S, D) \ + rd_kafka_q_fwd_set0(S, D, 1 /*lock*/, 0 /*no fwd_app*/) + +/** + * @returns the forward queue (if any) with its refcount increased. + * @locks rd_kafka_q_lock(rkq) == !do_lock + */ +static RD_INLINE RD_UNUSED rd_kafka_q_t *rd_kafka_q_fwd_get(rd_kafka_q_t *rkq, + int do_lock) { + rd_kafka_q_t *fwdq; + if (do_lock) + mtx_lock(&rkq->rkq_lock); + + if ((fwdq = rkq->rkq_fwdq)) + rd_kafka_q_keep(fwdq); + + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + + return fwdq; +} + + +/** + * @returns true if queue is forwarded, else false. + * + * @remark Thread-safe. + */ +static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded(rd_kafka_q_t *rkq) { + int r; + mtx_lock(&rkq->rkq_lock); + r = rkq->rkq_fwdq ? 1 : 0; + mtx_unlock(&rkq->rkq_lock); + return r; +} + + + +/** + * @brief Trigger an IO event for this queue. + * + * @remark Queue MUST be locked + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_io_event(rd_kafka_q_t *rkq) { + + if (likely(!rkq->rkq_qio)) + return; + + if (rkq->rkq_qio->event_cb) { + rkq->rkq_qio->event_cb(rkq->rkq_rk, + rkq->rkq_qio->event_cb_opaque); + return; + } + + + /* Only one wake-up event should be sent per non-polling period. + * As the queue reader calls poll/reads the channel it calls to + * rd_kafka_q_mark_served() to reset the wakeup sent flag, allowing + * further wakeups in the next non-polling period. */ + if (rkq->rkq_qio->sent) + return; /* Wake-up event already written */ + + rkq->rkq_qio->sent = rd_true; + + /* Write wake-up event to socket. + * Ignore errors, not much to do anyway. */ + if (rd_socket_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload, + (int)rkq->rkq_qio->size) == -1) + ; +} + + +/** + * @brief rko->rko_prio comparator + * @remark: descending order: higher priority takes preceedence. + */ +static RD_INLINE RD_UNUSED int rd_kafka_op_cmp_prio(const void *_a, + const void *_b) { + const rd_kafka_op_t *a = _a, *b = _b; + + return RD_CMP(b->rko_prio, a->rko_prio); +} + + +/** + * @brief Wake up waiters without enqueuing an op. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_yield(rd_kafka_q_t *rkq) { + rd_kafka_q_t *fwdq; + + mtx_lock(&rkq->rkq_lock); + + rd_dassert(rkq->rkq_refcnt > 0); + + if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) { + /* Queue has been disabled */ + mtx_unlock(&rkq->rkq_lock); + return; + } + + if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD; + cnd_broadcast(&rkq->rkq_cond); + if (rkq->rkq_qlen == 0) + rd_kafka_q_io_event(rkq); + + mtx_unlock(&rkq->rkq_lock); + } else { + mtx_unlock(&rkq->rkq_lock); + rd_kafka_q_yield(fwdq); + rd_kafka_q_destroy(fwdq); + } +} + +/** + * @brief Low-level unprotected enqueue that only performs + * the actual queue enqueue and counter updates. + * @remark Will not perform locking, signaling, fwdq, READY checking, etc. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_q_enq0(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) { + if (likely(!rko->rko_prio)) + TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link); + else if (at_head) + TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link); + else + TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *, rko_link, + rd_kafka_op_cmp_prio); + rkq->rkq_qlen++; + rkq->rkq_qsize += rko->rko_len; +} + + +/** + * @brief Enqueue \p rko either at head or tail of \p rkq. + * + * The provided \p rko is either enqueued or destroyed. + * + * \p orig_destq is the original (outermost) dest queue for which + * this op was enqueued, before any queue forwarding has kicked in. + * The rko_serve callback from the orig_destq will be set on the rko + * if there is no rko_serve callback already set, and the \p rko isn't + * failed because the final queue is disabled. + * + * @returns 1 if op was enqueued or 0 if queue is disabled and + * there was no replyq to enqueue on in which case the rko is destroyed. + * + * @locality any thread. + */ +static RD_INLINE RD_UNUSED int rd_kafka_q_enq1(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + rd_kafka_q_t *orig_destq, + int at_head, + int do_lock) { + rd_kafka_q_t *fwdq; + + if (do_lock) + mtx_lock(&rkq->rkq_lock); + + rd_dassert(rkq->rkq_refcnt > 0); + + if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) { + /* Queue has been disabled, reply to and fail the rko. */ + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + + return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY); + } + + if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + if (!rko->rko_serve && orig_destq->rkq_serve) { + /* Store original queue's serve callback and opaque + * prior to forwarding. */ + rko->rko_serve = orig_destq->rkq_serve; + rko->rko_serve_opaque = orig_destq->rkq_opaque; + } + + rd_kafka_q_enq0(rkq, rko, at_head); + cnd_signal(&rkq->rkq_cond); + if (rkq->rkq_qlen == 1) + rd_kafka_q_io_event(rkq); + + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + } else { + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + rd_kafka_q_enq1(fwdq, rko, orig_destq, at_head, 1 /*do lock*/); + rd_kafka_q_destroy(fwdq); + } + + return 1; +} + +/** + * @brief Enqueue the 'rko' op at the tail of the queue 'rkq'. + * + * The provided 'rko' is either enqueued or destroyed. + * + * @returns 1 if op was enqueued or 0 if queue is disabled and + * there was no replyq to enqueue on in which case the rko is destroyed. + * + * @locality any thread. + * @locks rkq MUST NOT be locked + */ +static RD_INLINE RD_UNUSED int rd_kafka_q_enq(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + return rd_kafka_q_enq1(rkq, rko, rkq, 0 /*at tail*/, 1 /*do lock*/); +} + + +/** + * @brief Re-enqueue rko at head of rkq. + * + * The provided 'rko' is either enqueued or destroyed. + * + * @returns 1 if op was enqueued or 0 if queue is disabled and + * there was no replyq to enqueue on in which case the rko is destroyed. + * + * @locality any thread + * @locks rkq MUST BE locked + */ +static RD_INLINE RD_UNUSED int rd_kafka_q_reenq(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + return rd_kafka_q_enq1(rkq, rko, rkq, 1 /*at head*/, 0 /*don't lock*/); +} + + +/** + * Dequeue 'rko' from queue 'rkq'. + * + * NOTE: rkq_lock MUST be held + * Locality: any thread + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_deq0(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_dassert(rkq->rkq_qlen > 0 && + rkq->rkq_qsize >= (int64_t)rko->rko_len); + + TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); + rkq->rkq_qlen--; + rkq->rkq_qsize -= rko->rko_len; +} + + +/** + * @brief Mark queue as served / read. + * + * This is currently used by the queue reader side to reset the io-event + * wakeup flag. + * + * Should be called by all queue readers. + * + * @locks_required rkq must be locked. + */ +static RD_INLINE RD_UNUSED void rd_kafka_q_mark_served(rd_kafka_q_t *rkq) { + if (rkq->rkq_qio) + rkq->rkq_qio->sent = rd_false; +} + + +/** + * Concat all elements of 'srcq' onto tail of 'rkq'. + * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not. + * NOTE: 'srcq' will be reset. + * + * Locality: any thread. + * + * @returns 0 if operation was performed or -1 if rkq is disabled. + */ +static RD_INLINE RD_UNUSED int +rd_kafka_q_concat0(rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) { + int r = 0; + + while (srcq->rkq_fwdq) /* Resolve source queue */ + srcq = srcq->rkq_fwdq; + if (unlikely(srcq->rkq_qlen == 0)) + return 0; /* Don't do anything if source queue is empty */ + + if (do_lock) + mtx_lock(&rkq->rkq_lock); + if (!rkq->rkq_fwdq) { + rd_kafka_op_t *rko; + + rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) || srcq->rkq_qlen > 0); + if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) { + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + return -1; + } + /* First insert any prioritized ops from srcq + * in the right position in rkq. */ + while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) { + TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link); + TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *, + rko_link, rd_kafka_op_cmp_prio); + } + + TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link); + if (rkq->rkq_qlen == 0) + rd_kafka_q_io_event(rkq); + rkq->rkq_qlen += srcq->rkq_qlen; + rkq->rkq_qsize += srcq->rkq_qsize; + cnd_signal(&rkq->rkq_cond); + + rd_kafka_q_mark_served(srcq); + rd_kafka_q_reset(srcq); + } else + r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq, + srcq, rkq->rkq_fwdq ? do_lock : 0); + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + + return r; +} + +#define rd_kafka_q_concat(dstq, srcq) rd_kafka_q_concat0(dstq, srcq, 1 /*lock*/) + + +/** + * @brief Prepend all elements of 'srcq' onto head of 'rkq'. + * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not. + * 'srcq' will be reset. + * + * @remark Will not respect priority of ops, srcq will be prepended in its + * original form to rkq. + * + * @locality any thread. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_q_prepend0(rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) { + if (do_lock) + mtx_lock(&rkq->rkq_lock); + if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) { + /* FIXME: prio-aware */ + /* Concat rkq on srcq */ + TAILQ_CONCAT(&srcq->rkq_q, &rkq->rkq_q, rko_link); + /* Move srcq to rkq */ + TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link); + if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0) + rd_kafka_q_io_event(rkq); + rkq->rkq_qlen += srcq->rkq_qlen; + rkq->rkq_qsize += srcq->rkq_qsize; + + rd_kafka_q_mark_served(srcq); + rd_kafka_q_reset(srcq); + } else + rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq, + srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq, + rkq->rkq_fwdq ? do_lock : 0); + if (do_lock) + mtx_unlock(&rkq->rkq_lock); +} + +#define rd_kafka_q_prepend(dstq, srcq) \ + rd_kafka_q_prepend0(dstq, srcq, 1 /*lock*/) + + +/* Returns the number of elements in the queue */ +static RD_INLINE RD_UNUSED int rd_kafka_q_len(rd_kafka_q_t *rkq) { + int qlen; + rd_kafka_q_t *fwdq; + mtx_lock(&rkq->rkq_lock); + if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + qlen = rkq->rkq_qlen; + mtx_unlock(&rkq->rkq_lock); + } else { + mtx_unlock(&rkq->rkq_lock); + qlen = rd_kafka_q_len(fwdq); + rd_kafka_q_destroy(fwdq); + } + return qlen; +} + +/* Returns the total size of elements in the queue */ +static RD_INLINE RD_UNUSED uint64_t rd_kafka_q_size(rd_kafka_q_t *rkq) { + uint64_t sz; + rd_kafka_q_t *fwdq; + mtx_lock(&rkq->rkq_lock); + if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + sz = rkq->rkq_qsize; + mtx_unlock(&rkq->rkq_lock); + } else { + mtx_unlock(&rkq->rkq_lock); + sz = rd_kafka_q_size(fwdq); + rd_kafka_q_destroy(fwdq); + } + return sz; +} + +/** + * @brief Construct a temporary on-stack replyq with increased + * \p rkq refcount (unless NULL), version, and debug id. + */ +static RD_INLINE RD_UNUSED rd_kafka_replyq_t +rd_kafka_replyq_make(rd_kafka_q_t *rkq, int version, const char *id) { + rd_kafka_replyq_t replyq = RD_ZERO_INIT; + + if (rkq) { + replyq.q = rd_kafka_q_keep(rkq); + replyq.version = version; +#if ENABLE_DEVEL + replyq._id = rd_strdup(id); +#endif + } + + return replyq; +} + +/* Construct temporary on-stack replyq with increased Q refcount and + * optional VERSION. */ +#define RD_KAFKA_REPLYQ(Q, VERSION) \ + rd_kafka_replyq_make(Q, VERSION, __FUNCTION__) + +/* Construct temporary on-stack replyq for indicating no replyq. */ +#if ENABLE_DEVEL +#define RD_KAFKA_NO_REPLYQ \ + (rd_kafka_replyq_t) { \ + NULL, 0, NULL \ + } +#else +#define RD_KAFKA_NO_REPLYQ \ + (rd_kafka_replyq_t) { \ + NULL, 0 \ + } +#endif + + +/** + * @returns true if the replyq is valid, else false. + */ +static RD_INLINE RD_UNUSED rd_bool_t +rd_kafka_replyq_is_valid(rd_kafka_replyq_t *replyq) { + rd_bool_t valid = rd_true; + + if (!replyq->q) + return rd_false; + + rd_kafka_q_lock(replyq->q); + valid = rd_kafka_q_ready(replyq->q); + rd_kafka_q_unlock(replyq->q); + + return valid; +} + + + +/** + * Set up replyq. + * Q refcnt is increased. + */ +static RD_INLINE RD_UNUSED void rd_kafka_set_replyq(rd_kafka_replyq_t *replyq, + rd_kafka_q_t *rkq, + int32_t version) { + replyq->q = rkq ? rd_kafka_q_keep(rkq) : NULL; + replyq->version = version; +#if ENABLE_DEVEL + replyq->_id = rd_strdup(__FUNCTION__); +#endif +} + +/** + * Set rko's replyq with an optional version (versionptr != NULL). + * Q refcnt is increased. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_op_set_replyq(rd_kafka_op_t *rko, + rd_kafka_q_t *rkq, + rd_atomic32_t *versionptr) { + rd_kafka_set_replyq(&rko->rko_replyq, rkq, + versionptr ? rd_atomic32_get(versionptr) : 0); +} + +/* Set reply rko's version from replyq's version */ +#define rd_kafka_op_get_reply_version(REPLY_RKO, ORIG_RKO) \ + do { \ + (REPLY_RKO)->rko_version = (ORIG_RKO)->rko_replyq.version; \ + } while (0) + + +/* Clear replyq holder without decreasing any .q references. */ +static RD_INLINE RD_UNUSED void +rd_kafka_replyq_clear(rd_kafka_replyq_t *replyq) { + memset(replyq, 0, sizeof(*replyq)); +} + +/** + * @brief Make a copy of \p src in \p dst, with its own queue reference + */ +static RD_INLINE RD_UNUSED void rd_kafka_replyq_copy(rd_kafka_replyq_t *dst, + rd_kafka_replyq_t *src) { + dst->version = src->version; + dst->q = src->q; + if (dst->q) + rd_kafka_q_keep(dst->q); +#if ENABLE_DEVEL + if (src->_id) + dst->_id = rd_strdup(src->_id); + else + dst->_id = NULL; +#endif +} + + +/** + * Clear replyq holder and destroy any .q references. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_replyq_destroy(rd_kafka_replyq_t *replyq) { + if (replyq->q) + rd_kafka_q_destroy(replyq->q); +#if ENABLE_DEVEL + if (replyq->_id) { + rd_free(replyq->_id); + replyq->_id = NULL; + } +#endif + rd_kafka_replyq_clear(replyq); +} + + +/** + * @brief Wrapper for rd_kafka_q_enq() that takes a replyq, + * steals its queue reference, enqueues the op with the replyq version, + * and then destroys the queue reference. + * + * If \p version is non-zero it will be updated, else replyq->version. + * + * @returns Same as rd_kafka_q_enq() + */ +static RD_INLINE RD_UNUSED int rd_kafka_replyq_enq(rd_kafka_replyq_t *replyq, + rd_kafka_op_t *rko, + int version) { + rd_kafka_q_t *rkq = replyq->q; + int r; + + if (version) + rko->rko_version = version; + else + rko->rko_version = replyq->version; + + /* The replyq queue reference is done after we've enqueued the rko + * so clear it here. */ + replyq->q = NULL; /* destroyed separately below */ + +#if ENABLE_DEVEL + if (replyq->_id) { + rd_free(replyq->_id); + replyq->_id = NULL; + } +#endif + + /* Retain replyq->version since it is used by buf_callback + * when dispatching the callback. */ + + r = rd_kafka_q_enq(rkq, rko); + + rd_kafka_q_destroy(rkq); + + return r; +} + + + +rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, + rd_ts_t timeout_us, + int32_t version, + rd_kafka_q_cb_type_t cb_type, + rd_kafka_q_serve_cb_t *callback, + void *opaque); +rd_kafka_op_t * +rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version); +int rd_kafka_q_serve(rd_kafka_q_t *rkq, + int timeout_ms, + int max_cnt, + rd_kafka_q_cb_type_t cb_type, + rd_kafka_q_serve_cb_t *callback, + void *opaque); + + +int rd_kafka_q_move_cnt(rd_kafka_q_t *dstq, + rd_kafka_q_t *srcq, + int cnt, + int do_locks); + +int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, + int timeout_ms, + rd_kafka_message_t **rkmessages, + size_t rkmessages_size); +rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms); + +int rd_kafka_q_apply(rd_kafka_q_t *rkq, + int (*callback)(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + void *opaque), + void *opaque); + +void rd_kafka_q_fix_offsets(rd_kafka_q_t *rkq, + int64_t min_offset, + int64_t base_offset); + +/** + * @returns the last op in the queue matching \p op_type and \p allow_err (bool) + * @remark The \p rkq must be properly locked before this call, the returned rko + * is not removed from the queue and may thus not be held for longer + * than the lock is held. + */ +static RD_INLINE RD_UNUSED rd_kafka_op_t * +rd_kafka_q_last(rd_kafka_q_t *rkq, rd_kafka_op_type_t op_type, int allow_err) { + rd_kafka_op_t *rko; + TAILQ_FOREACH_REVERSE(rko, &rkq->rkq_q, rd_kafka_op_tailq, rko_link) { + if (rko->rko_type == op_type && (allow_err || !rko->rko_err)) + return rko; + } + + return NULL; +} + +void rd_kafka_q_io_event_enable(rd_kafka_q_t *rkq, + rd_socket_t fd, + const void *payload, + size_t size); + +/* Public interface */ +struct rd_kafka_queue_s { + rd_kafka_q_t *rkqu_q; + rd_kafka_t *rkqu_rk; + int rkqu_is_owner; /**< Is owner/creator of rkqu_q */ +}; + + +rd_kafka_queue_t *rd_kafka_queue_new0(rd_kafka_t *rk, rd_kafka_q_t *rkq); + +void rd_kafka_q_dump(FILE *fp, rd_kafka_q_t *rkq); + +extern int RD_TLS rd_kafka_yield_thread; + + + +/** + * @name Enqueue op once + * @{ + */ + +/** + * @brief Minimal rd_kafka_op_t wrapper that ensures that + * the op is only enqueued on the provided queue once. + * + * Typical use-case is for an op to be triggered from multiple sources, + * but at most once, such as from a timer and some other source. + */ +typedef struct rd_kafka_enq_once_s { + mtx_t lock; + int refcnt; + rd_kafka_op_t *rko; + rd_kafka_replyq_t replyq; +} rd_kafka_enq_once_t; + + +/** + * @brief Allocate and set up a new eonce and set the initial refcount to 1. + * @remark This is to be called by the owner of the rko. + */ +static RD_INLINE RD_UNUSED rd_kafka_enq_once_t * +rd_kafka_enq_once_new(rd_kafka_op_t *rko, rd_kafka_replyq_t replyq) { + rd_kafka_enq_once_t *eonce = rd_calloc(1, sizeof(*eonce)); + mtx_init(&eonce->lock, mtx_plain); + eonce->rko = rko; + eonce->replyq = replyq; /* struct copy */ + eonce->refcnt = 1; + return eonce; +} + +/** + * @brief Re-enable triggering of a eonce even after it has been triggered + * once. + * + * @remark This is to be called by the owner. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_reenable(rd_kafka_enq_once_t *eonce, + rd_kafka_op_t *rko, + rd_kafka_replyq_t replyq) { + mtx_lock(&eonce->lock); + eonce->rko = rko; + rd_kafka_replyq_destroy(&eonce->replyq); + eonce->replyq = replyq; /* struct copy */ + mtx_unlock(&eonce->lock); +} + + +/** + * @brief Free eonce and its resources. Must only be called with refcnt==0 + * and eonce->lock NOT held. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_destroy0(rd_kafka_enq_once_t *eonce) { + /* This must not be called with the rko or replyq still set, which would + * indicate that no enqueueing was performed and that the owner + * did not clean up, which is a bug. */ + rd_assert(!eonce->rko); + rd_assert(!eonce->replyq.q); +#if ENABLE_DEVEL + rd_assert(!eonce->replyq._id); +#endif + rd_assert(eonce->refcnt == 0); + + mtx_destroy(&eonce->lock); + rd_free(eonce); +} + + +/** + * @brief Increment refcount for source (non-owner), such as a timer. + * + * @param srcdesc a human-readable descriptive string of the source. + * May be used for future debugging. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_add_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { + mtx_lock(&eonce->lock); + eonce->refcnt++; + mtx_unlock(&eonce->lock); +} + + +/** + * @brief Decrement refcount for source (non-owner), such as a timer. + * + * @param srcdesc a human-readable descriptive string of the source. + * May be used for future debugging. + * + * @remark Must only be called from the owner with the owner + * still holding its own refcount. + * This API is used to undo an add_source() from the + * same code. + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_del_source(rd_kafka_enq_once_t *eonce, const char *srcdesc) { + int do_destroy; + + mtx_lock(&eonce->lock); + rd_assert(eonce->refcnt > 0); + eonce->refcnt--; + do_destroy = eonce->refcnt == 0; + mtx_unlock(&eonce->lock); + + if (do_destroy) { + /* We're the last refcount holder, clean up eonce. */ + rd_kafka_enq_once_destroy0(eonce); + } +} + +/** + * @brief Trigger a source's reference where the eonce resides on + * an rd_list_t. This is typically used as a free_cb for + * rd_list_destroy() and the trigger error code is + * always RD_KAFKA_RESP_ERR__DESTROY. + */ +void rd_kafka_enq_once_trigger_destroy(void *ptr); + + +/** + * @brief Decrement refcount for source (non-owner) and return the rko + * if still set. + * + * @remark Must only be called by sources (non-owner) but only on the + * the owner's thread to make sure the rko is not freed. + * + * @remark The rko remains set on the eonce. + */ +static RD_INLINE RD_UNUSED rd_kafka_op_t * +rd_kafka_enq_once_del_source_return(rd_kafka_enq_once_t *eonce, + const char *srcdesc) { + rd_bool_t do_destroy; + rd_kafka_op_t *rko; + + mtx_lock(&eonce->lock); + + rd_assert(eonce->refcnt > 0); + /* Owner must still hold a eonce reference, or the eonce must + * have been disabled by the owner (no rko) */ + rd_assert(eonce->refcnt > 1 || !eonce->rko); + eonce->refcnt--; + do_destroy = eonce->refcnt == 0; + + rko = eonce->rko; + mtx_unlock(&eonce->lock); + + if (do_destroy) { + /* We're the last refcount holder, clean up eonce. */ + rd_kafka_enq_once_destroy0(eonce); + } + + return rko; +} + +/** + * @brief Trigger enqueuing of the rko (unless already enqueued) + * and drops the source's refcount. + * + * @remark Must only be called by sources (non-owner). + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_trigger(rd_kafka_enq_once_t *eonce, + rd_kafka_resp_err_t err, + const char *srcdesc) { + int do_destroy; + rd_kafka_op_t *rko = NULL; + rd_kafka_replyq_t replyq = RD_ZERO_INIT; + + mtx_lock(&eonce->lock); + + rd_assert(eonce->refcnt > 0); + eonce->refcnt--; + do_destroy = eonce->refcnt == 0; + + if (eonce->rko) { + /* Not already enqueued, do it. + * Detach the rko and replyq from the eonce and unlock the eonce + * before enqueuing rko on reply to avoid recursive locks + * if the replyq has been disabled and the ops + * destructor is called (which might then access the eonce + * to clean up). */ + rko = eonce->rko; + replyq = eonce->replyq; + + eonce->rko = NULL; + rd_kafka_replyq_clear(&eonce->replyq); + + /* Reply is enqueued at the end of this function */ + } + mtx_unlock(&eonce->lock); + + if (do_destroy) { + /* We're the last refcount holder, clean up eonce. */ + rd_kafka_enq_once_destroy0(eonce); + } + + if (rko) { + rko->rko_err = err; + rd_kafka_replyq_enq(&replyq, rko, replyq.version); + rd_kafka_replyq_destroy(&replyq); + } +} + +/** + * @brief Destroy eonce, must only be called by the owner. + * There may be outstanding refcounts by non-owners after this call + */ +static RD_INLINE RD_UNUSED void +rd_kafka_enq_once_destroy(rd_kafka_enq_once_t *eonce) { + int do_destroy; + + mtx_lock(&eonce->lock); + rd_assert(eonce->refcnt > 0); + eonce->refcnt--; + do_destroy = eonce->refcnt == 0; + + eonce->rko = NULL; + rd_kafka_replyq_destroy(&eonce->replyq); + + mtx_unlock(&eonce->lock); + + if (do_destroy) { + /* We're the last refcount holder, clean up eonce. */ + rd_kafka_enq_once_destroy0(eonce); + } +} + + +/** + * @brief Disable the owner's eonce, extracting, resetting and returning + * the \c rko object. + * + * This is the same as rd_kafka_enq_once_destroy() but returning + * the rko. + * + * Use this for owner-thread triggering where the enqueuing of the + * rko on the replyq is not necessary. + * + * @returns the eonce's rko object, if still available, else NULL. + */ +static RD_INLINE RD_UNUSED rd_kafka_op_t * +rd_kafka_enq_once_disable(rd_kafka_enq_once_t *eonce) { + int do_destroy; + rd_kafka_op_t *rko; + + mtx_lock(&eonce->lock); + rd_assert(eonce->refcnt > 0); + eonce->refcnt--; + do_destroy = eonce->refcnt == 0; + + /* May be NULL */ + rko = eonce->rko; + eonce->rko = NULL; + rd_kafka_replyq_destroy(&eonce->replyq); + + mtx_unlock(&eonce->lock); + + if (do_destroy) { + /* We're the last refcount holder, clean up eonce. */ + rd_kafka_enq_once_destroy0(eonce); + } + + return rko; +} + + +/**@}*/ + + +#endif /* _RDKAFKA_QUEUE_H_ */ |