diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c | 1085 |
1 files changed, 0 insertions, 1085 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c deleted file mode 100644 index 57fce36b..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_queue.c +++ /dev/null @@ -1,1085 +0,0 @@ -/* - * librdkafka - The Apache Kafka C/C++ library - * - * Copyright (c) 2016 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "rdkafka_int.h" -#include "rdkafka_offset.h" -#include "rdkafka_topic.h" -#include "rdkafka_interceptor.h" - -int RD_TLS rd_kafka_yield_thread = 0; - -void rd_kafka_yield(rd_kafka_t *rk) { - rd_kafka_yield_thread = 1; -} - - -/** - * @brief Check and reset yield flag. - * @returns rd_true if caller should yield, otherwise rd_false. - * @remarks rkq_lock MUST be held - */ -static RD_INLINE rd_bool_t rd_kafka_q_check_yield(rd_kafka_q_t *rkq) { - if (!(rkq->rkq_flags & RD_KAFKA_Q_F_YIELD)) - return rd_false; - - rkq->rkq_flags &= ~RD_KAFKA_Q_F_YIELD; - return rd_true; -} -/** - * Destroy a queue. refcnt must be at zero. - */ -void rd_kafka_q_destroy_final(rd_kafka_q_t *rkq) { - - mtx_lock(&rkq->rkq_lock); - if (unlikely(rkq->rkq_qio != NULL)) { - rd_free(rkq->rkq_qio); - rkq->rkq_qio = NULL; - } - /* Queue must have been disabled prior to final destruction, - * this is to catch the case where the queue owner/poll does not - * use rd_kafka_q_destroy_owner(). */ - rd_dassert(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY)); - rd_kafka_q_disable0(rkq, 0 /*no-lock*/); /* for the non-devel case */ - rd_kafka_q_fwd_set0(rkq, NULL, 0 /*no-lock*/, 0 /*no-fwd-app*/); - rd_kafka_q_purge0(rkq, 0 /*no-lock*/); - assert(!rkq->rkq_fwdq); - mtx_unlock(&rkq->rkq_lock); - mtx_destroy(&rkq->rkq_lock); - cnd_destroy(&rkq->rkq_cond); - - if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED) - rd_free(rkq); -} - - - -/** - * Initialize a queue. - */ -void rd_kafka_q_init0(rd_kafka_q_t *rkq, - rd_kafka_t *rk, - const char *func, - int line) { - rd_kafka_q_reset(rkq); - rkq->rkq_fwdq = NULL; - rkq->rkq_refcnt = 1; - rkq->rkq_flags = RD_KAFKA_Q_F_READY; - rkq->rkq_rk = rk; - rkq->rkq_qio = NULL; - rkq->rkq_serve = NULL; - rkq->rkq_opaque = NULL; - mtx_init(&rkq->rkq_lock, mtx_plain); - cnd_init(&rkq->rkq_cond); -#if ENABLE_DEVEL - rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); -#else - rkq->rkq_name = func; -#endif -} - - -/** - * Allocate a new queue and initialize it. - */ -rd_kafka_q_t *rd_kafka_q_new0(rd_kafka_t *rk, const char *func, int line) { - rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq)); - rd_kafka_q_init(rkq, rk); - rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED; -#if ENABLE_DEVEL - rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); -#else - rkq->rkq_name = func; -#endif - return rkq; -} - -/** - * Set/clear forward queue. - * Queue forwarding enables message routing inside rdkafka. - * Typical use is to re-route all fetched messages for all partitions - * to one single queue. - * - * All access to rkq_fwdq are protected by rkq_lock. - */ -void rd_kafka_q_fwd_set0(rd_kafka_q_t *srcq, - rd_kafka_q_t *destq, - int do_lock, - int fwd_app) { - if (unlikely(srcq == destq)) - return; - - if (do_lock) - mtx_lock(&srcq->rkq_lock); - if (fwd_app) - srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP; - if (srcq->rkq_fwdq) { - rd_kafka_q_destroy(srcq->rkq_fwdq); - srcq->rkq_fwdq = NULL; - } - if (destq) { - rd_kafka_q_keep(destq); - - /* If rkq has ops in queue, append them to fwdq's queue. - * This is an irreversible operation. */ - if (srcq->rkq_qlen > 0) { - rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY); - rd_kafka_q_concat(destq, srcq); - } - - srcq->rkq_fwdq = destq; - } - if (do_lock) - mtx_unlock(&srcq->rkq_lock); -} - -/** - * Purge all entries from a queue. - */ -int rd_kafka_q_purge0(rd_kafka_q_t *rkq, int do_lock) { - rd_kafka_op_t *rko, *next; - TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); - rd_kafka_q_t *fwdq; - int cnt = 0; - - if (do_lock) - mtx_lock(&rkq->rkq_lock); - - if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - if (do_lock) - mtx_unlock(&rkq->rkq_lock); - cnt = rd_kafka_q_purge(fwdq); - rd_kafka_q_destroy(fwdq); - return cnt; - } - - /* Move ops queue to tmpq to avoid lock-order issue - * by locks taken from rd_kafka_op_destroy(). */ - TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link); - - rd_kafka_q_mark_served(rkq); - - /* Zero out queue */ - rd_kafka_q_reset(rkq); - - if (do_lock) - mtx_unlock(&rkq->rkq_lock); - - /* Destroy the ops */ - next = TAILQ_FIRST(&tmpq); - while ((rko = next)) { - next = TAILQ_NEXT(next, rko_link); - rd_kafka_op_destroy(rko); - cnt++; - } - - return cnt; -} - - -/** - * Purge all entries from a queue with a rktp version smaller than `version` - * This shaves off the head of the queue, up until the first rko with - * a non-matching rktp or version. - */ -void rd_kafka_q_purge_toppar_version(rd_kafka_q_t *rkq, - rd_kafka_toppar_t *rktp, - int version) { - rd_kafka_op_t *rko, *next; - TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); - int32_t cnt = 0; - int64_t size = 0; - rd_kafka_q_t *fwdq; - - mtx_lock(&rkq->rkq_lock); - - if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - mtx_unlock(&rkq->rkq_lock); - rd_kafka_q_purge_toppar_version(fwdq, rktp, version); - rd_kafka_q_destroy(fwdq); - return; - } - - /* Move ops to temporary queue and then destroy them from there - * without locks to avoid lock-ordering problems in op_destroy() */ - while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp && - rko->rko_rktp == rktp && rko->rko_version < version) { - TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); - TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); - cnt++; - size += rko->rko_len; - } - - rd_kafka_q_mark_served(rkq); - - rkq->rkq_qlen -= cnt; - rkq->rkq_qsize -= size; - mtx_unlock(&rkq->rkq_lock); - - next = TAILQ_FIRST(&tmpq); - while ((rko = next)) { - next = TAILQ_NEXT(next, rko_link); - rd_kafka_op_destroy(rko); - } -} - - -/** - * Move 'cnt' entries from 'srcq' to 'dstq'. - * If 'cnt' == -1 all entries will be moved. - * Returns the number of entries moved. - */ -int rd_kafka_q_move_cnt(rd_kafka_q_t *dstq, - rd_kafka_q_t *srcq, - int cnt, - int do_locks) { - rd_kafka_op_t *rko; - int mcnt = 0; - - if (do_locks) { - mtx_lock(&srcq->rkq_lock); - mtx_lock(&dstq->rkq_lock); - } - - if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) { - if (cnt > 0 && dstq->rkq_qlen == 0) - rd_kafka_q_io_event(dstq); - - /* Optimization, if 'cnt' is equal/larger than all - * items of 'srcq' we can move the entire queue. */ - if (cnt == -1 || cnt >= (int)srcq->rkq_qlen) { - mcnt = srcq->rkq_qlen; - rd_kafka_q_concat0(dstq, srcq, 0 /*no-lock*/); - } else { - while (mcnt < cnt && - (rko = TAILQ_FIRST(&srcq->rkq_q))) { - TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link); - if (likely(!rko->rko_prio)) - TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, - rko_link); - else - TAILQ_INSERT_SORTED( - &dstq->rkq_q, rko, rd_kafka_op_t *, - rko_link, rd_kafka_op_cmp_prio); - - srcq->rkq_qlen--; - dstq->rkq_qlen++; - srcq->rkq_qsize -= rko->rko_len; - dstq->rkq_qsize += rko->rko_len; - mcnt++; - } - } - - rd_kafka_q_mark_served(srcq); - - } else - mcnt = rd_kafka_q_move_cnt( - dstq->rkq_fwdq ? dstq->rkq_fwdq : dstq, - srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq, cnt, do_locks); - - if (do_locks) { - mtx_unlock(&dstq->rkq_lock); - mtx_unlock(&srcq->rkq_lock); - } - - return mcnt; -} - - -/** - * Filters out outdated ops. - */ -static RD_INLINE rd_kafka_op_t * -rd_kafka_op_filter(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int version) { - if (unlikely(!rko)) - return NULL; - - if (unlikely(rd_kafka_op_version_outdated(rko, version))) { - rd_kafka_q_deq0(rkq, rko); - rd_kafka_op_destroy(rko); - return NULL; - } - - return rko; -} - - - -/** - * Pop an op from a queue. - * - * Locality: any thread. - */ - - -/** - * Serve q like rd_kafka_q_serve() until an op is found that can be returned - * as an event to the application. - * - * @returns the first event:able op, or NULL on timeout. - * - * Locality: any thread - */ -rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, - rd_ts_t timeout_us, - int32_t version, - rd_kafka_q_cb_type_t cb_type, - rd_kafka_q_serve_cb_t *callback, - void *opaque) { - rd_kafka_op_t *rko; - rd_kafka_q_t *fwdq; - - rd_dassert(cb_type); - - mtx_lock(&rkq->rkq_lock); - - rd_kafka_yield_thread = 0; - if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - struct timespec timeout_tspec; - - rd_timeout_init_timespec_us(&timeout_tspec, timeout_us); - - while (1) { - rd_kafka_op_res_t res; - /* Keep track of current lock status to avoid - * unnecessary lock flapping in all the cases below. */ - rd_bool_t is_locked = rd_true; - - /* Filter out outdated ops */ - retry: - while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && - !(rko = rd_kafka_op_filter(rkq, rko, version))) - ; - - rd_kafka_q_mark_served(rkq); - - if (rko) { - /* Proper versioned op */ - rd_kafka_q_deq0(rkq, rko); - - /* Let op_handle() operate without lock - * held to allow re-enqueuing, etc. */ - mtx_unlock(&rkq->rkq_lock); - is_locked = rd_false; - - /* Ops with callbacks are considered handled - * and we move on to the next op, if any. - * Ops w/o callbacks are returned immediately */ - res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko, - cb_type, opaque, - callback); - - if (res == RD_KAFKA_OP_RES_HANDLED || - res == RD_KAFKA_OP_RES_KEEP) { - mtx_lock(&rkq->rkq_lock); - is_locked = rd_true; - goto retry; /* Next op */ - } else if (unlikely(res == - RD_KAFKA_OP_RES_YIELD)) { - /* Callback yielded, unroll */ - return NULL; - } else - break; /* Proper op, handle below. */ - } - - if (unlikely(rd_kafka_q_check_yield(rkq))) { - if (is_locked) - mtx_unlock(&rkq->rkq_lock); - return NULL; - } - - if (!is_locked) - mtx_lock(&rkq->rkq_lock); - - if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, - &timeout_tspec) != thrd_success) { - mtx_unlock(&rkq->rkq_lock); - return NULL; - } - } - - } else { - /* Since the q_pop may block we need to release the parent - * queue's lock. */ - mtx_unlock(&rkq->rkq_lock); - rko = rd_kafka_q_pop_serve(fwdq, timeout_us, version, cb_type, - callback, opaque); - rd_kafka_q_destroy(fwdq); - } - - - return rko; -} - -rd_kafka_op_t * -rd_kafka_q_pop(rd_kafka_q_t *rkq, rd_ts_t timeout_us, int32_t version) { - return rd_kafka_q_pop_serve(rkq, timeout_us, version, - RD_KAFKA_Q_CB_RETURN, NULL, NULL); -} - - -/** - * Pop all available ops from a queue and call the provided - * callback for each op. - * `max_cnt` limits the number of ops served, 0 = no limit. - * - * Returns the number of ops served. - * - * Locality: any thread. - */ -int rd_kafka_q_serve(rd_kafka_q_t *rkq, - int timeout_ms, - int max_cnt, - rd_kafka_q_cb_type_t cb_type, - rd_kafka_q_serve_cb_t *callback, - void *opaque) { - rd_kafka_t *rk = rkq->rkq_rk; - rd_kafka_op_t *rko; - rd_kafka_q_t localq; - rd_kafka_q_t *fwdq; - int cnt = 0; - struct timespec timeout_tspec; - - rd_dassert(cb_type); - - mtx_lock(&rkq->rkq_lock); - - rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0); - if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - int ret; - /* Since the q_pop may block we need to release the parent - * queue's lock. */ - mtx_unlock(&rkq->rkq_lock); - ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt, cb_type, - callback, opaque); - rd_kafka_q_destroy(fwdq); - return ret; - } - - rd_timeout_init_timespec(&timeout_tspec, timeout_ms); - - /* Wait for op */ - while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && - !rd_kafka_q_check_yield(rkq) && - cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, - &timeout_tspec) == thrd_success) - ; - - rd_kafka_q_mark_served(rkq); - - if (!rko) { - mtx_unlock(&rkq->rkq_lock); - return 0; - } - - /* Move the first `max_cnt` ops. */ - rd_kafka_q_init(&localq, rkq->rkq_rk); - rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1 /*all*/ : max_cnt, - 0 /*no-locks*/); - - mtx_unlock(&rkq->rkq_lock); - - rd_kafka_yield_thread = 0; - - /* Call callback for each op */ - while ((rko = TAILQ_FIRST(&localq.rkq_q))) { - rd_kafka_op_res_t res; - - rd_kafka_q_deq0(&localq, rko); - res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque, - callback); - /* op must have been handled */ - rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS); - cnt++; - - if (unlikely(res == RD_KAFKA_OP_RES_YIELD || - rd_kafka_yield_thread)) { - /* Callback called rd_kafka_yield(), we must - * stop our callback dispatching and put the - * ops in localq back on the original queue head. */ - if (!TAILQ_EMPTY(&localq.rkq_q)) - rd_kafka_q_prepend(rkq, &localq); - break; - } - } - - rd_kafka_q_destroy_owner(&localq); - - return cnt; -} - -/** - * @brief Filter out and destroy outdated messages. - * - * @returns Returns the number of valid messages. - * - * @locality Any thread. - */ -static size_t -rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp, - int32_t version, - rd_kafka_message_t **rkmessages, - size_t cnt, - struct rd_kafka_op_tailq *ctrl_msg_q) { - size_t valid_count = 0; - size_t i; - rd_kafka_op_t *rko, *next; - - for (i = 0; i < cnt; i++) { - rko = rkmessages[i]->_private; - if (rko->rko_rktp == rktp && - rd_kafka_op_version_outdated(rko, version)) { - /* This also destroys the corresponding rkmessage. */ - rd_kafka_op_destroy(rko); - } else if (i > valid_count) { - rkmessages[valid_count++] = rkmessages[i]; - } else { - valid_count++; - } - } - - /* Discard outdated control msgs ops */ - next = TAILQ_FIRST(ctrl_msg_q); - while (next) { - rko = next; - next = TAILQ_NEXT(rko, rko_link); - if (rko->rko_rktp == rktp && - rd_kafka_op_version_outdated(rko, version)) { - TAILQ_REMOVE(ctrl_msg_q, rko, rko_link); - rd_kafka_op_destroy(rko); - } - } - - return valid_count; -} - - -/** - * Populate 'rkmessages' array with messages from 'rkq'. - * If 'auto_commit' is set, each message's offset will be committed - * to the offset store for that toppar. - * - * Returns the number of messages added. - */ - -int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, - int timeout_ms, - rd_kafka_message_t **rkmessages, - size_t rkmessages_size) { - unsigned int cnt = 0; - TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); - struct rd_kafka_op_tailq ctrl_msg_q = - TAILQ_HEAD_INITIALIZER(ctrl_msg_q); - rd_kafka_op_t *rko, *next; - rd_kafka_t *rk = rkq->rkq_rk; - rd_kafka_q_t *fwdq; - struct timespec timeout_tspec; - int i; - - mtx_lock(&rkq->rkq_lock); - if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - /* Since the q_pop may block we need to release the parent - * queue's lock. */ - mtx_unlock(&rkq->rkq_lock); - cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms, rkmessages, - rkmessages_size); - rd_kafka_q_destroy(fwdq); - return cnt; - } - mtx_unlock(&rkq->rkq_lock); - - if (timeout_ms) - rd_kafka_app_poll_blocking(rk); - - rd_timeout_init_timespec(&timeout_tspec, timeout_ms); - - rd_kafka_yield_thread = 0; - while (cnt < rkmessages_size) { - rd_kafka_op_res_t res; - - mtx_lock(&rkq->rkq_lock); - - while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && - !rd_kafka_q_check_yield(rkq) && - cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, - &timeout_tspec) == thrd_success) - ; - - rd_kafka_q_mark_served(rkq); - - if (!rko) { - mtx_unlock(&rkq->rkq_lock); - break; /* Timed out */ - } - - rd_kafka_q_deq0(rkq, rko); - - mtx_unlock(&rkq->rkq_lock); - - if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) { - cnt = (unsigned int)rd_kafka_purge_outdated_messages( - rko->rko_rktp, rko->rko_version, rkmessages, cnt, - &ctrl_msg_q); - rd_kafka_op_destroy(rko); - continue; - } - - if (rd_kafka_op_version_outdated(rko, 0)) { - /* Outdated op, put on discard queue */ - TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); - continue; - } - - /* Serve non-FETCH callbacks */ - res = - rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL); - if (res == RD_KAFKA_OP_RES_KEEP || - res == RD_KAFKA_OP_RES_HANDLED) { - /* Callback served, rko is destroyed (if HANDLED). */ - continue; - } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD || - rd_kafka_yield_thread)) { - /* Yield. */ - break; - } - rd_dassert(res == RD_KAFKA_OP_RES_PASS); - - /* If this is a control messages, don't return message to - * application. Add it to a tmp queue from where we can store - * the offset and destroy the op */ - if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) { - TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link); - continue; - } - - /* Get rkmessage from rko and append to array. */ - rkmessages[cnt++] = rd_kafka_message_get(rko); - } - - for (i = cnt - 1; i >= 0; i--) { - rko = (rd_kafka_op_t *)rkmessages[i]->_private; - rd_kafka_toppar_t *rktp = rko->rko_rktp; - int64_t offset = rkmessages[i]->offset + 1; - if (unlikely(rktp->rktp_app_pos.offset < offset)) - rd_kafka_update_app_pos( - rk, rktp, - RD_KAFKA_FETCH_POS( - offset, - rd_kafka_message_leader_epoch(rkmessages[i])), - RD_DO_LOCK); - } - - /* Discard non-desired and already handled ops */ - next = TAILQ_FIRST(&tmpq); - while (next) { - rko = next; - next = TAILQ_NEXT(next, rko_link); - rd_kafka_op_destroy(rko); - } - - /* Discard ctrl msgs */ - next = TAILQ_FIRST(&ctrl_msg_q); - while (next) { - rko = next; - next = TAILQ_NEXT(next, rko_link); - rd_kafka_toppar_t *rktp = rko->rko_rktp; - int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1; - if (rktp->rktp_app_pos.offset < offset) - rd_kafka_update_app_pos( - rk, rktp, - RD_KAFKA_FETCH_POS( - offset, - rd_kafka_message_leader_epoch( - &rko->rko_u.fetch.rkm.rkm_rkmessage)), - RD_DO_LOCK); - rd_kafka_op_destroy(rko); - } - - rd_kafka_app_polled(rk); - - return cnt; -} - - - -void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu) { - if (rkqu->rkqu_is_owner) - rd_kafka_q_destroy_owner(rkqu->rkqu_q); - else - rd_kafka_q_destroy(rkqu->rkqu_q); - rd_free(rkqu); -} - -rd_kafka_queue_t *rd_kafka_queue_new0(rd_kafka_t *rk, rd_kafka_q_t *rkq) { - rd_kafka_queue_t *rkqu; - - rkqu = rd_calloc(1, sizeof(*rkqu)); - - rkqu->rkqu_q = rkq; - rd_kafka_q_keep(rkq); - - rkqu->rkqu_rk = rk; - - return rkqu; -} - - -rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk) { - rd_kafka_q_t *rkq; - rd_kafka_queue_t *rkqu; - - rkq = rd_kafka_q_new(rk); - rkqu = rd_kafka_queue_new0(rk, rkq); - rd_kafka_q_destroy(rkq); /* Loose refcount from q_new, one is held - * by queue_new0 */ - rkqu->rkqu_is_owner = 1; - return rkqu; -} - - -rd_kafka_queue_t *rd_kafka_queue_get_main(rd_kafka_t *rk) { - return rd_kafka_queue_new0(rk, rk->rk_rep); -} - - -rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk) { - if (!rk->rk_cgrp) - return NULL; - return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q); -} - -rd_kafka_queue_t *rd_kafka_queue_get_partition(rd_kafka_t *rk, - const char *topic, - int32_t partition) { - rd_kafka_toppar_t *rktp; - rd_kafka_queue_t *result; - - if (rk->rk_type == RD_KAFKA_PRODUCER) - return NULL; - - rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, /* no ua_on_miss */ - 1 /* create_on_miss */); - - if (!rktp) - return NULL; - - result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq); - rd_kafka_toppar_destroy(rktp); - - return result; -} - -rd_kafka_queue_t *rd_kafka_queue_get_background(rd_kafka_t *rk) { - rd_kafka_queue_t *rkqu; - - rd_kafka_wrlock(rk); - if (!rk->rk_background.q) { - char errstr[256]; - - if (rd_kafka_background_thread_create(rk, errstr, - sizeof(errstr))) { - rd_kafka_log(rk, LOG_ERR, "BACKGROUND", - "Failed to create background thread: %s", - errstr); - rd_kafka_wrunlock(rk); - return NULL; - } - } - - rkqu = rd_kafka_queue_new0(rk, rk->rk_background.q); - rd_kafka_wrunlock(rk); - return rkqu; -} - - -rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk, - rd_kafka_queue_t *rkqu) { - rd_kafka_q_t *rkq; - - if (!rk->rk_logq) - return RD_KAFKA_RESP_ERR__NOT_CONFIGURED; - - if (!rkqu) - rkq = rk->rk_rep; - else - rkq = rkqu->rkqu_q; - rd_kafka_q_fwd_set(rk->rk_logq, rkq); - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst) { - rd_kafka_q_fwd_set0(src->rkqu_q, dst ? dst->rkqu_q : NULL, - 1, /* do_lock */ - 1 /* fwd_app */); -} - - -size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu) { - return (size_t)rd_kafka_q_len(rkqu->rkqu_q); -} - -/** - * @brief Enable or disable(fd==-1) fd-based wake-ups for queue - */ -void rd_kafka_q_io_event_enable(rd_kafka_q_t *rkq, - rd_socket_t fd, - const void *payload, - size_t size) { - struct rd_kafka_q_io *qio = NULL; - - if (fd != -1) { - qio = rd_malloc(sizeof(*qio) + size); - qio->fd = fd; - qio->size = size; - qio->payload = (void *)(qio + 1); - qio->sent = rd_false; - qio->event_cb = NULL; - qio->event_cb_opaque = NULL; - memcpy(qio->payload, payload, size); - } - - mtx_lock(&rkq->rkq_lock); - if (rkq->rkq_qio) { - rd_free(rkq->rkq_qio); - rkq->rkq_qio = NULL; - } - - if (fd != -1) { - rkq->rkq_qio = qio; - } - - mtx_unlock(&rkq->rkq_lock); -} - -void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, - int fd, - const void *payload, - size_t size) { - rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size); -} - - -void rd_kafka_queue_yield(rd_kafka_queue_t *rkqu) { - rd_kafka_q_yield(rkqu->rkqu_q); -} - - -/** - * @brief Enable or disable(event_cb==NULL) callback-based wake-ups for queue - */ -void rd_kafka_q_cb_event_enable(rd_kafka_q_t *rkq, - void (*event_cb)(rd_kafka_t *rk, void *opaque), - void *opaque) { - struct rd_kafka_q_io *qio = NULL; - - if (event_cb) { - qio = rd_malloc(sizeof(*qio)); - qio->fd = -1; - qio->size = 0; - qio->payload = NULL; - qio->event_cb = event_cb; - qio->event_cb_opaque = opaque; - } - - mtx_lock(&rkq->rkq_lock); - if (rkq->rkq_qio) { - rd_free(rkq->rkq_qio); - rkq->rkq_qio = NULL; - } - - if (event_cb) { - rkq->rkq_qio = qio; - } - - mtx_unlock(&rkq->rkq_lock); -} - -void rd_kafka_queue_cb_event_enable(rd_kafka_queue_t *rkqu, - void (*event_cb)(rd_kafka_t *rk, - void *opaque), - void *opaque) { - rd_kafka_q_cb_event_enable(rkqu->rkqu_q, event_cb, opaque); -} - - -/** - * Helper: wait for single op on 'rkq', and return its error, - * or .._TIMED_OUT on timeout. - */ -rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms) { - rd_kafka_op_t *rko; - rd_kafka_resp_err_t err; - - rko = rd_kafka_q_pop(rkq, rd_timeout_us(timeout_ms), 0); - if (!rko) - err = RD_KAFKA_RESP_ERR__TIMED_OUT; - else { - err = rko->rko_err; - rd_kafka_op_destroy(rko); - } - - return err; -} - - -/** - * Apply \p callback on each op in queue. - * If the callback wishes to remove the rko it must do so using - * using rd_kafka_op_deq0(). - * - * @returns the sum of \p callback() return values. - * @remark rkq will be locked, callers should take care not to - * interact with \p rkq through other means from the callback to avoid - * deadlocks. - */ -int rd_kafka_q_apply(rd_kafka_q_t *rkq, - int (*callback)(rd_kafka_q_t *rkq, - rd_kafka_op_t *rko, - void *opaque), - void *opaque) { - rd_kafka_op_t *rko, *next; - rd_kafka_q_t *fwdq; - int cnt = 0; - - mtx_lock(&rkq->rkq_lock); - if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { - mtx_unlock(&rkq->rkq_lock); - cnt = rd_kafka_q_apply(fwdq, callback, opaque); - rd_kafka_q_destroy(fwdq); - return cnt; - } - - next = TAILQ_FIRST(&rkq->rkq_q); - while ((rko = next)) { - next = TAILQ_NEXT(next, rko_link); - cnt += callback(rkq, rko, opaque); - } - - rd_kafka_q_mark_served(rkq); - - mtx_unlock(&rkq->rkq_lock); - - return cnt; -} - -/** - * @brief Convert relative to absolute offsets and also purge any messages - * that are older than \p min_offset. - * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since - * they are used to indicate unknnown compression codecs and compressed - * messagesets may have a starting offset lower than what we requested. - * @remark \p rkq locking is not performed (caller's responsibility) - * @remark Must NOT be used on fwdq. - */ -void rd_kafka_q_fix_offsets(rd_kafka_q_t *rkq, - int64_t min_offset, - int64_t base_offset) { - rd_kafka_op_t *rko, *next; - int adj_len = 0; - int64_t adj_size = 0; - - rd_kafka_assert(NULL, !rkq->rkq_fwdq); - - next = TAILQ_FIRST(&rkq->rkq_q); - while ((rko = next)) { - next = TAILQ_NEXT(next, rko_link); - - if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH)) - continue; - - rko->rko_u.fetch.rkm.rkm_offset += base_offset; - - if (rko->rko_u.fetch.rkm.rkm_offset < min_offset && - rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) { - adj_len++; - adj_size += rko->rko_len; - TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); - rd_kafka_op_destroy(rko); - continue; - } - } - - - rkq->rkq_qlen -= adj_len; - rkq->rkq_qsize -= adj_size; -} - - -/** - * @brief Print information and contents of queue - */ -void rd_kafka_q_dump(FILE *fp, rd_kafka_q_t *rkq) { - mtx_lock(&rkq->rkq_lock); - fprintf(fp, - "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, " - "%" PRId64 " bytes)\n", - rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags, - rkq->rkq_qlen, rkq->rkq_qsize); - - if (rkq->rkq_qio) - fprintf(fp, " QIO fd %d\n", (int)rkq->rkq_qio->fd); - if (rkq->rkq_serve) - fprintf(fp, " Serve callback %p, opaque %p\n", rkq->rkq_serve, - rkq->rkq_opaque); - - if (rkq->rkq_fwdq) { - fprintf(fp, " Forwarded ->\n"); - rd_kafka_q_dump(fp, rkq->rkq_fwdq); - } else { - rd_kafka_op_t *rko; - - if (!TAILQ_EMPTY(&rkq->rkq_q)) - fprintf(fp, " Queued ops:\n"); - TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) { - fprintf(fp, - " %p %s (v%" PRId32 - ", flags 0x%x, " - "prio %d, len %" PRId32 - ", source %s, " - "replyq %p)\n", - rko, rd_kafka_op2str(rko->rko_type), - rko->rko_version, rko->rko_flags, rko->rko_prio, - rko->rko_len, -#if ENABLE_DEVEL - rko->rko_source -#else - "-" -#endif - , - rko->rko_replyq.q); - } - } - - mtx_unlock(&rkq->rkq_lock); -} - - -void rd_kafka_enq_once_trigger_destroy(void *ptr) { - rd_kafka_enq_once_t *eonce = ptr; - - rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__DESTROY, "destroy"); -} |