diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c | 221 |
1 files changed, 0 insertions, 221 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c deleted file mode 100644 index c69ec1767..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c +++ /dev/null @@ -1,221 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2018 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * Background queue thread and event handling. - * - * See rdkafka.h's rd_kafka_conf_set_background_event_cb() for details. - */ - -#include "rd.h" -#include "rdkafka_int.h" -#include "rdkafka_event.h" -#include "rdkafka_interceptor.h" - -#include <signal.h> - -/** - * @brief Call the registered background_event_cb. - * @locality rdkafka background queue thread - */ -static RD_INLINE void rd_kafka_call_background_event_cb(rd_kafka_t *rk, - rd_kafka_op_t *rko) { - rd_assert(!rk->rk_background.calling); - rk->rk_background.calling = 1; - - rk->rk_conf.background_event_cb(rk, rko, rk->rk_conf.opaque); - - rk->rk_background.calling = 0; -} - - -/** - * @brief Background queue handler. - * - * Triggers the background_event_cb for all event:able ops, - * for non-event:able ops: - * - call op callback if set, else - * - log and discard the op. This is a user error, forwarding non-event - * APIs to the background queue. - */ -static rd_kafka_op_res_t -rd_kafka_background_queue_serve(rd_kafka_t *rk, - rd_kafka_q_t *rkq, - rd_kafka_op_t *rko, - rd_kafka_q_cb_type_t cb_type, - void *opaque) { - rd_kafka_op_res_t res; - - /* - * Dispatch Event:able ops to background_event_cb() - */ - if (likely(rk->rk_conf.background_event_cb && - rd_kafka_event_setup(rk, rko))) { - rd_kafka_call_background_event_cb(rk, rko); - /* Event must be destroyed by application. */ - return RD_KAFKA_OP_RES_HANDLED; - } - - /* - * Handle non-event:able ops through the standard poll_cb that - * will trigger type-specific callbacks (and return OP_RES_HANDLED) - * or do no handling and return OP_RES_PASS. - * Also signal yield to q_serve() (which implies that op was handled). - */ - res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque); - if (res == RD_KAFKA_OP_RES_HANDLED || res == RD_KAFKA_OP_RES_YIELD) - return res; - - /* Op was not handled, log and destroy it. */ - rd_kafka_log(rk, LOG_NOTICE, "BGQUEUE", - "No support for handling " - "non-event op %s in background queue: discarding", - rd_kafka_op2str(rko->rko_type)); - rd_kafka_op_destroy(rko); - - /* Indicate that the op was handled. */ - return RD_KAFKA_OP_RES_HANDLED; -} - - -/** - * @brief Main loop for background queue thread. - */ -int rd_kafka_background_thread_main(void *arg) { - rd_kafka_t *rk = arg; - - rd_kafka_set_thread_name("background"); - rd_kafka_set_thread_sysname("rdk:bg"); - - rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND); - - (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); - - /* Acquire lock (which was held by thread creator during creation) - * to synchronise state. */ - rd_kafka_wrlock(rk); - rd_kafka_wrunlock(rk); - - mtx_lock(&rk->rk_init_lock); - rk->rk_init_wait_cnt--; - cnd_broadcast(&rk->rk_init_cnd); - mtx_unlock(&rk->rk_init_lock); - - while (likely(!rd_kafka_terminating(rk))) { - rd_kafka_q_serve(rk->rk_background.q, 10 * 1000, 0, - RD_KAFKA_Q_CB_RETURN, - rd_kafka_background_queue_serve, NULL); - } - - /* Inform the user that they terminated the client before - * all outstanding events were handled. */ - if (rd_kafka_q_len(rk->rk_background.q) > 0) - rd_kafka_log(rk, LOG_INFO, "BGQUEUE", - "Purging %d unserved events from background queue", - rd_kafka_q_len(rk->rk_background.q)); - rd_kafka_q_disable(rk->rk_background.q); - rd_kafka_q_purge(rk->rk_background.q); - - rd_kafka_dbg(rk, GENERIC, "BGQUEUE", "Background queue thread exiting"); - - rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_BACKGROUND); - - rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1); - - return 0; -} - - -/** - * @brief Create the background thread. - * - * @locks_acquired rk_init_lock - * @locks_required rd_kafka_wrlock() - */ -rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk, - char *errstr, - size_t errstr_size) { -#ifndef _WIN32 - sigset_t newset, oldset; -#endif - - if (rk->rk_background.q) { - rd_snprintf(errstr, errstr_size, - "Background thread already created"); - return RD_KAFKA_RESP_ERR__CONFLICT; - } - - rk->rk_background.q = rd_kafka_q_new(rk); - - mtx_lock(&rk->rk_init_lock); - rk->rk_init_wait_cnt++; - -#ifndef _WIN32 - /* Block all signals in newly created threads. - * To avoid race condition we block all signals in the calling - * thread, which the new thread will inherit its sigmask from, - * and then restore the original sigmask of the calling thread when - * we're done creating the thread. */ - sigemptyset(&oldset); - sigfillset(&newset); - if (rk->rk_conf.term_sig) { - struct sigaction sa_term = {.sa_handler = - rd_kafka_term_sig_handler}; - sigaction(rk->rk_conf.term_sig, &sa_term, NULL); - } - pthread_sigmask(SIG_SETMASK, &newset, &oldset); -#endif - - - if ((thrd_create(&rk->rk_background.thread, - rd_kafka_background_thread_main, rk)) != - thrd_success) { - rd_snprintf(errstr, errstr_size, - "Failed to create background thread: %s", - rd_strerror(errno)); - rd_kafka_q_destroy_owner(rk->rk_background.q); - rk->rk_background.q = NULL; - rk->rk_init_wait_cnt--; - mtx_unlock(&rk->rk_init_lock); - -#ifndef _WIN32 - /* Restore sigmask of caller */ - pthread_sigmask(SIG_SETMASK, &oldset, NULL); -#endif - return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - } - - mtx_unlock(&rk->rk_init_lock); - -#ifndef _WIN32 - /* Restore sigmask of caller */ - pthread_sigmask(SIG_SETMASK, &oldset, NULL); -#endif - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} |