summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c481
1 files changed, 481 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c
new file mode 100644
index 000000000..e5c5b047a
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0064-interceptors.c
@@ -0,0 +1,481 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2017, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+#include <ctype.h>
+
+/**
+ * Verify interceptor functionality.
+ *
+ * Producer MO:
+ * - create a chain of N interceptors
+ * - allocate a state struct with unique id for each message produced,
+ * provide as msg_opaque and reference from payload.
+ * - in on_send: verify expected interceptor order by counting number
+ * of consecutive bits.
+ * - in on_acknowledge: same
+ * - produce message to invalid topic which should trigger on_send+on_ack..
+ * from within produce().
+ *
+ * Consumer MO:
+ * - create a chain of M interceptors
+ * - subscribe to the previously produced topic
+ * - in on_consume: find message by id, verify expected order by bit counting.
+ * - on on_commit: just count order per on_commit chain run.
+ */
+
+
+#define msgcnt 100
+static const int producer_ic_cnt = 5;
+static const int consumer_ic_cnt = 10;
+
+/* The base values help differentiating opaque values between interceptors */
+static const int on_send_base = 1 << 24;
+static const int on_ack_base = 1 << 25;
+static const int on_consume_base = 1 << 26;
+static const int on_commit_base = 1 << 27;
+static const int base_mask = 0xff << 24;
+
+#define _ON_SEND 0
+#define _ON_ACK 1
+#define _ON_CONSUME 2
+#define _ON_CNT 3
+struct msg_state {
+ int id;
+ int bits[_ON_CNT]; /* Bit field, one bit per interceptor */
+ mtx_t lock;
+};
+
+/* Per-message state */
+static struct msg_state msgs[msgcnt];
+
+/* on_commit bits */
+static int on_commit_bits = 0;
+
+/**
+ * @brief Verify that \p bits matches the number of expected interceptor
+ * call cnt.
+ *
+ * Verify interceptor order: the lower bits of ic_id
+ * denotes the order in which interceptors were added and it
+ * must be reflected here, meaning that all lower bits must be set,
+ * and no higher ones.
+ */
+static void msg_verify_ic_cnt(const struct msg_state *msg,
+ const char *what,
+ int bits,
+ int exp_cnt) {
+ int exp_bits = exp_cnt ? (1 << exp_cnt) - 1 : 0;
+
+ TEST_ASSERT(bits == exp_bits,
+ "msg #%d: %s: expected bits 0x%x (%d), got 0x%x", msg->id,
+ what, exp_bits, exp_cnt, bits);
+}
+
+/*
+ * @brief Same as msg_verify_ic_cnt() without the msg reliance
+ */
+static void verify_ic_cnt(const char *what, int bits, int exp_cnt) {
+ int exp_bits = exp_cnt ? (1 << exp_cnt) - 1 : 0;
+
+ TEST_ASSERT(bits == exp_bits, "%s: expected bits 0x%x (%d), got 0x%x",
+ what, exp_bits, exp_cnt, bits);
+}
+
+
+
+static void verify_msg(const char *what,
+ int base,
+ int bitid,
+ rd_kafka_message_t *rkmessage,
+ void *ic_opaque) {
+ const char *id_str = rkmessage->key;
+ struct msg_state *msg;
+ int id;
+ int ic_id = (int)(intptr_t)ic_opaque;
+
+ /* Verify opaque (base | ic id) */
+ TEST_ASSERT((ic_id & base_mask) == base);
+ ic_id &= ~base_mask;
+
+ /* Find message by id */
+ TEST_ASSERT(rkmessage->key && rkmessage->key_len > 0 &&
+ id_str[(int)rkmessage->key_len - 1] == '\0' &&
+ strlen(id_str) > 0 && isdigit(*id_str));
+ id = atoi(id_str);
+ TEST_ASSERT(id >= 0 && id < msgcnt, "%s: bad message id %s", what,
+ id_str);
+ msg = &msgs[id];
+
+ mtx_lock(&msg->lock);
+
+ TEST_ASSERT(msg->id == id, "expected msg #%d has wrong id %d", id,
+ msg->id);
+
+ /* Verify message opaque */
+ if (!strcmp(what, "on_send") || !strncmp(what, "on_ack", 6))
+ TEST_ASSERT(rkmessage->_private == (void *)msg);
+
+ TEST_SAYL(3, "%s: interceptor #%d called for message #%d (%d)\n", what,
+ ic_id, id, msg->id);
+
+ msg_verify_ic_cnt(msg, what, msg->bits[bitid], ic_id);
+
+ /* Set this interceptor's bit */
+ msg->bits[bitid] |= 1 << ic_id;
+
+ mtx_unlock(&msg->lock);
+}
+
+
+static rd_kafka_resp_err_t
+on_send(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
+ TEST_ASSERT(ic_opaque != NULL);
+ verify_msg("on_send", on_send_base, _ON_SEND, rkmessage, ic_opaque);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+static rd_kafka_resp_err_t
+on_ack(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
+ TEST_ASSERT(ic_opaque != NULL);
+ verify_msg("on_ack", on_ack_base, _ON_ACK, rkmessage, ic_opaque);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+static rd_kafka_resp_err_t
+on_consume(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
+ TEST_ASSERT(ic_opaque != NULL);
+ verify_msg("on_consume", on_consume_base, _ON_CONSUME, rkmessage,
+ ic_opaque);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+static rd_kafka_resp_err_t
+on_commit(rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *offsets,
+ rd_kafka_resp_err_t err,
+ void *ic_opaque) {
+ int ic_id = (int)(intptr_t)ic_opaque;
+
+ /* Since on_commit is triggered a bit randomly and not per
+ * message we only try to make sure it gets fully set at least once. */
+ TEST_ASSERT(ic_opaque != NULL);
+
+ /* Verify opaque (base | ic id) */
+ TEST_ASSERT((ic_id & base_mask) == on_commit_base);
+ ic_id &= ~base_mask;
+
+ TEST_ASSERT(ic_opaque != NULL);
+
+ TEST_SAYL(3, "on_commit: interceptor #%d called: %s\n", ic_id,
+ rd_kafka_err2str(err));
+ if (test_level >= 4)
+ test_print_partition_list(offsets);
+
+ /* Check for rollover where a previous on_commit stint was
+ * succesful and it just now started over */
+ if (on_commit_bits > 0 && ic_id == 0) {
+ /* Verify completeness of previous stint */
+ verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);
+ /* Reset */
+ on_commit_bits = 0;
+ }
+
+ verify_ic_cnt("on_commit", on_commit_bits, ic_id);
+
+ /* Set this interceptor's bit */
+ on_commit_bits |= 1 << ic_id;
+
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+static void do_test_produce(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int msgid,
+ int exp_fail,
+ int exp_ic_cnt) {
+ rd_kafka_resp_err_t err;
+ char key[16];
+ struct msg_state *msg = &msgs[msgid];
+ int i;
+
+ /* Message state should be empty, no interceptors should have
+ * been called yet.. */
+ for (i = 0; i < _ON_CNT; i++)
+ TEST_ASSERT(msg->bits[i] == 0);
+
+ mtx_init(&msg->lock, mtx_plain);
+ msg->id = msgid;
+ rd_snprintf(key, sizeof(key), "%d", msgid);
+
+ err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_PARTITION(partition),
+ RD_KAFKA_V_KEY(key, strlen(key) + 1),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_OPAQUE(msg), RD_KAFKA_V_END);
+
+ mtx_lock(&msg->lock);
+ msg_verify_ic_cnt(msg, "on_send", msg->bits[_ON_SEND], exp_ic_cnt);
+
+ if (err) {
+ msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK],
+ exp_ic_cnt);
+ TEST_ASSERT(exp_fail, "producev() failed: %s",
+ rd_kafka_err2str(err));
+ } else {
+ msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
+ TEST_ASSERT(!exp_fail,
+ "expected produce failure for msg #%d, not %s",
+ msgid, rd_kafka_err2str(err));
+ }
+ mtx_unlock(&msg->lock);
+}
+
+
+
+static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf,
+ void *ic_opaque,
+ char *errstr,
+ size_t errstr_size) {
+ int i;
+
+ for (i = 0; i < producer_ic_cnt; i++) {
+ rd_kafka_resp_err_t err;
+
+ err = rd_kafka_interceptor_add_on_send(
+ rk, tsprintf("on_send:%d", i), on_send,
+ (void *)(intptr_t)(on_send_base | i));
+ TEST_ASSERT(!err, "add_on_send failed: %s",
+ rd_kafka_err2str(err));
+
+ err = rd_kafka_interceptor_add_on_acknowledgement(
+ rk, tsprintf("on_acknowledgement:%d", i), on_ack,
+ (void *)(intptr_t)(on_ack_base | i));
+ TEST_ASSERT(!err, "add_on_ack.. failed: %s",
+ rd_kafka_err2str(err));
+
+
+ /* Add consumer interceptors as well to make sure
+ * they are not called. */
+ err = rd_kafka_interceptor_add_on_consume(
+ rk, tsprintf("on_consume:%d", i), on_consume, NULL);
+ TEST_ASSERT(!err, "add_on_consume failed: %s",
+ rd_kafka_err2str(err));
+
+
+ err = rd_kafka_interceptor_add_on_commit(
+ rk, tsprintf("on_commit:%d", i), on_commit, NULL);
+ TEST_ASSERT(!err, "add_on_commit failed: %s",
+ rd_kafka_err2str(err));
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+static void do_test_producer(const char *topic) {
+ rd_kafka_conf_t *conf;
+ int i;
+ rd_kafka_t *rk;
+
+ TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
+
+ test_conf_init(&conf, NULL, 0);
+
+ rd_kafka_conf_interceptor_add_on_new(conf, "on_new_prodcer",
+ on_new_producer, NULL);
+
+ /* Create producer */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ for (i = 0; i < msgcnt - 1; i++)
+ do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0,
+ producer_ic_cnt);
+
+ /* Wait for messages to be delivered */
+ test_flush(rk, -1);
+
+ /* Now send a message that will fail in produce()
+ * due to bad partition */
+ do_test_produce(rk, topic, 1234, i, 1, producer_ic_cnt);
+
+
+ /* Verify acks */
+ for (i = 0; i < msgcnt; i++) {
+ struct msg_state *msg = &msgs[i];
+ mtx_lock(&msg->lock);
+ msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK],
+ producer_ic_cnt);
+ mtx_unlock(&msg->lock);
+ }
+
+ rd_kafka_destroy(rk);
+}
+
+
+static rd_kafka_resp_err_t on_new_consumer(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf,
+ void *ic_opaque,
+ char *errstr,
+ size_t errstr_size) {
+ int i;
+
+ for (i = 0; i < consumer_ic_cnt; i++) {
+ rd_kafka_interceptor_add_on_consume(
+ rk, tsprintf("on_consume:%d", i), on_consume,
+ (void *)(intptr_t)(on_consume_base | i));
+
+ rd_kafka_interceptor_add_on_commit(
+ rk, tsprintf("on_commit:%d", i), on_commit,
+ (void *)(intptr_t)(on_commit_base | i));
+
+ /* Add producer interceptors as well to make sure they
+ * are not called. */
+ rd_kafka_interceptor_add_on_send(rk, tsprintf("on_send:%d", i),
+ on_send, NULL);
+
+ rd_kafka_interceptor_add_on_acknowledgement(
+ rk, tsprintf("on_acknowledgement:%d", i), on_ack, NULL);
+ }
+
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+static void do_test_consumer(const char *topic) {
+
+ rd_kafka_conf_t *conf;
+ int i;
+ rd_kafka_t *rk;
+
+ TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
+
+ test_conf_init(&conf, NULL, 0);
+
+ rd_kafka_conf_interceptor_add_on_new(conf, "on_new_consumer",
+ on_new_consumer, NULL);
+
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+
+ /* Create producer */
+ rk = test_create_consumer(topic, NULL, conf, NULL);
+
+ test_consumer_subscribe(rk, topic);
+
+ /* Consume messages (-1 for the one that failed producing) */
+ test_consumer_poll("interceptors.consume", rk, 0, -1, -1, msgcnt - 1,
+ NULL);
+
+ /* Verify on_consume */
+ for (i = 0; i < msgcnt - 1; i++) {
+ struct msg_state *msg = &msgs[i];
+ mtx_lock(&msg->lock);
+ msg_verify_ic_cnt(msg, "on_consume", msg->bits[_ON_CONSUME],
+ consumer_ic_cnt);
+ mtx_unlock(&msg->lock);
+ }
+
+ /* Verify that the produce-failed message didnt have
+ * interceptors called */
+ mtx_lock(&msgs[msgcnt - 1].lock);
+ msg_verify_ic_cnt(&msgs[msgcnt - 1], "on_consume",
+ msgs[msgcnt - 1].bits[_ON_CONSUME], 0);
+ mtx_unlock(&msgs[msgcnt - 1].lock);
+
+ test_consumer_close(rk);
+
+ verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);
+
+ rd_kafka_destroy(rk);
+}
+
+/**
+ * @brief Interceptors must not be copied automatically by conf_dup()
+ * unless the interceptors have added on_conf_dup().
+ * This behaviour makes sure an interceptor's instance
+ * is not duplicated without the interceptor's knowledge or
+ * assistance.
+ */
+static void do_test_conf_copy(const char *topic) {
+ rd_kafka_conf_t *conf, *conf2;
+ int i;
+ rd_kafka_t *rk;
+
+ TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
+
+ memset(&msgs[0], 0, sizeof(msgs));
+
+ test_conf_init(&conf, NULL, 0);
+
+ rd_kafka_conf_interceptor_add_on_new(conf, "on_new_conf_copy",
+ on_new_producer, NULL);
+
+ /* Now copy the configuration to verify that interceptors are
+ * NOT copied. */
+ conf2 = conf;
+ conf = rd_kafka_conf_dup(conf2);
+ rd_kafka_conf_destroy(conf2);
+
+ /* Create producer */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ for (i = 0; i < msgcnt - 1; i++)
+ do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0, 0);
+
+ /* Wait for messages to be delivered */
+ test_flush(rk, -1);
+
+ /* Verify acks */
+ for (i = 0; i < msgcnt; i++) {
+ struct msg_state *msg = &msgs[i];
+ mtx_lock(&msg->lock);
+ msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
+ mtx_unlock(&msg->lock);
+ }
+
+ rd_kafka_destroy(rk);
+}
+
+
+int main_0064_interceptors(int argc, char **argv) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+
+ do_test_producer(topic);
+
+ do_test_consumer(topic);
+
+ do_test_conf_copy(topic);
+
+ return 0;
+}