Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c')
-rw-r--r--   fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c   1085
1 file changed, 1085 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c
new file mode 100644
index 000000000..57fce36b8
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c
@@ -0,0 +1,1085 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2016 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_interceptor.h"
+
+int RD_TLS rd_kafka_yield_thread = 0;
+
+void rd_kafka_yield(rd_kafka_t *rk) {
+ rd_kafka_yield_thread = 1;
+}
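+
+/*
+ * Usage sketch (illustrative, not normative): rd_kafka_yield() is meant to
+ * be called from an application callback that is being dispatched by a
+ * poll call, to make that poll call return as soon as possible, e.g. from
+ * an error callback registered with rd_kafka_conf_set_error_cb():
+ *
+ *   static void my_error_cb (rd_kafka_t *rk, int err,
+ *                            const char *reason, void *opaque) {
+ *           rd_kafka_yield(rk);   makes the ongoing poll call return
+ *   }
+ *
+ * The flag is thread-local and is checked by the serve/pop loops below.
+ */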
+
+
+/**
+ * @brief Check and reset yield flag.
+ * @returns rd_true if caller should yield, otherwise rd_false.
+ * @remarks rkq_lock MUST be held
+ */
+static RD_INLINE rd_bool_t rd_kafka_q_check_yield(rd_kafka_q_t *rkq) {
+ if (!(rkq->rkq_flags & RD_KAFKA_Q_F_YIELD))
+ return rd_false;
+
+ rkq->rkq_flags &= ~RD_KAFKA_Q_F_YIELD;
+ return rd_true;
+}
+/**
+ * Destroy a queue. refcnt must be at zero.
+ */
+void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) {
+
+ mtx_lock(&rkq->rkq_lock);
+ if (unlikely(rkq->rkq_qio != NULL)) {
+ rd_free(rkq->rkq_qio);
+ rkq->rkq_qio = NULL;
+ }
+ /* Queue must have been disabled prior to final destruction;
+ * this is to catch the case where the queue owner/poller does not
+ * use rd_kafka_q_destroy_owner(). */
+ rd_dassert(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY));
+ rd_kafka_q_disable0(rkq, 0 /*no-lock*/); /* for the non-devel case */
+ rd_kafka_q_fwd_set0(rkq, NULL, 0 /*no-lock*/, 0 /*no-fwd-app*/);
+ rd_kafka_q_purge0(rkq, 0 /*no-lock*/);
+ assert(!rkq->rkq_fwdq);
+ mtx_unlock(&rkq->rkq_lock);
+ mtx_destroy(&rkq->rkq_lock);
+ cnd_destroy(&rkq->rkq_cond);
+
+ if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED)
+ rd_free(rkq);
+}
+
+
+
+/**
+ * Initialize a queue.
+ */
+void rd_kafka_q_init0(rd_kafka_q_t *rkq,
+ rd_kafka_t *rk,
+ const char *func,
+ int line) {
+ rd_kafka_q_reset(rkq);
+ rkq->rkq_fwdq = NULL;
+ rkq->rkq_refcnt = 1;
+ rkq->rkq_flags = RD_KAFKA_Q_F_READY;
+ rkq->rkq_rk = rk;
+ rkq->rkq_qio = NULL;
+ rkq->rkq_serve = NULL;
+ rkq->rkq_opaque = NULL;
+ mtx_init(&rkq->rkq_lock, mtx_plain);
+ cnd_init(&rkq->rkq_cond);
+#if ENABLE_DEVEL
+ rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
+#else
+ rkq->rkq_name = func;
+#endif
+}
+
+
+/**
+ * Allocate a new queue and initialize it.
+ */
+rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) {
+ rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
+ rd_kafka_q_init(rkq, rk);
+ rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
+#if ENABLE_DEVEL
+ rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
+#else
+ rkq->rkq_name = func;
+#endif
+ return rkq;
+}
+
+/**
+ * Set/clear forward queue.
+ * Queue forwarding enables message routing inside rdkafka.
+ * Typical use is to re-route all fetched messages for all partitions
+ * to one single queue.
+ *
+ * All access to rkq_fwdq are protected by rkq_lock.
+ */
+void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq,
+ rd_kafka_q_t *destq,
+ int do_lock,
+ int fwd_app) {
+ if (unlikely(srcq == destq))
+ return;
+
+ if (do_lock)
+ mtx_lock(&srcq->rkq_lock);
+ if (fwd_app)
+ srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
+ if (srcq->rkq_fwdq) {
+ rd_kafka_q_destroy(srcq->rkq_fwdq);
+ srcq->rkq_fwdq = NULL;
+ }
+ if (destq) {
+ rd_kafka_q_keep(destq);
+
+ /* If rkq has ops in queue, append them to fwdq's queue.
+ * This is an irreversible operation. */
+ if (srcq->rkq_qlen > 0) {
+ rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY);
+ rd_kafka_q_concat(destq, srcq);
+ }
+
+ srcq->rkq_fwdq = destq;
+ }
+ if (do_lock)
+ mtx_unlock(&srcq->rkq_lock);
+}
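+
+/*
+ * Usage sketch (illustrative): the public equivalent of the forwarding
+ * mechanism above is rd_kafka_queue_forward(). For example, to consume
+ * several partitions through one application queue (topic name and
+ * partition numbers are placeholders):
+ *
+ *   rd_kafka_queue_t *combined = rd_kafka_queue_new(rk);
+ *   rd_kafka_queue_t *p0 = rd_kafka_queue_get_partition(rk, "topic", 0);
+ *   rd_kafka_queue_t *p1 = rd_kafka_queue_get_partition(rk, "topic", 1);
+ *
+ *   rd_kafka_queue_forward(p0, combined);
+ *   rd_kafka_queue_forward(p1, combined);
+ *   ... serve `combined` only, e.g. with rd_kafka_consume_queue() ...
+ *
+ *   rd_kafka_queue_forward(p0, NULL);        undo forwarding before
+ *   rd_kafka_queue_forward(p1, NULL);        destroying the handles
+ *   rd_kafka_queue_destroy(p0);
+ *   rd_kafka_queue_destroy(p1);
+ *   rd_kafka_queue_destroy(combined);
+ */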
+
+/**
+ * Purge all entries from a queue.
+ */
+int rd_kafka_q_purge0(rd_kafka_q_t *rkq, int do_lock) {
+ rd_kafka_op_t *rko, *next;
+ TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
+ rd_kafka_q_t *fwdq;
+ int cnt = 0;
+
+ if (do_lock)
+ mtx_lock(&rkq->rkq_lock);
+
+ if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+ cnt = rd_kafka_q_purge(fwdq);
+ rd_kafka_q_destroy(fwdq);
+ return cnt;
+ }
+
+ /* Move ops queue to tmpq to avoid lock-order issue
+ * by locks taken from rd_kafka_op_destroy(). */
+ TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);
+
+ rd_kafka_q_mark_served(rkq);
+
+ /* Zero out queue */
+ rd_kafka_q_reset(rkq);
+
+ if (do_lock)
+ mtx_unlock(&rkq->rkq_lock);
+
+ /* Destroy the ops */
+ next = TAILQ_FIRST(&tmpq);
+ while ((rko = next)) {
+ next = TAILQ_NEXT(next, rko_link);
+ rd_kafka_op_destroy(rko);
+ cnt++;
+ }
+
+ return cnt;
+}
+
+
+/**
+ * Purge all entries from a queue with a rktp version smaller than `version`.
+ * This shaves off the head of the queue, up until the first rko with
+ * a non-matching rktp or version.
+ */
+void rd_kafka_q_purge_toppar_version(rd_kafka_q_t *rkq,
+ rd_kafka_toppar_t *rktp,
+ int version) {
+ rd_kafka_op_t *rko, *next;
+ TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
+ int32_t cnt = 0;
+ int64_t size = 0;
+ rd_kafka_q_t *fwdq;
+
+ mtx_lock(&rkq->rkq_lock);
+
+ if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ mtx_unlock(&rkq->rkq_lock);
+ rd_kafka_q_purge_toppar_version(fwdq, rktp, version);
+ rd_kafka_q_destroy(fwdq);
+ return;
+ }
+
+ /* Move ops to temporary queue and then destroy them from there
+ * without locks to avoid lock-ordering problems in op_destroy() */
+ while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp &&
+ rko->rko_rktp == rktp && rko->rko_version < version) {
+ TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
+ TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+ cnt++;
+ size += rko->rko_len;
+ }
+
+ rd_kafka_q_mark_served(rkq);
+
+ rkq->rkq_qlen -= cnt;
+ rkq->rkq_qsize -= size;
+ mtx_unlock(&rkq->rkq_lock);
+
+ next = TAILQ_FIRST(&tmpq);
+ while ((rko = next)) {
+ next = TAILQ_NEXT(next, rko_link);
+ rd_kafka_op_destroy(rko);
+ }
+}
+
+
+/**
+ * Move 'cnt' entries from 'srcq' to 'dstq'.
+ * If 'cnt' == -1 all entries will be moved.
+ * Returns the number of entries moved.
+ */
+int rd_kafka_q_move_cnt(rd_kafka_q_t *dstq,
+ rd_kafka_q_t *srcq,
+ int cnt,
+ int do_locks) {
+ rd_kafka_op_t *rko;
+ int mcnt = 0;
+
+ if (do_locks) {
+ mtx_lock(&srcq->rkq_lock);
+ mtx_lock(&dstq->rkq_lock);
+ }
+
+ if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
+ if (cnt > 0 && dstq->rkq_qlen == 0)
+ rd_kafka_q_io_event(dstq);
+
+ /* Optimization: if 'cnt' equals or exceeds the number of
+ * items in 'srcq' we can move the entire queue. */
+ if (cnt == -1 || cnt >= (int)srcq->rkq_qlen) {
+ mcnt = srcq->rkq_qlen;
+ rd_kafka_q_concat0(dstq, srcq, 0 /*no-lock*/);
+ } else {
+ while (mcnt < cnt &&
+ (rko = TAILQ_FIRST(&srcq->rkq_q))) {
+ TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
+ if (likely(!rko->rko_prio))
+ TAILQ_INSERT_TAIL(&dstq->rkq_q, rko,
+ rko_link);
+ else
+ TAILQ_INSERT_SORTED(
+ &dstq->rkq_q, rko, rd_kafka_op_t *,
+ rko_link, rd_kafka_op_cmp_prio);
+
+ srcq->rkq_qlen--;
+ dstq->rkq_qlen++;
+ srcq->rkq_qsize -= rko->rko_len;
+ dstq->rkq_qsize += rko->rko_len;
+ mcnt++;
+ }
+ }
+
+ rd_kafka_q_mark_served(srcq);
+
+ } else
+ mcnt = rd_kafka_q_move_cnt(
+ dstq->rkq_fwdq ? dstq->rkq_fwdq : dstq,
+ srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq, cnt, do_locks);
+
+ if (do_locks) {
+ mtx_unlock(&dstq->rkq_lock);
+ mtx_unlock(&srcq->rkq_lock);
+ }
+
+ return mcnt;
+}
+
+
+/**
+ * Filters out outdated ops.
+ */
+static RD_INLINE rd_kafka_op_t *
+rd_kafka_op_filter(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int version) {
+ if (unlikely(!rko))
+ return NULL;
+
+ if (unlikely(rd_kafka_op_version_outdated(rko, version))) {
+ rd_kafka_q_deq0(rkq, rko);
+ rd_kafka_op_destroy(rko);
+ return NULL;
+ }
+
+ return rko;
+}
+
+
+
+/**
+ * Pop an op from a queue.
+ *
+ * Locality: any thread.
+ */
+
+
+/**
+ * Serve q like rd_kafka_q_serve() until an op is found that can be returned
+ * as an event to the application.
+ *
+ * @returns the first event:able op, or NULL on timeout.
+ *
+ * Locality: any thread
+ */
+rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq,
+ rd_ts_t timeout_us,
+ int32_t version,
+ rd_kafka_q_cb_type_t cb_type,
+ rd_kafka_q_serve_cb_t *callback,
+ void *opaque) {
+ rd_kafka_op_t *rko;
+ rd_kafka_q_t *fwdq;
+
+ rd_dassert(cb_type);
+
+ mtx_lock(&rkq->rkq_lock);
+
+ rd_kafka_yield_thread = 0;
+ if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ struct timespec timeout_tspec;
+
+ rd_timeout_init_timespec_us(&timeout_tspec, timeout_us);
+
+ while (1) {
+ rd_kafka_op_res_t res;
+ /* Keep track of current lock status to avoid
+ * unnecessary lock flapping in all the cases below. */
+ rd_bool_t is_locked = rd_true;
+
+ /* Filter out outdated ops */
+ retry:
+ while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
+ !(rko = rd_kafka_op_filter(rkq, rko, version)))
+ ;
+
+ rd_kafka_q_mark_served(rkq);
+
+ if (rko) {
+ /* Proper versioned op */
+ rd_kafka_q_deq0(rkq, rko);
+
+ /* Let op_handle() operate without lock
+ * held to allow re-enqueuing, etc. */
+ mtx_unlock(&rkq->rkq_lock);
+ is_locked = rd_false;
+
+ /* Ops with callbacks are considered handled
+ * and we move on to the next op, if any.
+ * Ops w/o callbacks are returned immediately */
+ res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
+ cb_type, opaque,
+ callback);
+
+ if (res == RD_KAFKA_OP_RES_HANDLED ||
+ res == RD_KAFKA_OP_RES_KEEP) {
+ mtx_lock(&rkq->rkq_lock);
+ is_locked = rd_true;
+ goto retry; /* Next op */
+ } else if (unlikely(res ==
+ RD_KAFKA_OP_RES_YIELD)) {
+ /* Callback yielded, unroll */
+ return NULL;
+ } else
+ break; /* Proper op, handle below. */
+ }
+
+ if (unlikely(rd_kafka_q_check_yield(rkq))) {
+ if (is_locked)
+ mtx_unlock(&rkq->rkq_lock);
+ return NULL;
+ }
+
+ if (!is_locked)
+ mtx_lock(&rkq->rkq_lock);
+
+ if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
+ &timeout_tspec) != thrd_success) {
+ mtx_unlock(&rkq->rkq_lock);
+ return NULL;
+ }
+ }
+
+ } else {
+ /* Since the q_pop may block we need to release the parent
+ * queue's lock. */
+ mtx_unlock(&rkq->rkq_lock);
+ rko = rd_kafka_q_pop_serve(fwdq, timeout_us, version, cb_type,
+ callback, opaque);
+ rd_kafka_q_destroy(fwdq);
+ }
+
+
+ return rko;
+}
+
+rd_kafka_op_t *
+rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version) {
+ return rd_kafka_q_pop_serve(rkq, timeout_us, version,
+ RD_KAFKA_Q_CB_RETURN, NULL, NULL);
+}
+
+
+/**
+ * Pop all available ops from a queue and call the provided
+ * callback for each op.
+ * `max_cnt` limits the number of ops served, 0 = no limit.
+ *
+ * Returns the number of ops served.
+ *
+ * Locality: any thread.
+ */
+int rd_kafka_q_serve(rd_kafka_q_t *rkq,
+ int timeout_ms,
+ int max_cnt,
+ rd_kafka_q_cb_type_t cb_type,
+ rd_kafka_q_serve_cb_t *callback,
+ void *opaque) {
+ rd_kafka_t *rk = rkq->rkq_rk;
+ rd_kafka_op_t *rko;
+ rd_kafka_q_t localq;
+ rd_kafka_q_t *fwdq;
+ int cnt = 0;
+ struct timespec timeout_tspec;
+
+ rd_dassert(cb_type);
+
+ mtx_lock(&rkq->rkq_lock);
+
+ rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0);
+ if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ int ret;
+ /* Since the q_pop may block we need to release the parent
+ * queue's lock. */
+ mtx_unlock(&rkq->rkq_lock);
+ ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt, cb_type,
+ callback, opaque);
+ rd_kafka_q_destroy(fwdq);
+ return ret;
+ }
+
+ rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
+
+ /* Wait for op */
+ while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
+ !rd_kafka_q_check_yield(rkq) &&
+ cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
+ &timeout_tspec) == thrd_success)
+ ;
+
+ rd_kafka_q_mark_served(rkq);
+
+ if (!rko) {
+ mtx_unlock(&rkq->rkq_lock);
+ return 0;
+ }
+
+ /* Move the first `max_cnt` ops. */
+ rd_kafka_q_init(&localq, rkq->rkq_rk);
+ rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1 /*all*/ : max_cnt,
+ 0 /*no-locks*/);
+
+ mtx_unlock(&rkq->rkq_lock);
+
+ rd_kafka_yield_thread = 0;
+
+ /* Call callback for each op */
+ while ((rko = TAILQ_FIRST(&localq.rkq_q))) {
+ rd_kafka_op_res_t res;
+
+ rd_kafka_q_deq0(&localq, rko);
+ res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque,
+ callback);
+ /* op must have been handled */
+ rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
+ cnt++;
+
+ if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+ rd_kafka_yield_thread)) {
+ /* Callback called rd_kafka_yield(), we must
+ * stop our callback dispatching and put the
+ * ops in localq back on the original queue head. */
+ if (!TAILQ_EMPTY(&localq.rkq_q))
+ rd_kafka_q_prepend(rkq, &localq);
+ break;
+ }
+ }
+
+ rd_kafka_q_destroy_owner(&localq);
+
+ return cnt;
+}
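+
+/*
+ * Internal usage sketch (simplified; the real rd_kafka_poll() also wraps
+ * this in app-poll accounting): the public poll call essentially serves
+ * the main reply queue with callback dispatching and no max_cnt limit:
+ *
+ *   r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
+ *                        RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
+ */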
+
+/**
+ * @brief Filter out and destroy outdated messages.
+ *
+ * @returns the number of valid messages.
+ *
+ * @locality Any thread.
+ */
+static size_t
+rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
+ int32_t version,
+ rd_kafka_message_t **rkmessages,
+ size_t cnt,
+ struct rd_kafka_op_tailq *ctrl_msg_q) {
+ size_t valid_count = 0;
+ size_t i;
+ rd_kafka_op_t *rko, *next;
+
+ for (i = 0; i < cnt; i++) {
+ rko = rkmessages[i]->_private;
+ if (rko->rko_rktp == rktp &&
+ rd_kafka_op_version_outdated(rko, version)) {
+ /* This also destroys the corresponding rkmessage. */
+ rd_kafka_op_destroy(rko);
+ } else if (i > valid_count) {
+ rkmessages[valid_count++] = rkmessages[i];
+ } else {
+ valid_count++;
+ }
+ }
+
+ /* Discard outdated control msgs ops */
+ next = TAILQ_FIRST(ctrl_msg_q);
+ while (next) {
+ rko = next;
+ next = TAILQ_NEXT(rko, rko_link);
+ if (rko->rko_rktp == rktp &&
+ rd_kafka_op_version_outdated(rko, version)) {
+ TAILQ_REMOVE(ctrl_msg_q, rko, rko_link);
+ rd_kafka_op_destroy(rko);
+ }
+ }
+
+ return valid_count;
+}
+
+
+/**
+ * Populate 'rkmessages' array with messages from 'rkq'.
+ * Each returned message's offset is used to advance the application's
+ * consume position for its toppar.
+ *
+ * Returns the number of messages added.
+ */
+
+int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ unsigned int cnt = 0;
+ TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
+ struct rd_kafka_op_tailq ctrl_msg_q =
+ TAILQ_HEAD_INITIALIZER(ctrl_msg_q);
+ rd_kafka_op_t *rko, *next;
+ rd_kafka_t *rk = rkq->rkq_rk;
+ rd_kafka_q_t *fwdq;
+ struct timespec timeout_tspec;
+ int i;
+
+ mtx_lock(&rkq->rkq_lock);
+ if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ /* Since the q_pop may block we need to release the parent
+ * queue's lock. */
+ mtx_unlock(&rkq->rkq_lock);
+ cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms, rkmessages,
+ rkmessages_size);
+ rd_kafka_q_destroy(fwdq);
+ return cnt;
+ }
+ mtx_unlock(&rkq->rkq_lock);
+
+ if (timeout_ms)
+ rd_kafka_app_poll_blocking(rk);
+
+ rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
+
+ rd_kafka_yield_thread = 0;
+ while (cnt < rkmessages_size) {
+ rd_kafka_op_res_t res;
+
+ mtx_lock(&rkq->rkq_lock);
+
+ while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
+ !rd_kafka_q_check_yield(rkq) &&
+ cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
+ &timeout_tspec) == thrd_success)
+ ;
+
+ rd_kafka_q_mark_served(rkq);
+
+ if (!rko) {
+ mtx_unlock(&rkq->rkq_lock);
+ break; /* Timed out */
+ }
+
+ rd_kafka_q_deq0(rkq, rko);
+
+ mtx_unlock(&rkq->rkq_lock);
+
+ if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
+ cnt = (unsigned int)rd_kafka_purge_outdated_messages(
+ rko->rko_rktp, rko->rko_version, rkmessages, cnt,
+ &ctrl_msg_q);
+ rd_kafka_op_destroy(rko);
+ continue;
+ }
+
+ if (rd_kafka_op_version_outdated(rko, 0)) {
+ /* Outdated op, put on discard queue */
+ TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+ continue;
+ }
+
+ /* Serve non-FETCH callbacks */
+ res =
+ rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
+ if (res == RD_KAFKA_OP_RES_KEEP ||
+ res == RD_KAFKA_OP_RES_HANDLED) {
+ /* Callback served, rko is destroyed (if HANDLED). */
+ continue;
+ } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+ rd_kafka_yield_thread)) {
+ /* Yield. */
+ break;
+ }
+ rd_dassert(res == RD_KAFKA_OP_RES_PASS);
+
+ /* If this is a control message, don't return it to the
+ * application. Add it to a tmp queue from which we can store
+ * the offset and destroy the op. */
+ if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
+ TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
+ continue;
+ }
+
+ /* Get rkmessage from rko and append to array. */
+ rkmessages[cnt++] = rd_kafka_message_get(rko);
+ }
+
+ for (i = cnt - 1; i >= 0; i--) {
+ rko = (rd_kafka_op_t *)rkmessages[i]->_private;
+ rd_kafka_toppar_t *rktp = rko->rko_rktp;
+ int64_t offset = rkmessages[i]->offset + 1;
+ if (unlikely(rktp->rktp_app_pos.offset < offset))
+ rd_kafka_update_app_pos(
+ rk, rktp,
+ RD_KAFKA_FETCH_POS(
+ offset,
+ rd_kafka_message_leader_epoch(rkmessages[i])),
+ RD_DO_LOCK);
+ }
+
+ /* Discard non-desired and already handled ops */
+ next = TAILQ_FIRST(&tmpq);
+ while (next) {
+ rko = next;
+ next = TAILQ_NEXT(next, rko_link);
+ rd_kafka_op_destroy(rko);
+ }
+
+ /* Discard ctrl msgs */
+ next = TAILQ_FIRST(&ctrl_msg_q);
+ while (next) {
+ rko = next;
+ next = TAILQ_NEXT(next, rko_link);
+ rd_kafka_toppar_t *rktp = rko->rko_rktp;
+ int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;
+ if (rktp->rktp_app_pos.offset < offset)
+ rd_kafka_update_app_pos(
+ rk, rktp,
+ RD_KAFKA_FETCH_POS(
+ offset,
+ rd_kafka_message_leader_epoch(
+ &rko->rko_u.fetch.rkm.rkm_rkmessage)),
+ RD_DO_LOCK);
+ rd_kafka_op_destroy(rko);
+ }
+
+ rd_kafka_app_polled(rk);
+
+ return cnt;
+}
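+
+/*
+ * Usage sketch (illustrative): rd_kafka_consume_batch_queue() is one of the
+ * public APIs that end up here; a typical application loop looks roughly
+ * like this (1000 is the timeout in milliseconds):
+ *
+ *   rd_kafka_message_t *msgs[64];
+ *   ssize_t i, n;
+ *
+ *   n = rd_kafka_consume_batch_queue(rkqu, 1000, msgs, 64);
+ *   for (i = 0; i < n; i++) {
+ *           ... process msgs[i] ...
+ *           rd_kafka_message_destroy(msgs[i]);
+ *   }
+ */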
+
+
+
+void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu) {
+ if (rkqu->rkqu_is_owner)
+ rd_kafka_q_destroy_owner(rkqu->rkqu_q);
+ else
+ rd_kafka_q_destroy(rkqu->rkqu_q);
+ rd_free(rkqu);
+}
+
+rd_kafka_queue_t *rd_kafka_queue_new0(rd_kafka_t *rk, rd_kafka_q_t *rkq) {
+ rd_kafka_queue_t *rkqu;
+
+ rkqu = rd_calloc(1, sizeof(*rkqu));
+
+ rkqu->rkqu_q = rkq;
+ rd_kafka_q_keep(rkq);
+
+ rkqu->rkqu_rk = rk;
+
+ return rkqu;
+}
+
+
+rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk) {
+ rd_kafka_q_t *rkq;
+ rd_kafka_queue_t *rkqu;
+
+ rkq = rd_kafka_q_new(rk);
+ rkqu = rd_kafka_queue_new0(rk, rkq);
+ rd_kafka_q_destroy(rkq); /* Lose refcount from q_new, one is held
+ * by queue_new0 */
+ rkqu->rkqu_is_owner = 1;
+ return rkqu;
+}
+
+
+rd_kafka_queue_t *rd_kafka_queue_get_main(rd_kafka_t *rk) {
+ return rd_kafka_queue_new0(rk, rk->rk_rep);
+}
+
+
+rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk) {
+ if (!rk->rk_cgrp)
+ return NULL;
+ return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
+}
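+
+/*
+ * Usage sketch (illustrative): the returned handle holds its own reference
+ * to the underlying queue and must be released with
+ * rd_kafka_queue_destroy():
+ *
+ *   rd_kafka_queue_t *cq = rd_kafka_queue_get_consumer(rk);
+ *   if (cq) {
+ *           rd_kafka_event_t *ev = rd_kafka_queue_poll(cq, 100);
+ *           if (ev)
+ *                   rd_kafka_event_destroy(ev);
+ *           rd_kafka_queue_destroy(cq);
+ *   }
+ */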
+
+rd_kafka_queue_t *rd_kafka_queue_get_partition(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition) {
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_queue_t *result;
+
+ if (rk->rk_type == RD_KAFKA_PRODUCER)
+ return NULL;
+
+ rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, /* no ua_on_miss */
+ 1 /* create_on_miss */);
+
+ if (!rktp)
+ return NULL;
+
+ result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);
+ rd_kafka_toppar_destroy(rktp);
+
+ return result;
+}
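+
+/*
+ * Usage sketch (illustrative; topic and partition are placeholders):
+ * consuming a single partition through its fetch queue handle with the
+ * legacy consume API:
+ *
+ *   rd_kafka_queue_t *pq = rd_kafka_queue_get_partition(rk, "topic", 0);
+ *   if (pq) {
+ *           rd_kafka_message_t *rkm = rd_kafka_consume_queue(pq, 1000);
+ *           if (rkm)
+ *                   rd_kafka_message_destroy(rkm);
+ *           rd_kafka_queue_destroy(pq);
+ *   }
+ */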
+
+rd_kafka_queue_t *rd_kafka_queue_get_background(rd_kafka_t *rk) {
+ rd_kafka_queue_t *rkqu;
+
+ rd_kafka_wrlock(rk);
+ if (!rk->rk_background.q) {
+ char errstr[256];
+
+ if (rd_kafka_background_thread_create(rk, errstr,
+ sizeof(errstr))) {
+ rd_kafka_log(rk, LOG_ERR, "BACKGROUND",
+ "Failed to create background thread: %s",
+ errstr);
+ rd_kafka_wrunlock(rk);
+ return NULL;
+ }
+ }
+
+ rkqu = rd_kafka_queue_new0(rk, rk->rk_background.q);
+ rd_kafka_wrunlock(rk);
+ return rkqu;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk,
+ rd_kafka_queue_t *rkqu) {
+ rd_kafka_q_t *rkq;
+
+ if (!rk->rk_logq)
+ return RD_KAFKA_RESP_ERR__NOT_CONFIGURED;
+
+ if (!rkqu)
+ rkq = rk->rk_rep;
+ else
+ rkq = rkqu->rkqu_q;
+ rd_kafka_q_fwd_set(rk->rk_logq, rkq);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
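+
+/*
+ * Usage sketch (illustrative): log forwarding requires the `log.queue`
+ * configuration property to be enabled before the client is created;
+ * log events can then be polled from a queue of the application's choice:
+ *
+ *   char errstr[512];
+ *   rd_kafka_conf_set(conf, "log.queue", "true", errstr, sizeof(errstr));
+ *   ...create rk from conf...
+ *
+ *   rd_kafka_queue_t *logq = rd_kafka_queue_new(rk);
+ *   rd_kafka_set_log_queue(rk, logq);
+ *
+ *   rd_kafka_event_t *ev = rd_kafka_queue_poll(logq, 100);
+ *   ...RD_KAFKA_EVENT_LOG events carry the log lines...
+ */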
+
+void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst) {
+ rd_kafka_q_fwd_set0(src->rkqu_q, dst ? dst->rkqu_q : NULL,
+ 1, /* do_lock */
+ 1 /* fwd_app */);
+}
+
+
+size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu) {
+ return (size_t)rd_kafka_q_len(rkqu->rkqu_q);
+}
+
+/**
+ * @brief Enable or disable (fd==-1) fd-based wake-ups for the queue
+ */
+void rd_kafka_q_io_event_enable(rd_kafka_q_t *rkq,
+ rd_socket_t fd,
+ const void *payload,
+ size_t size) {
+ struct rd_kafka_q_io *qio = NULL;
+
+ if (fd != -1) {
+ qio = rd_malloc(sizeof(*qio) + size);
+ qio->fd = fd;
+ qio->size = size;
+ qio->payload = (void *)(qio + 1);
+ qio->sent = rd_false;
+ qio->event_cb = NULL;
+ qio->event_cb_opaque = NULL;
+ memcpy(qio->payload, payload, size);
+ }
+
+ mtx_lock(&rkq->rkq_lock);
+ if (rkq->rkq_qio) {
+ rd_free(rkq->rkq_qio);
+ rkq->rkq_qio = NULL;
+ }
+
+ if (fd != -1) {
+ rkq->rkq_qio = qio;
+ }
+
+ mtx_unlock(&rkq->rkq_lock);
+}
+
+void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu,
+ int fd,
+ const void *payload,
+ size_t size) {
+ rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size);
+}
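+
+/*
+ * Usage sketch (illustrative, POSIX pipe(2)/poll(2)): let librdkafka write
+ * a wake-up payload to a file descriptor whenever the queue transitions
+ * from empty to non-empty, so the queue can be served from an external
+ * event loop:
+ *
+ *   int fds[2];
+ *   pipe(fds);
+ *   rd_kafka_queue_io_event_enable(rkqu, fds[1], "1", 1);
+ *
+ *   struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
+ *   if (poll(&pfd, 1, 1000) > 0) {
+ *           char buf[8];
+ *           read(fds[0], buf, sizeof(buf));
+ *           rd_kafka_event_t *ev = rd_kafka_queue_poll(rkqu, 0);
+ *           if (ev)
+ *                   rd_kafka_event_destroy(ev);
+ *   }
+ */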
+
+
+void rd_kafka_queue_yield(rd_kafka_queue_t *rkqu) {
+ rd_kafka_q_yield(rkqu->rkqu_q);
+}
+
+
+/**
+ * @brief Enable or disable (event_cb==NULL) callback-based wake-ups for the queue
+ */
+void rd_kafka_q_cb_event_enable(rd_kafka_q_t *rkq,
+ void (*event_cb)(rd_kafka_t *rk, void *opaque),
+ void *opaque) {
+ struct rd_kafka_q_io *qio = NULL;
+
+ if (event_cb) {
+ qio = rd_malloc(sizeof(*qio));
+ qio->fd = -1;
+ qio->size = 0;
+ qio->payload = NULL;
+ qio->event_cb = event_cb;
+ qio->event_cb_opaque = opaque;
+ }
+
+ mtx_lock(&rkq->rkq_lock);
+ if (rkq->rkq_qio) {
+ rd_free(rkq->rkq_qio);
+ rkq->rkq_qio = NULL;
+ }
+
+ if (event_cb) {
+ rkq->rkq_qio = qio;
+ }
+
+ mtx_unlock(&rkq->rkq_lock);
+}
+
+void rd_kafka_queue_cb_event_enable(rd_kafka_queue_t *rkqu,
+ void (*event_cb)(rd_kafka_t *rk,
+ void *opaque),
+ void *opaque) {
+ rd_kafka_q_cb_event_enable(rkqu->rkqu_q, event_cb, opaque);
+}
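+
+/*
+ * Usage sketch (illustrative; my_event_loop_t and my_event_loop_wakeup()
+ * are placeholders for the application's own event loop): the callback is
+ * invoked from internal librdkafka threads and should only signal the
+ * application, not call back into librdkafka:
+ *
+ *   static void wakeup_cb (rd_kafka_t *rk, void *opaque) {
+ *           my_event_loop_t *loop = opaque;
+ *           my_event_loop_wakeup(loop);
+ *   }
+ *
+ *   rd_kafka_queue_cb_event_enable(rkqu, wakeup_cb, loop);
+ */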
+
+
+/**
+ * Helper: wait for single op on 'rkq', and return its error,
+ * or .._TIMED_OUT on timeout.
+ */
+rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms) {
+ rd_kafka_op_t *rko;
+ rd_kafka_resp_err_t err;
+
+ rko = rd_kafka_q_pop(rkq, rd_timeout_us(timeout_ms), 0);
+ if (!rko)
+ err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ else {
+ err = rko->rko_err;
+ rd_kafka_op_destroy(rko);
+ }
+
+ return err;
+}
+
+
+/**
+ * Apply \p callback on each op in queue.
+ * If the callback wishes to remove the rko it must do so using
+ * rd_kafka_q_deq0().
+ *
+ * @returns the sum of \p callback() return values.
+ * @remark rkq will be locked, callers should take care not to
+ * interact with \p rkq through other means from the callback to avoid
+ * deadlocks.
+ */
+int rd_kafka_q_apply(rd_kafka_q_t *rkq,
+ int (*callback)(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ void *opaque),
+ void *opaque) {
+ rd_kafka_op_t *rko, *next;
+ rd_kafka_q_t *fwdq;
+ int cnt = 0;
+
+ mtx_lock(&rkq->rkq_lock);
+ if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+ mtx_unlock(&rkq->rkq_lock);
+ cnt = rd_kafka_q_apply(fwdq, callback, opaque);
+ rd_kafka_q_destroy(fwdq);
+ return cnt;
+ }
+
+ next = TAILQ_FIRST(&rkq->rkq_q);
+ while ((rko = next)) {
+ next = TAILQ_NEXT(next, rko_link);
+ cnt += callback(rkq, rko, opaque);
+ }
+
+ rd_kafka_q_mark_served(rkq);
+
+ mtx_unlock(&rkq->rkq_lock);
+
+ return cnt;
+}
+
+/**
+ * @brief Convert relative to absolute offsets and also purge any messages
+ * that are older than \p min_offset.
+ * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since
+ * they are used to indicate unknown compression codecs and compressed
+ * messagesets may have a starting offset lower than what we requested.
+ * @remark \p rkq locking is not performed (caller's responsibility)
+ * @remark Must NOT be used on fwdq.
+ */
+void rd_kafka_q_fix_offsets(rd_kafka_q_t *rkq,
+ int64_t min_offset,
+ int64_t base_offset) {
+ rd_kafka_op_t *rko, *next;
+ int adj_len = 0;
+ int64_t adj_size = 0;
+
+ rd_kafka_assert(NULL, !rkq->rkq_fwdq);
+
+ next = TAILQ_FIRST(&rkq->rkq_q);
+ while ((rko = next)) {
+ next = TAILQ_NEXT(next, rko_link);
+
+ if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH))
+ continue;
+
+ rko->rko_u.fetch.rkm.rkm_offset += base_offset;
+
+ if (rko->rko_u.fetch.rkm.rkm_offset < min_offset &&
+ rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) {
+ adj_len++;
+ adj_size += rko->rko_len;
+ TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
+ rd_kafka_op_destroy(rko);
+ continue;
+ }
+ }
+
+
+ rkq->rkq_qlen -= adj_len;
+ rkq->rkq_qsize -= adj_size;
+}
+
+
+/**
+ * @brief Print information and contents of queue
+ */
+void rd_kafka_q_dump(FILE *fp, rd_kafka_q_t *rkq) {
+ mtx_lock(&rkq->rkq_lock);
+ fprintf(fp,
+ "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, "
+ "%" PRId64 " bytes)\n",
+ rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags,
+ rkq->rkq_qlen, rkq->rkq_qsize);
+
+ if (rkq->rkq_qio)
+ fprintf(fp, " QIO fd %d\n", (int)rkq->rkq_qio->fd);
+ if (rkq->rkq_serve)
+ fprintf(fp, " Serve callback %p, opaque %p\n", rkq->rkq_serve,
+ rkq->rkq_opaque);
+
+ if (rkq->rkq_fwdq) {
+ fprintf(fp, " Forwarded ->\n");
+ rd_kafka_q_dump(fp, rkq->rkq_fwdq);
+ } else {
+ rd_kafka_op_t *rko;
+
+ if (!TAILQ_EMPTY(&rkq->rkq_q))
+ fprintf(fp, " Queued ops:\n");
+ TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) {
+ fprintf(fp,
+ " %p %s (v%" PRId32
+ ", flags 0x%x, "
+ "prio %d, len %" PRId32
+ ", source %s, "
+ "replyq %p)\n",
+ rko, rd_kafka_op2str(rko->rko_type),
+ rko->rko_version, rko->rko_flags, rko->rko_prio,
+ rko->rko_len,
+#if ENABLE_DEVEL
+ rko->rko_source
+#else
+ "-"
+#endif
+ ,
+ rko->rko_replyq.q);
+ }
+ }
+
+ mtx_unlock(&rkq->rkq_lock);
+}
+
+
+void rd_kafka_enq_once_trigger_destroy(void *ptr) {
+ rd_kafka_enq_once_t *eonce = ptr;
+
+ rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__DESTROY, "destroy");
+}