summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c221
1 files changed, 0 insertions, 221 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
deleted file mode 100644
index c69ec1767..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2018 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Background queue thread and event handling.
- *
- * See rdkafka.h's rd_kafka_conf_set_background_event_cb() for details.
- */
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_event.h"
-#include "rdkafka_interceptor.h"
-
-#include <signal.h>
-
-/**
- * @brief Call the registered background_event_cb.
- * @locality rdkafka background queue thread
- */
-static RD_INLINE void rd_kafka_call_background_event_cb(rd_kafka_t *rk,
- rd_kafka_op_t *rko) {
- rd_assert(!rk->rk_background.calling);
- rk->rk_background.calling = 1;
-
- rk->rk_conf.background_event_cb(rk, rko, rk->rk_conf.opaque);
-
- rk->rk_background.calling = 0;
-}
-
-
-/**
- * @brief Background queue handler.
- *
- * Triggers the background_event_cb for all event:able ops,
- * for non-event:able ops:
- * - call op callback if set, else
- * - log and discard the op. This is a user error, forwarding non-event
- * APIs to the background queue.
- */
-static rd_kafka_op_res_t
-rd_kafka_background_queue_serve(rd_kafka_t *rk,
- rd_kafka_q_t *rkq,
- rd_kafka_op_t *rko,
- rd_kafka_q_cb_type_t cb_type,
- void *opaque) {
- rd_kafka_op_res_t res;
-
- /*
- * Dispatch Event:able ops to background_event_cb()
- */
- if (likely(rk->rk_conf.background_event_cb &&
- rd_kafka_event_setup(rk, rko))) {
- rd_kafka_call_background_event_cb(rk, rko);
- /* Event must be destroyed by application. */
- return RD_KAFKA_OP_RES_HANDLED;
- }
-
- /*
- * Handle non-event:able ops through the standard poll_cb that
- * will trigger type-specific callbacks (and return OP_RES_HANDLED)
- * or do no handling and return OP_RES_PASS.
- * Also signal yield to q_serve() (which implies that op was handled).
- */
- res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque);
- if (res == RD_KAFKA_OP_RES_HANDLED || res == RD_KAFKA_OP_RES_YIELD)
- return res;
-
- /* Op was not handled, log and destroy it. */
- rd_kafka_log(rk, LOG_NOTICE, "BGQUEUE",
- "No support for handling "
- "non-event op %s in background queue: discarding",
- rd_kafka_op2str(rko->rko_type));
- rd_kafka_op_destroy(rko);
-
- /* Indicate that the op was handled. */
- return RD_KAFKA_OP_RES_HANDLED;
-}
-
-
-/**
- * @brief Main loop for background queue thread.
- */
-int rd_kafka_background_thread_main(void *arg) {
- rd_kafka_t *rk = arg;
-
- rd_kafka_set_thread_name("background");
- rd_kafka_set_thread_sysname("rdk:bg");
-
- rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND);
-
- (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
-
- /* Acquire lock (which was held by thread creator during creation)
- * to synchronise state. */
- rd_kafka_wrlock(rk);
- rd_kafka_wrunlock(rk);
-
- mtx_lock(&rk->rk_init_lock);
- rk->rk_init_wait_cnt--;
- cnd_broadcast(&rk->rk_init_cnd);
- mtx_unlock(&rk->rk_init_lock);
-
- while (likely(!rd_kafka_terminating(rk))) {
- rd_kafka_q_serve(rk->rk_background.q, 10 * 1000, 0,
- RD_KAFKA_Q_CB_RETURN,
- rd_kafka_background_queue_serve, NULL);
- }
-
- /* Inform the user that they terminated the client before
- * all outstanding events were handled. */
- if (rd_kafka_q_len(rk->rk_background.q) > 0)
- rd_kafka_log(rk, LOG_INFO, "BGQUEUE",
- "Purging %d unserved events from background queue",
- rd_kafka_q_len(rk->rk_background.q));
- rd_kafka_q_disable(rk->rk_background.q);
- rd_kafka_q_purge(rk->rk_background.q);
-
- rd_kafka_dbg(rk, GENERIC, "BGQUEUE", "Background queue thread exiting");
-
- rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_BACKGROUND);
-
- rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
-
- return 0;
-}
-
-
-/**
- * @brief Create the background thread.
- *
- * @locks_acquired rk_init_lock
- * @locks_required rd_kafka_wrlock()
- */
-rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
- char *errstr,
- size_t errstr_size) {
-#ifndef _WIN32
- sigset_t newset, oldset;
-#endif
-
- if (rk->rk_background.q) {
- rd_snprintf(errstr, errstr_size,
- "Background thread already created");
- return RD_KAFKA_RESP_ERR__CONFLICT;
- }
-
- rk->rk_background.q = rd_kafka_q_new(rk);
-
- mtx_lock(&rk->rk_init_lock);
- rk->rk_init_wait_cnt++;
-
-#ifndef _WIN32
- /* Block all signals in newly created threads.
- * To avoid race condition we block all signals in the calling
- * thread, which the new thread will inherit its sigmask from,
- * and then restore the original sigmask of the calling thread when
- * we're done creating the thread. */
- sigemptyset(&oldset);
- sigfillset(&newset);
- if (rk->rk_conf.term_sig) {
- struct sigaction sa_term = {.sa_handler =
- rd_kafka_term_sig_handler};
- sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
- }
- pthread_sigmask(SIG_SETMASK, &newset, &oldset);
-#endif
-
-
- if ((thrd_create(&rk->rk_background.thread,
- rd_kafka_background_thread_main, rk)) !=
- thrd_success) {
- rd_snprintf(errstr, errstr_size,
- "Failed to create background thread: %s",
- rd_strerror(errno));
- rd_kafka_q_destroy_owner(rk->rk_background.q);
- rk->rk_background.q = NULL;
- rk->rk_init_wait_cnt--;
- mtx_unlock(&rk->rk_init_lock);
-
-#ifndef _WIN32
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
- return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
- }
-
- mtx_unlock(&rk->rk_init_lock);
-
-#ifndef _WIN32
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}