summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp195
1 files changed, 195 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp
new file mode 100644
index 000000000..a34ccac98
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0100-thread_interceptors.cpp
@@ -0,0 +1,195 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2019, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include "testcpp.h"
+
+extern "C" {
+#include "rdkafka.h" /* For interceptor interface */
+#include "../src/tinycthread.h" /* For mutexes */
+}
+
+class myThreadCb {
+ public:
+ myThreadCb() : startCnt_(0), exitCnt_(0) {
+ mtx_init(&lock_, mtx_plain);
+ }
+ ~myThreadCb() {
+ mtx_destroy(&lock_);
+ }
+ int startCount() {
+ int cnt;
+ mtx_lock(&lock_);
+ cnt = startCnt_;
+ mtx_unlock(&lock_);
+ return cnt;
+ }
+ int exitCount() {
+ int cnt;
+ mtx_lock(&lock_);
+ cnt = exitCnt_;
+ mtx_unlock(&lock_);
+ return cnt;
+ }
+ virtual void thread_start_cb(const char *threadname) {
+ Test::Say(tostr() << "Started thread: " << threadname << "\n");
+ mtx_lock(&lock_);
+ startCnt_++;
+ mtx_unlock(&lock_);
+ }
+ virtual void thread_exit_cb(const char *threadname) {
+ Test::Say(tostr() << "Exiting from thread: " << threadname << "\n");
+ mtx_lock(&lock_);
+ exitCnt_++;
+ mtx_unlock(&lock_);
+ }
+
+ private:
+ int startCnt_;
+ int exitCnt_;
+ mtx_t lock_;
+};
+
+
+/**
+ * @brief C to C++ callback trampoline.
+ */
+static rd_kafka_resp_err_t on_thread_start_trampoline(
+ rd_kafka_t *rk,
+ rd_kafka_thread_type_t thread_type,
+ const char *threadname,
+ void *ic_opaque) {
+ myThreadCb *threadcb = (myThreadCb *)ic_opaque;
+
+ Test::Say(tostr() << "on_thread_start(" << thread_type << ", " << threadname
+ << ") called\n");
+
+ threadcb->thread_start_cb(threadname);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief C to C++ callback trampoline.
+ */
+static rd_kafka_resp_err_t on_thread_exit_trampoline(
+ rd_kafka_t *rk,
+ rd_kafka_thread_type_t thread_type,
+ const char *threadname,
+ void *ic_opaque) {
+ myThreadCb *threadcb = (myThreadCb *)ic_opaque;
+
+ Test::Say(tostr() << "on_thread_exit(" << thread_type << ", " << threadname
+ << ") called\n");
+
+ threadcb->thread_exit_cb(threadname);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief This interceptor is called when a new client instance is created
+ * prior to any threads being created.
+ * We use it to set up the instance's thread interceptors.
+ */
+static rd_kafka_resp_err_t on_new(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf,
+ void *ic_opaque,
+ char *errstr,
+ size_t errstr_size) {
+ Test::Say("on_new() interceptor called\n");
+ rd_kafka_interceptor_add_on_thread_start(
+ rk, "test:0100", on_thread_start_trampoline, ic_opaque);
+ rd_kafka_interceptor_add_on_thread_exit(rk, "test:0100",
+ on_thread_exit_trampoline, ic_opaque);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief The on_conf_dup() interceptor let's use add the on_new interceptor
+ * in case the config object is copied, since interceptors are not
+ * automatically copied.
+ */
+static rd_kafka_resp_err_t on_conf_dup(rd_kafka_conf_t *new_conf,
+ const rd_kafka_conf_t *old_conf,
+ size_t filter_cnt,
+ const char **filter,
+ void *ic_opaque) {
+ Test::Say("on_conf_dup() interceptor called\n");
+ return rd_kafka_conf_interceptor_add_on_new(new_conf, "test:0100", on_new,
+ ic_opaque);
+}
+
+
+
+static void test_thread_cbs() {
+ RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+ std::string errstr;
+ rd_kafka_conf_t *c_conf;
+ myThreadCb my_threads;
+
+ Test::conf_set(conf, "bootstrap.servers", "127.0.0.1:1");
+
+ /* Interceptors are not supported in the C++ API, instead use the C API:
+ * 1. Extract the C conf_t object
+ * 2. Set up an on_new() interceptor
+ * 3. Set up an on_conf_dup() interceptor to add interceptors in the
+ * case the config object is copied (which the C++ Conf always does).
+ * 4. In the on_new() interceptor, add the thread interceptors. */
+ c_conf = conf->c_ptr_global();
+ rd_kafka_conf_interceptor_add_on_new(c_conf, "test:0100", on_new,
+ &my_threads);
+ rd_kafka_conf_interceptor_add_on_conf_dup(c_conf, "test:0100", on_conf_dup,
+ &my_threads);
+
+ RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
+ if (!p)
+ Test::Fail("Failed to create Producer: " + errstr);
+ p->poll(500);
+ delete conf;
+ delete p;
+
+ Test::Say(tostr() << my_threads.startCount() << " thread start calls, "
+ << my_threads.exitCount() << " thread exit calls seen\n");
+
+ /* 3 = rdkafka main thread + internal broker + bootstrap broker */
+ if (my_threads.startCount() < 3)
+ Test::Fail("Did not catch enough thread start callback calls");
+ if (my_threads.exitCount() < 3)
+ Test::Fail("Did not catch enough thread exit callback calls");
+ if (my_threads.startCount() != my_threads.exitCount())
+ Test::Fail("Did not catch same number of start and exit callback calls");
+}
+
+
+extern "C" {
+int main_0100_thread_interceptors(int argc, char **argv) {
+ test_thread_cbs();
+ return 0;
+}
+}