summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c221
1 files changed, 221 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
new file mode 100644
index 000000000..c69ec1767
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c
@@ -0,0 +1,221 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2018 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Background queue thread and event handling.
+ *
+ * See rdkafka.h's rd_kafka_conf_set_background_event_cb() for details.
+ */
+
+#include "rd.h"
+#include "rdkafka_int.h"
+#include "rdkafka_event.h"
+#include "rdkafka_interceptor.h"
+
+#include <signal.h>
+
+/**
+ * @brief Call the registered background_event_cb.
+ * @locality rdkafka background queue thread
+ */
+static RD_INLINE void rd_kafka_call_background_event_cb(rd_kafka_t *rk,
+ rd_kafka_op_t *rko) {
+ rd_assert(!rk->rk_background.calling);
+ rk->rk_background.calling = 1;
+
+ rk->rk_conf.background_event_cb(rk, rko, rk->rk_conf.opaque);
+
+ rk->rk_background.calling = 0;
+}
+
+
+/**
+ * @brief Background queue handler.
+ *
+ * Triggers the background_event_cb for all event:able ops,
+ * for non-event:able ops:
+ * - call op callback if set, else
+ * - log and discard the op. This is a user error, forwarding non-event
+ * APIs to the background queue.
+ */
+static rd_kafka_op_res_t
+rd_kafka_background_queue_serve(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque) {
+ rd_kafka_op_res_t res;
+
+ /*
+ * Dispatch Event:able ops to background_event_cb()
+ */
+ if (likely(rk->rk_conf.background_event_cb &&
+ rd_kafka_event_setup(rk, rko))) {
+ rd_kafka_call_background_event_cb(rk, rko);
+ /* Event must be destroyed by application. */
+ return RD_KAFKA_OP_RES_HANDLED;
+ }
+
+ /*
+ * Handle non-event:able ops through the standard poll_cb that
+ * will trigger type-specific callbacks (and return OP_RES_HANDLED)
+ * or do no handling and return OP_RES_PASS.
+ * Also signal yield to q_serve() (which implies that op was handled).
+ */
+ res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque);
+ if (res == RD_KAFKA_OP_RES_HANDLED || res == RD_KAFKA_OP_RES_YIELD)
+ return res;
+
+ /* Op was not handled, log and destroy it. */
+ rd_kafka_log(rk, LOG_NOTICE, "BGQUEUE",
+ "No support for handling "
+ "non-event op %s in background queue: discarding",
+ rd_kafka_op2str(rko->rko_type));
+ rd_kafka_op_destroy(rko);
+
+ /* Indicate that the op was handled. */
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @brief Main loop for background queue thread.
+ */
+int rd_kafka_background_thread_main(void *arg) {
+ rd_kafka_t *rk = arg;
+
+ rd_kafka_set_thread_name("background");
+ rd_kafka_set_thread_sysname("rdk:bg");
+
+ rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND);
+
+ (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
+
+ /* Acquire lock (which was held by thread creator during creation)
+ * to synchronise state. */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ mtx_lock(&rk->rk_init_lock);
+ rk->rk_init_wait_cnt--;
+ cnd_broadcast(&rk->rk_init_cnd);
+ mtx_unlock(&rk->rk_init_lock);
+
+ while (likely(!rd_kafka_terminating(rk))) {
+ rd_kafka_q_serve(rk->rk_background.q, 10 * 1000, 0,
+ RD_KAFKA_Q_CB_RETURN,
+ rd_kafka_background_queue_serve, NULL);
+ }
+
+ /* Inform the user that they terminated the client before
+ * all outstanding events were handled. */
+ if (rd_kafka_q_len(rk->rk_background.q) > 0)
+ rd_kafka_log(rk, LOG_INFO, "BGQUEUE",
+ "Purging %d unserved events from background queue",
+ rd_kafka_q_len(rk->rk_background.q));
+ rd_kafka_q_disable(rk->rk_background.q);
+ rd_kafka_q_purge(rk->rk_background.q);
+
+ rd_kafka_dbg(rk, GENERIC, "BGQUEUE", "Background queue thread exiting");
+
+ rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_BACKGROUND);
+
+ rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
+
+ return 0;
+}
+
+
+/**
+ * @brief Create the background thread.
+ *
+ * @locks_acquired rk_init_lock
+ * @locks_required rd_kafka_wrlock()
+ */
+rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+#ifndef _WIN32
+ sigset_t newset, oldset;
+#endif
+
+ if (rk->rk_background.q) {
+ rd_snprintf(errstr, errstr_size,
+ "Background thread already created");
+ return RD_KAFKA_RESP_ERR__CONFLICT;
+ }
+
+ rk->rk_background.q = rd_kafka_q_new(rk);
+
+ mtx_lock(&rk->rk_init_lock);
+ rk->rk_init_wait_cnt++;
+
+#ifndef _WIN32
+ /* Block all signals in newly created threads.
+ * To avoid race condition we block all signals in the calling
+ * thread, which the new thread will inherit its sigmask from,
+ * and then restore the original sigmask of the calling thread when
+ * we're done creating the thread. */
+ sigemptyset(&oldset);
+ sigfillset(&newset);
+ if (rk->rk_conf.term_sig) {
+ struct sigaction sa_term = {.sa_handler =
+ rd_kafka_term_sig_handler};
+ sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
+ }
+ pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+#endif
+
+
+ if ((thrd_create(&rk->rk_background.thread,
+ rd_kafka_background_thread_main, rk)) !=
+ thrd_success) {
+ rd_snprintf(errstr, errstr_size,
+ "Failed to create background thread: %s",
+ rd_strerror(errno));
+ rd_kafka_q_destroy_owner(rk->rk_background.q);
+ rk->rk_background.q = NULL;
+ rk->rk_init_wait_cnt--;
+ mtx_unlock(&rk->rk_init_lock);
+
+#ifndef _WIN32
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+ return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ }
+
+ mtx_unlock(&rk->rk_init_lock);
+
+#ifndef _WIN32
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}