author     Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-19 02:57:58 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-19 02:57:58 +0000
commit     be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree       9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c
parent     Initial commit. (diff)
Adding upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c')
-rw-r--r--   fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c   1085
1 file changed, 1085 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c new file mode 100644 index 00000000..57fce36b --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c @@ -0,0 +1,1085 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2016 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_offset.h" +#include "rdkafka_topic.h" +#include "rdkafka_interceptor.h" + +int RD_TLS rd_kafka_yield_thread = 0; + +void rd_kafka_yield(rd_kafka_t *rk) { + rd_kafka_yield_thread = 1; +} + + +/** + * @brief Check and reset yield flag. + * @returns rd_true if caller should yield, otherwise rd_false. + * @remarks rkq_lock MUST be held + */ +static RD_INLINE rd_bool_t rd_kafka_q_check_yield(rd_kafka_q_t *rkq) { + if (!(rkq->rkq_flags & RD_KAFKA_Q_F_YIELD)) + return rd_false; + + rkq->rkq_flags &= ~RD_KAFKA_Q_F_YIELD; + return rd_true; +} +/** + * Destroy a queue. refcnt must be at zero. + */ +void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) { + + mtx_lock(&rkq->rkq_lock); + if (unlikely(rkq->rkq_qio != NULL)) { + rd_free(rkq->rkq_qio); + rkq->rkq_qio = NULL; + } + /* Queue must have been disabled prior to final destruction, + * this is to catch the case where the queue owner/poll does not + * use rd_kafka_q_destroy_owner(). */ + rd_dassert(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY)); + rd_kafka_q_disable0(rkq, 0 /*no-lock*/); /* for the non-devel case */ + rd_kafka_q_fwd_set0(rkq, NULL, 0 /*no-lock*/, 0 /*no-fwd-app*/); + rd_kafka_q_purge0(rkq, 0 /*no-lock*/); + assert(!rkq->rkq_fwdq); + mtx_unlock(&rkq->rkq_lock); + mtx_destroy(&rkq->rkq_lock); + cnd_destroy(&rkq->rkq_cond); + + if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED) + rd_free(rkq); +} + + + +/** + * Initialize a queue. 
+ */ +void rd_kafka_q_init0(rd_kafka_q_t *rkq, + rd_kafka_t *rk, + const char *func, + int line) { + rd_kafka_q_reset(rkq); + rkq->rkq_fwdq = NULL; + rkq->rkq_refcnt = 1; + rkq->rkq_flags = RD_KAFKA_Q_F_READY; + rkq->rkq_rk = rk; + rkq->rkq_qio = NULL; + rkq->rkq_serve = NULL; + rkq->rkq_opaque = NULL; + mtx_init(&rkq->rkq_lock, mtx_plain); + cnd_init(&rkq->rkq_cond); +#if ENABLE_DEVEL + rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); +#else + rkq->rkq_name = func; +#endif +} + + +/** + * Allocate a new queue and initialize it. + */ +rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) { + rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq)); + rd_kafka_q_init(rkq, rk); + rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED; +#if ENABLE_DEVEL + rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); +#else + rkq->rkq_name = func; +#endif + return rkq; +} + +/** + * Set/clear forward queue. + * Queue forwarding enables message routing inside rdkafka. + * Typical use is to re-route all fetched messages for all partitions + * to one single queue. + * + * All access to rkq_fwdq are protected by rkq_lock. + */ +void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq, + rd_kafka_q_t *destq, + int do_lock, + int fwd_app) { + if (unlikely(srcq == destq)) + return; + + if (do_lock) + mtx_lock(&srcq->rkq_lock); + if (fwd_app) + srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP; + if (srcq->rkq_fwdq) { + rd_kafka_q_destroy(srcq->rkq_fwdq); + srcq->rkq_fwdq = NULL; + } + if (destq) { + rd_kafka_q_keep(destq); + + /* If rkq has ops in queue, append them to fwdq's queue. + * This is an irreversible operation. */ + if (srcq->rkq_qlen > 0) { + rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY); + rd_kafka_q_concat(destq, srcq); + } + + srcq->rkq_fwdq = destq; + } + if (do_lock) + mtx_unlock(&srcq->rkq_lock); +} + +/** + * Purge all entries from a queue. + */ +int rd_kafka_q_purge0(rd_kafka_q_t *rkq, int do_lock) { + rd_kafka_op_t *rko, *next; + TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); + rd_kafka_q_t *fwdq; + int cnt = 0; + + if (do_lock) + mtx_lock(&rkq->rkq_lock); + + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + cnt = rd_kafka_q_purge(fwdq); + rd_kafka_q_destroy(fwdq); + return cnt; + } + + /* Move ops queue to tmpq to avoid lock-order issue + * by locks taken from rd_kafka_op_destroy(). */ + TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link); + + rd_kafka_q_mark_served(rkq); + + /* Zero out queue */ + rd_kafka_q_reset(rkq); + + if (do_lock) + mtx_unlock(&rkq->rkq_lock); + + /* Destroy the ops */ + next = TAILQ_FIRST(&tmpq); + while ((rko = next)) { + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + cnt++; + } + + return cnt; +} + + +/** + * Purge all entries from a queue with a rktp version smaller than `version` + * This shaves off the head of the queue, up until the first rko with + * a non-matching rktp or version. 
+ */ +void rd_kafka_q_purge_toppar_version(rd_kafka_q_t *rkq, + rd_kafka_toppar_t *rktp, + int version) { + rd_kafka_op_t *rko, *next; + TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); + int32_t cnt = 0; + int64_t size = 0; + rd_kafka_q_t *fwdq; + + mtx_lock(&rkq->rkq_lock); + + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + mtx_unlock(&rkq->rkq_lock); + rd_kafka_q_purge_toppar_version(fwdq, rktp, version); + rd_kafka_q_destroy(fwdq); + return; + } + + /* Move ops to temporary queue and then destroy them from there + * without locks to avoid lock-ordering problems in op_destroy() */ + while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp && + rko->rko_rktp == rktp && rko->rko_version < version) { + TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); + TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); + cnt++; + size += rko->rko_len; + } + + rd_kafka_q_mark_served(rkq); + + rkq->rkq_qlen -= cnt; + rkq->rkq_qsize -= size; + mtx_unlock(&rkq->rkq_lock); + + next = TAILQ_FIRST(&tmpq); + while ((rko = next)) { + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } +} + + +/** + * Move 'cnt' entries from 'srcq' to 'dstq'. + * If 'cnt' == -1 all entries will be moved. + * Returns the number of entries moved. + */ +int rd_kafka_q_move_cnt(rd_kafka_q_t *dstq, + rd_kafka_q_t *srcq, + int cnt, + int do_locks) { + rd_kafka_op_t *rko; + int mcnt = 0; + + if (do_locks) { + mtx_lock(&srcq->rkq_lock); + mtx_lock(&dstq->rkq_lock); + } + + if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) { + if (cnt > 0 && dstq->rkq_qlen == 0) + rd_kafka_q_io_event(dstq); + + /* Optimization, if 'cnt' is equal/larger than all + * items of 'srcq' we can move the entire queue. */ + if (cnt == -1 || cnt >= (int)srcq->rkq_qlen) { + mcnt = srcq->rkq_qlen; + rd_kafka_q_concat0(dstq, srcq, 0 /*no-lock*/); + } else { + while (mcnt < cnt && + (rko = TAILQ_FIRST(&srcq->rkq_q))) { + TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link); + if (likely(!rko->rko_prio)) + TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, + rko_link); + else + TAILQ_INSERT_SORTED( + &dstq->rkq_q, rko, rd_kafka_op_t *, + rko_link, rd_kafka_op_cmp_prio); + + srcq->rkq_qlen--; + dstq->rkq_qlen++; + srcq->rkq_qsize -= rko->rko_len; + dstq->rkq_qsize += rko->rko_len; + mcnt++; + } + } + + rd_kafka_q_mark_served(srcq); + + } else + mcnt = rd_kafka_q_move_cnt( + dstq->rkq_fwdq ? dstq->rkq_fwdq : dstq, + srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq, cnt, do_locks); + + if (do_locks) { + mtx_unlock(&dstq->rkq_lock); + mtx_unlock(&srcq->rkq_lock); + } + + return mcnt; +} + + +/** + * Filters out outdated ops. + */ +static RD_INLINE rd_kafka_op_t * +rd_kafka_op_filter(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int version) { + if (unlikely(!rko)) + return NULL; + + if (unlikely(rd_kafka_op_version_outdated(rko, version))) { + rd_kafka_q_deq0(rkq, rko); + rd_kafka_op_destroy(rko); + return NULL; + } + + return rko; +} + + + +/** + * Pop an op from a queue. + * + * Locality: any thread. + */ + + +/** + * Serve q like rd_kafka_q_serve() until an op is found that can be returned + * as an event to the application. + * + * @returns the first event:able op, or NULL on timeout. 
+ * + * Locality: any thread + */ +rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, + rd_ts_t timeout_us, + int32_t version, + rd_kafka_q_cb_type_t cb_type, + rd_kafka_q_serve_cb_t *callback, + void *opaque) { + rd_kafka_op_t *rko; + rd_kafka_q_t *fwdq; + + rd_dassert(cb_type); + + mtx_lock(&rkq->rkq_lock); + + rd_kafka_yield_thread = 0; + if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + struct timespec timeout_tspec; + + rd_timeout_init_timespec_us(&timeout_tspec, timeout_us); + + while (1) { + rd_kafka_op_res_t res; + /* Keep track of current lock status to avoid + * unnecessary lock flapping in all the cases below. */ + rd_bool_t is_locked = rd_true; + + /* Filter out outdated ops */ + retry: + while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && + !(rko = rd_kafka_op_filter(rkq, rko, version))) + ; + + rd_kafka_q_mark_served(rkq); + + if (rko) { + /* Proper versioned op */ + rd_kafka_q_deq0(rkq, rko); + + /* Let op_handle() operate without lock + * held to allow re-enqueuing, etc. */ + mtx_unlock(&rkq->rkq_lock); + is_locked = rd_false; + + /* Ops with callbacks are considered handled + * and we move on to the next op, if any. + * Ops w/o callbacks are returned immediately */ + res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko, + cb_type, opaque, + callback); + + if (res == RD_KAFKA_OP_RES_HANDLED || + res == RD_KAFKA_OP_RES_KEEP) { + mtx_lock(&rkq->rkq_lock); + is_locked = rd_true; + goto retry; /* Next op */ + } else if (unlikely(res == + RD_KAFKA_OP_RES_YIELD)) { + /* Callback yielded, unroll */ + return NULL; + } else + break; /* Proper op, handle below. */ + } + + if (unlikely(rd_kafka_q_check_yield(rkq))) { + if (is_locked) + mtx_unlock(&rkq->rkq_lock); + return NULL; + } + + if (!is_locked) + mtx_lock(&rkq->rkq_lock); + + if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, + &timeout_tspec) != thrd_success) { + mtx_unlock(&rkq->rkq_lock); + return NULL; + } + } + + } else { + /* Since the q_pop may block we need to release the parent + * queue's lock. */ + mtx_unlock(&rkq->rkq_lock); + rko = rd_kafka_q_pop_serve(fwdq, timeout_us, version, cb_type, + callback, opaque); + rd_kafka_q_destroy(fwdq); + } + + + return rko; +} + +rd_kafka_op_t * +rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version) { + return rd_kafka_q_pop_serve(rkq, timeout_us, version, + RD_KAFKA_Q_CB_RETURN, NULL, NULL); +} + + +/** + * Pop all available ops from a queue and call the provided + * callback for each op. + * `max_cnt` limits the number of ops served, 0 = no limit. + * + * Returns the number of ops served. + * + * Locality: any thread. + */ +int rd_kafka_q_serve(rd_kafka_q_t *rkq, + int timeout_ms, + int max_cnt, + rd_kafka_q_cb_type_t cb_type, + rd_kafka_q_serve_cb_t *callback, + void *opaque) { + rd_kafka_t *rk = rkq->rkq_rk; + rd_kafka_op_t *rko; + rd_kafka_q_t localq; + rd_kafka_q_t *fwdq; + int cnt = 0; + struct timespec timeout_tspec; + + rd_dassert(cb_type); + + mtx_lock(&rkq->rkq_lock); + + rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0); + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + int ret; + /* Since the q_pop may block we need to release the parent + * queue's lock. 
*/ + mtx_unlock(&rkq->rkq_lock); + ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt, cb_type, + callback, opaque); + rd_kafka_q_destroy(fwdq); + return ret; + } + + rd_timeout_init_timespec(&timeout_tspec, timeout_ms); + + /* Wait for op */ + while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && + !rd_kafka_q_check_yield(rkq) && + cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, + &timeout_tspec) == thrd_success) + ; + + rd_kafka_q_mark_served(rkq); + + if (!rko) { + mtx_unlock(&rkq->rkq_lock); + return 0; + } + + /* Move the first `max_cnt` ops. */ + rd_kafka_q_init(&localq, rkq->rkq_rk); + rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1 /*all*/ : max_cnt, + 0 /*no-locks*/); + + mtx_unlock(&rkq->rkq_lock); + + rd_kafka_yield_thread = 0; + + /* Call callback for each op */ + while ((rko = TAILQ_FIRST(&localq.rkq_q))) { + rd_kafka_op_res_t res; + + rd_kafka_q_deq0(&localq, rko); + res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque, + callback); + /* op must have been handled */ + rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS); + cnt++; + + if (unlikely(res == RD_KAFKA_OP_RES_YIELD || + rd_kafka_yield_thread)) { + /* Callback called rd_kafka_yield(), we must + * stop our callback dispatching and put the + * ops in localq back on the original queue head. */ + if (!TAILQ_EMPTY(&localq.rkq_q)) + rd_kafka_q_prepend(rkq, &localq); + break; + } + } + + rd_kafka_q_destroy_owner(&localq); + + return cnt; +} + +/** + * @brief Filter out and destroy outdated messages. + * + * @returns Returns the number of valid messages. + * + * @locality Any thread. + */ +static size_t +rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp, + int32_t version, + rd_kafka_message_t **rkmessages, + size_t cnt, + struct rd_kafka_op_tailq *ctrl_msg_q) { + size_t valid_count = 0; + size_t i; + rd_kafka_op_t *rko, *next; + + for (i = 0; i < cnt; i++) { + rko = rkmessages[i]->_private; + if (rko->rko_rktp == rktp && + rd_kafka_op_version_outdated(rko, version)) { + /* This also destroys the corresponding rkmessage. */ + rd_kafka_op_destroy(rko); + } else if (i > valid_count) { + rkmessages[valid_count++] = rkmessages[i]; + } else { + valid_count++; + } + } + + /* Discard outdated control msgs ops */ + next = TAILQ_FIRST(ctrl_msg_q); + while (next) { + rko = next; + next = TAILQ_NEXT(rko, rko_link); + if (rko->rko_rktp == rktp && + rd_kafka_op_version_outdated(rko, version)) { + TAILQ_REMOVE(ctrl_msg_q, rko, rko_link); + rd_kafka_op_destroy(rko); + } + } + + return valid_count; +} + + +/** + * Populate 'rkmessages' array with messages from 'rkq'. + * If 'auto_commit' is set, each message's offset will be committed + * to the offset store for that toppar. + * + * Returns the number of messages added. + */ + +int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, + int timeout_ms, + rd_kafka_message_t **rkmessages, + size_t rkmessages_size) { + unsigned int cnt = 0; + TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); + struct rd_kafka_op_tailq ctrl_msg_q = + TAILQ_HEAD_INITIALIZER(ctrl_msg_q); + rd_kafka_op_t *rko, *next; + rd_kafka_t *rk = rkq->rkq_rk; + rd_kafka_q_t *fwdq; + struct timespec timeout_tspec; + int i; + + mtx_lock(&rkq->rkq_lock); + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + /* Since the q_pop may block we need to release the parent + * queue's lock. 
*/ + mtx_unlock(&rkq->rkq_lock); + cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms, rkmessages, + rkmessages_size); + rd_kafka_q_destroy(fwdq); + return cnt; + } + mtx_unlock(&rkq->rkq_lock); + + if (timeout_ms) + rd_kafka_app_poll_blocking(rk); + + rd_timeout_init_timespec(&timeout_tspec, timeout_ms); + + rd_kafka_yield_thread = 0; + while (cnt < rkmessages_size) { + rd_kafka_op_res_t res; + + mtx_lock(&rkq->rkq_lock); + + while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && + !rd_kafka_q_check_yield(rkq) && + cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, + &timeout_tspec) == thrd_success) + ; + + rd_kafka_q_mark_served(rkq); + + if (!rko) { + mtx_unlock(&rkq->rkq_lock); + break; /* Timed out */ + } + + rd_kafka_q_deq0(rkq, rko); + + mtx_unlock(&rkq->rkq_lock); + + if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) { + cnt = (unsigned int)rd_kafka_purge_outdated_messages( + rko->rko_rktp, rko->rko_version, rkmessages, cnt, + &ctrl_msg_q); + rd_kafka_op_destroy(rko); + continue; + } + + if (rd_kafka_op_version_outdated(rko, 0)) { + /* Outdated op, put on discard queue */ + TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); + continue; + } + + /* Serve non-FETCH callbacks */ + res = + rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL); + if (res == RD_KAFKA_OP_RES_KEEP || + res == RD_KAFKA_OP_RES_HANDLED) { + /* Callback served, rko is destroyed (if HANDLED). */ + continue; + } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD || + rd_kafka_yield_thread)) { + /* Yield. */ + break; + } + rd_dassert(res == RD_KAFKA_OP_RES_PASS); + + /* If this is a control messages, don't return message to + * application. Add it to a tmp queue from where we can store + * the offset and destroy the op */ + if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) { + TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link); + continue; + } + + /* Get rkmessage from rko and append to array. 
*/ + rkmessages[cnt++] = rd_kafka_message_get(rko); + } + + for (i = cnt - 1; i >= 0; i--) { + rko = (rd_kafka_op_t *)rkmessages[i]->_private; + rd_kafka_toppar_t *rktp = rko->rko_rktp; + int64_t offset = rkmessages[i]->offset + 1; + if (unlikely(rktp->rktp_app_pos.offset < offset)) + rd_kafka_update_app_pos( + rk, rktp, + RD_KAFKA_FETCH_POS( + offset, + rd_kafka_message_leader_epoch(rkmessages[i])), + RD_DO_LOCK); + } + + /* Discard non-desired and already handled ops */ + next = TAILQ_FIRST(&tmpq); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } + + /* Discard ctrl msgs */ + next = TAILQ_FIRST(&ctrl_msg_q); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_toppar_t *rktp = rko->rko_rktp; + int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1; + if (rktp->rktp_app_pos.offset < offset) + rd_kafka_update_app_pos( + rk, rktp, + RD_KAFKA_FETCH_POS( + offset, + rd_kafka_message_leader_epoch( + &rko->rko_u.fetch.rkm.rkm_rkmessage)), + RD_DO_LOCK); + rd_kafka_op_destroy(rko); + } + + rd_kafka_app_polled(rk); + + return cnt; +} + + + +void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu) { + if (rkqu->rkqu_is_owner) + rd_kafka_q_destroy_owner(rkqu->rkqu_q); + else + rd_kafka_q_destroy(rkqu->rkqu_q); + rd_free(rkqu); +} + +rd_kafka_queue_t *rd_kafka_queue_new0(rd_kafka_t *rk, rd_kafka_q_t *rkq) { + rd_kafka_queue_t *rkqu; + + rkqu = rd_calloc(1, sizeof(*rkqu)); + + rkqu->rkqu_q = rkq; + rd_kafka_q_keep(rkq); + + rkqu->rkqu_rk = rk; + + return rkqu; +} + + +rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk) { + rd_kafka_q_t *rkq; + rd_kafka_queue_t *rkqu; + + rkq = rd_kafka_q_new(rk); + rkqu = rd_kafka_queue_new0(rk, rkq); + rd_kafka_q_destroy(rkq); /* Loose refcount from q_new, one is held + * by queue_new0 */ + rkqu->rkqu_is_owner = 1; + return rkqu; +} + + +rd_kafka_queue_t *rd_kafka_queue_get_main(rd_kafka_t *rk) { + return rd_kafka_queue_new0(rk, rk->rk_rep); +} + + +rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk) { + if (!rk->rk_cgrp) + return NULL; + return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q); +} + +rd_kafka_queue_t *rd_kafka_queue_get_partition(rd_kafka_t *rk, + const char *topic, + int32_t partition) { + rd_kafka_toppar_t *rktp; + rd_kafka_queue_t *result; + + if (rk->rk_type == RD_KAFKA_PRODUCER) + return NULL; + + rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, /* no ua_on_miss */ + 1 /* create_on_miss */); + + if (!rktp) + return NULL; + + result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq); + rd_kafka_toppar_destroy(rktp); + + return result; +} + +rd_kafka_queue_t *rd_kafka_queue_get_background(rd_kafka_t *rk) { + rd_kafka_queue_t *rkqu; + + rd_kafka_wrlock(rk); + if (!rk->rk_background.q) { + char errstr[256]; + + if (rd_kafka_background_thread_create(rk, errstr, + sizeof(errstr))) { + rd_kafka_log(rk, LOG_ERR, "BACKGROUND", + "Failed to create background thread: %s", + errstr); + rd_kafka_wrunlock(rk); + return NULL; + } + } + + rkqu = rd_kafka_queue_new0(rk, rk->rk_background.q); + rd_kafka_wrunlock(rk); + return rkqu; +} + + +rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk, + rd_kafka_queue_t *rkqu) { + rd_kafka_q_t *rkq; + + if (!rk->rk_logq) + return RD_KAFKA_RESP_ERR__NOT_CONFIGURED; + + if (!rkqu) + rkq = rk->rk_rep; + else + rkq = rkqu->rkqu_q; + rd_kafka_q_fwd_set(rk->rk_logq, rkq); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst) { + rd_kafka_q_fwd_set0(src->rkqu_q, dst ? 
dst->rkqu_q : NULL, + 1, /* do_lock */ + 1 /* fwd_app */); +} + + +size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu) { + return (size_t)rd_kafka_q_len(rkqu->rkqu_q); +} + +/** + * @brief Enable or disable(fd==-1) fd-based wake-ups for queue + */ +void rd_kafka_q_io_event_enable(rd_kafka_q_t *rkq, + rd_socket_t fd, + const void *payload, + size_t size) { + struct rd_kafka_q_io *qio = NULL; + + if (fd != -1) { + qio = rd_malloc(sizeof(*qio) + size); + qio->fd = fd; + qio->size = size; + qio->payload = (void *)(qio + 1); + qio->sent = rd_false; + qio->event_cb = NULL; + qio->event_cb_opaque = NULL; + memcpy(qio->payload, payload, size); + } + + mtx_lock(&rkq->rkq_lock); + if (rkq->rkq_qio) { + rd_free(rkq->rkq_qio); + rkq->rkq_qio = NULL; + } + + if (fd != -1) { + rkq->rkq_qio = qio; + } + + mtx_unlock(&rkq->rkq_lock); +} + +void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, + int fd, + const void *payload, + size_t size) { + rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size); +} + + +void rd_kafka_queue_yield(rd_kafka_queue_t *rkqu) { + rd_kafka_q_yield(rkqu->rkqu_q); +} + + +/** + * @brief Enable or disable(event_cb==NULL) callback-based wake-ups for queue + */ +void rd_kafka_q_cb_event_enable(rd_kafka_q_t *rkq, + void (*event_cb)(rd_kafka_t *rk, void *opaque), + void *opaque) { + struct rd_kafka_q_io *qio = NULL; + + if (event_cb) { + qio = rd_malloc(sizeof(*qio)); + qio->fd = -1; + qio->size = 0; + qio->payload = NULL; + qio->event_cb = event_cb; + qio->event_cb_opaque = opaque; + } + + mtx_lock(&rkq->rkq_lock); + if (rkq->rkq_qio) { + rd_free(rkq->rkq_qio); + rkq->rkq_qio = NULL; + } + + if (event_cb) { + rkq->rkq_qio = qio; + } + + mtx_unlock(&rkq->rkq_lock); +} + +void rd_kafka_queue_cb_event_enable(rd_kafka_queue_t *rkqu, + void (*event_cb)(rd_kafka_t *rk, + void *opaque), + void *opaque) { + rd_kafka_q_cb_event_enable(rkqu->rkqu_q, event_cb, opaque); +} + + +/** + * Helper: wait for single op on 'rkq', and return its error, + * or .._TIMED_OUT on timeout. + */ +rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms) { + rd_kafka_op_t *rko; + rd_kafka_resp_err_t err; + + rko = rd_kafka_q_pop(rkq, rd_timeout_us(timeout_ms), 0); + if (!rko) + err = RD_KAFKA_RESP_ERR__TIMED_OUT; + else { + err = rko->rko_err; + rd_kafka_op_destroy(rko); + } + + return err; +} + + +/** + * Apply \p callback on each op in queue. + * If the callback wishes to remove the rko it must do so using + * using rd_kafka_op_deq0(). + * + * @returns the sum of \p callback() return values. + * @remark rkq will be locked, callers should take care not to + * interact with \p rkq through other means from the callback to avoid + * deadlocks. + */ +int rd_kafka_q_apply(rd_kafka_q_t *rkq, + int (*callback)(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + void *opaque), + void *opaque) { + rd_kafka_op_t *rko, *next; + rd_kafka_q_t *fwdq; + int cnt = 0; + + mtx_lock(&rkq->rkq_lock); + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + mtx_unlock(&rkq->rkq_lock); + cnt = rd_kafka_q_apply(fwdq, callback, opaque); + rd_kafka_q_destroy(fwdq); + return cnt; + } + + next = TAILQ_FIRST(&rkq->rkq_q); + while ((rko = next)) { + next = TAILQ_NEXT(next, rko_link); + cnt += callback(rkq, rko, opaque); + } + + rd_kafka_q_mark_served(rkq); + + mtx_unlock(&rkq->rkq_lock); + + return cnt; +} + +/** + * @brief Convert relative to absolute offsets and also purge any messages + * that are older than \p min_offset. 
+ * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since + * they are used to indicate unknnown compression codecs and compressed + * messagesets may have a starting offset lower than what we requested. + * @remark \p rkq locking is not performed (caller's responsibility) + * @remark Must NOT be used on fwdq. + */ +void rd_kafka_q_fix_offsets(rd_kafka_q_t *rkq, + int64_t min_offset, + int64_t base_offset) { + rd_kafka_op_t *rko, *next; + int adj_len = 0; + int64_t adj_size = 0; + + rd_kafka_assert(NULL, !rkq->rkq_fwdq); + + next = TAILQ_FIRST(&rkq->rkq_q); + while ((rko = next)) { + next = TAILQ_NEXT(next, rko_link); + + if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH)) + continue; + + rko->rko_u.fetch.rkm.rkm_offset += base_offset; + + if (rko->rko_u.fetch.rkm.rkm_offset < min_offset && + rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) { + adj_len++; + adj_size += rko->rko_len; + TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); + rd_kafka_op_destroy(rko); + continue; + } + } + + + rkq->rkq_qlen -= adj_len; + rkq->rkq_qsize -= adj_size; +} + + +/** + * @brief Print information and contents of queue + */ +void rd_kafka_q_dump(FILE *fp, rd_kafka_q_t *rkq) { + mtx_lock(&rkq->rkq_lock); + fprintf(fp, + "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, " + "%" PRId64 " bytes)\n", + rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags, + rkq->rkq_qlen, rkq->rkq_qsize); + + if (rkq->rkq_qio) + fprintf(fp, " QIO fd %d\n", (int)rkq->rkq_qio->fd); + if (rkq->rkq_serve) + fprintf(fp, " Serve callback %p, opaque %p\n", rkq->rkq_serve, + rkq->rkq_opaque); + + if (rkq->rkq_fwdq) { + fprintf(fp, " Forwarded ->\n"); + rd_kafka_q_dump(fp, rkq->rkq_fwdq); + } else { + rd_kafka_op_t *rko; + + if (!TAILQ_EMPTY(&rkq->rkq_q)) + fprintf(fp, " Queued ops:\n"); + TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) { + fprintf(fp, + " %p %s (v%" PRId32 + ", flags 0x%x, " + "prio %d, len %" PRId32 + ", source %s, " + "replyq %p)\n", + rko, rd_kafka_op2str(rko->rko_type), + rko->rko_version, rko->rko_flags, rko->rko_prio, + rko->rko_len, +#if ENABLE_DEVEL + rko->rko_source +#else + "-" +#endif + , + rko->rko_replyq.q); + } + } + + mtx_unlock(&rkq->rkq_lock); +} + + +void rd_kafka_enq_once_trigger_destroy(void *ptr) { + rd_kafka_enq_once_t *eonce = ptr; + + rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__DESTROY, "destroy"); +} |
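
The fd-based wake-up added above in rd_kafka_q_io_event_enable() is exposed to applications as rd_kafka_queue_io_event_enable(): whenever the queue goes from empty to non-empty, the registered payload is written to the fd so an external event loop can wake up and serve the queue. Below is a minimal, hypothetical sketch of hooking librdkafka's main event queue into a poll(2) loop; the pipe, the payload byte and the drain loop are illustrative choices, not part of the code in this diff, and error handling is trimmed.

#include <librdkafka/rdkafka.h>
#include <poll.h>
#include <unistd.h>
#include <stdio.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new() failed: %s\n", errstr);
                return 1;
        }

        /* librdkafka writes the payload ("1") to pipefd[1] each time the
         * main event queue transitions from empty to non-empty. */
        int pipefd[2];
        if (pipe(pipefd) == -1)
                return 1;

        rd_kafka_queue_t *mainq = rd_kafka_queue_get_main(rk);
        rd_kafka_queue_io_event_enable(mainq, pipefd[1], "1", 1);

        struct pollfd pfd = {.fd = pipefd[0], .events = POLLIN};
        while (poll(&pfd, 1, 5000) > 0) {
                char buf[8];
                (void)read(pipefd[0], buf, sizeof(buf)); /* drain wake-up byte(s) */

                /* Serve everything currently queued, without blocking. */
                rd_kafka_event_t *ev;
                while ((ev = rd_kafka_queue_poll(mainq, 0)))
                        rd_kafka_event_destroy(ev);
        }

        rd_kafka_queue_io_event_enable(mainq, -1, NULL, 0); /* disable wake-ups */
        rd_kafka_queue_destroy(mainq);
        rd_kafka_destroy(rk);
        close(pipefd[0]);
        close(pipefd[1]);
        return 0;
}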
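
The forwarding mechanism (rd_kafka_q_fwd_set0(), exposed through rd_kafka_queue_forward() and used internally by rd_kafka_set_log_queue() above) re-routes all ops from a source queue to a destination queue so one poll loop can serve several internal queues. The sketch below shows how an application might drain log events from its own queue this way; it assumes the "log.queue" configuration property is enabled on the client and uses only public librdkafka APIs, so treat it as an illustration rather than a reference implementation.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static void drain_logs(rd_kafka_t *rk) {
        rd_kafka_queue_t *logq = rd_kafka_queue_new(rk);

        /* Forward the internal log queue to our own queue
         * (returns ERR__NOT_CONFIGURED unless log.queue=true). */
        if (rd_kafka_set_log_queue(rk, logq)) {
                fprintf(stderr, "log.queue is not enabled\n");
                rd_kafka_queue_destroy(logq);
                return;
        }

        /* Poll for log events; a real application would do this from
         * its event loop or a dedicated thread. */
        rd_kafka_event_t *ev;
        while ((ev = rd_kafka_queue_poll(logq, 100))) {
                if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_LOG) {
                        const char *fac, *msg;
                        int level;
                        rd_kafka_event_log(ev, &fac, &msg, &level);
                        fprintf(stderr, "LOG-%d %s: %s\n", level, fac, msg);
                }
                rd_kafka_event_destroy(ev);
        }

        /* Route logs back to the main queue before dropping our ref. */
        rd_kafka_set_log_queue(rk, NULL);
        rd_kafka_queue_destroy(logq);
}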