diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c new file mode 100644 index 000000000..c69ec1767 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_background.c @@ -0,0 +1,221 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2018 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Background queue thread and event handling. + * + * See rdkafka.h's rd_kafka_conf_set_background_event_cb() for details. + */ + +#include "rd.h" +#include "rdkafka_int.h" +#include "rdkafka_event.h" +#include "rdkafka_interceptor.h" + +#include <signal.h> + +/** + * @brief Call the registered background_event_cb. + * @locality rdkafka background queue thread + */ +static RD_INLINE void rd_kafka_call_background_event_cb(rd_kafka_t *rk, + rd_kafka_op_t *rko) { + rd_assert(!rk->rk_background.calling); + rk->rk_background.calling = 1; + + rk->rk_conf.background_event_cb(rk, rko, rk->rk_conf.opaque); + + rk->rk_background.calling = 0; +} + + +/** + * @brief Background queue handler. + * + * Triggers the background_event_cb for all event:able ops, + * for non-event:able ops: + * - call op callback if set, else + * - log and discard the op. This is a user error, forwarding non-event + * APIs to the background queue. + */ +static rd_kafka_op_res_t +rd_kafka_background_queue_serve(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + rd_kafka_q_cb_type_t cb_type, + void *opaque) { + rd_kafka_op_res_t res; + + /* + * Dispatch Event:able ops to background_event_cb() + */ + if (likely(rk->rk_conf.background_event_cb && + rd_kafka_event_setup(rk, rko))) { + rd_kafka_call_background_event_cb(rk, rko); + /* Event must be destroyed by application. */ + return RD_KAFKA_OP_RES_HANDLED; + } + + /* + * Handle non-event:able ops through the standard poll_cb that + * will trigger type-specific callbacks (and return OP_RES_HANDLED) + * or do no handling and return OP_RES_PASS. + * Also signal yield to q_serve() (which implies that op was handled). + */ + res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque); + if (res == RD_KAFKA_OP_RES_HANDLED || res == RD_KAFKA_OP_RES_YIELD) + return res; + + /* Op was not handled, log and destroy it. */ + rd_kafka_log(rk, LOG_NOTICE, "BGQUEUE", + "No support for handling " + "non-event op %s in background queue: discarding", + rd_kafka_op2str(rko->rko_type)); + rd_kafka_op_destroy(rko); + + /* Indicate that the op was handled. */ + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Main loop for background queue thread. + */ +int rd_kafka_background_thread_main(void *arg) { + rd_kafka_t *rk = arg; + + rd_kafka_set_thread_name("background"); + rd_kafka_set_thread_sysname("rdk:bg"); + + rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND); + + (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); + + /* Acquire lock (which was held by thread creator during creation) + * to synchronise state. */ + rd_kafka_wrlock(rk); + rd_kafka_wrunlock(rk); + + mtx_lock(&rk->rk_init_lock); + rk->rk_init_wait_cnt--; + cnd_broadcast(&rk->rk_init_cnd); + mtx_unlock(&rk->rk_init_lock); + + while (likely(!rd_kafka_terminating(rk))) { + rd_kafka_q_serve(rk->rk_background.q, 10 * 1000, 0, + RD_KAFKA_Q_CB_RETURN, + rd_kafka_background_queue_serve, NULL); + } + + /* Inform the user that they terminated the client before + * all outstanding events were handled. */ + if (rd_kafka_q_len(rk->rk_background.q) > 0) + rd_kafka_log(rk, LOG_INFO, "BGQUEUE", + "Purging %d unserved events from background queue", + rd_kafka_q_len(rk->rk_background.q)); + rd_kafka_q_disable(rk->rk_background.q); + rd_kafka_q_purge(rk->rk_background.q); + + rd_kafka_dbg(rk, GENERIC, "BGQUEUE", "Background queue thread exiting"); + + rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_BACKGROUND); + + rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1); + + return 0; +} + + +/** + * @brief Create the background thread. + * + * @locks_acquired rk_init_lock + * @locks_required rd_kafka_wrlock() + */ +rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk, + char *errstr, + size_t errstr_size) { +#ifndef _WIN32 + sigset_t newset, oldset; +#endif + + if (rk->rk_background.q) { + rd_snprintf(errstr, errstr_size, + "Background thread already created"); + return RD_KAFKA_RESP_ERR__CONFLICT; + } + + rk->rk_background.q = rd_kafka_q_new(rk); + + mtx_lock(&rk->rk_init_lock); + rk->rk_init_wait_cnt++; + +#ifndef _WIN32 + /* Block all signals in newly created threads. + * To avoid race condition we block all signals in the calling + * thread, which the new thread will inherit its sigmask from, + * and then restore the original sigmask of the calling thread when + * we're done creating the thread. */ + sigemptyset(&oldset); + sigfillset(&newset); + if (rk->rk_conf.term_sig) { + struct sigaction sa_term = {.sa_handler = + rd_kafka_term_sig_handler}; + sigaction(rk->rk_conf.term_sig, &sa_term, NULL); + } + pthread_sigmask(SIG_SETMASK, &newset, &oldset); +#endif + + + if ((thrd_create(&rk->rk_background.thread, + rd_kafka_background_thread_main, rk)) != + thrd_success) { + rd_snprintf(errstr, errstr_size, + "Failed to create background thread: %s", + rd_strerror(errno)); + rd_kafka_q_destroy_owner(rk->rk_background.q); + rk->rk_background.q = NULL; + rk->rk_init_wait_cnt--; + mtx_unlock(&rk->rk_init_lock); + +#ifndef _WIN32 + /* Restore sigmask of caller */ + pthread_sigmask(SIG_SETMASK, &oldset, NULL); +#endif + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + mtx_unlock(&rk->rk_init_lock); + +#ifndef _WIN32 + /* Restore sigmask of caller */ + pthread_sigmask(SIG_SETMASK, &oldset, NULL); +#endif + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} |