From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/0073-headers.c | 381 +++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c new file mode 100644 index 000000000..e7e5c4074 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c @@ -0,0 +1,381 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +/** + * Message Headers end-to-end tests + */ + + + +static int exp_msgid = 0; + +struct expect { + const char *name; + const char *value; +}; + + + +static void expect_check(const char *what, + const struct expect *expected, + rd_kafka_message_t *rkmessage, + int is_const) { + const struct expect *exp; + rd_kafka_resp_err_t err; + size_t idx = 0; + const char *name; + const char *value; + size_t size; + rd_kafka_headers_t *hdrs; + int msgid; + + if (rkmessage->len != sizeof(msgid)) + TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)", + what, rkmessage->len); + + memcpy(&msgid, rkmessage->payload, rkmessage->len); + + if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) { + if (msgid == 0) { + rd_kafka_resp_err_t err2; + TEST_SAYL(3, "%s: Msg #%d: no headers, good\n", what, + msgid); + + err2 = + rd_kafka_message_detach_headers(rkmessage, &hdrs); + TEST_ASSERT(err == err2, + "expected detach_headers() error %s " + "to match headers() error %s", + rd_kafka_err2str(err2), + rd_kafka_err2str(err)); + + return; /* No headers expected for first message */ + } + + TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid, + rd_kafka_err2str(err)); + } else { + TEST_ASSERT(msgid != 0, + "%s: first message should have no headers", what); + } + + test_headers_dump(what, 3, hdrs); + + for (idx = 0, exp = expected; !rd_kafka_header_get_all( + hdrs, idx, &name, (const void **)&value, &size); + idx++, exp++) { + + TEST_SAYL(3, + "%s: Msg #%d: " + "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n", + what, msgid, idx, name, value ? value : "(NULL)", + exp->name, exp->value ? exp->value : "(NULL)"); + + if (strcmp(name, exp->name)) + TEST_FAIL( + "%s: Msg #%d: " + "Expected header %s at idx #%" PRIusz + ", not '%s' (%" PRIusz ")", + what, msgid, exp->name, idx, name, strlen(name)); + + if (!strcmp(name, "msgid")) { + int vid; + + /* Special handling: compare msgid header value + * to message body, should be identical */ + if (size != rkmessage->len || size != sizeof(int)) + TEST_FAIL( + "%s: " + "Expected msgid/int-sized payload " + "%" PRIusz ", got %" PRIusz, + what, size, rkmessage->len); + + /* Copy to avoid unaligned access (by cast) */ + memcpy(&vid, value, size); + + if (vid != msgid) + TEST_FAIL("%s: Header msgid %d != payload %d", + what, vid, msgid); + + if (exp_msgid != vid) + TEST_FAIL("%s: Expected msgid %d, not %d", what, + exp_msgid, vid); + continue; + } + + if (!exp->value) { + /* Expected NULL value */ + TEST_ASSERT(!value, + "%s: Expected NULL value for %s, got %s", + what, exp->name, value); + + } else { + TEST_ASSERT(value, + "%s: " + "Expected non-NULL value for %s, got NULL", + what, exp->name); + + TEST_ASSERT(size == strlen(exp->value), + "%s: Expected size %" PRIusz + " for %s, " + "not %" PRIusz, + what, strlen(exp->value), exp->name, size); + + TEST_ASSERT(value[size] == '\0', + "%s: " + "Expected implicit null-terminator for %s", + what, exp->name); + + TEST_ASSERT(!strcmp(exp->value, value), + "%s: " + "Expected value %s for %s, not %s", + what, exp->value, exp->name, value); + } + } + + TEST_ASSERT(exp->name == NULL, + "%s: Expected the expected, but stuck at %s which was " + "unexpected", + what, exp->name); + + if (!strcmp(what, "handle_consumed_msg") && !is_const && + (msgid % 3) == 0) { + rd_kafka_headers_t *dhdrs; + + err = rd_kafka_message_detach_headers(rkmessage, &dhdrs); + TEST_ASSERT(!err, "detach_headers() should not fail, got %s", + rd_kafka_err2str(err)); + TEST_ASSERT(hdrs == dhdrs); + + /* Verify that a new headers object can be obtained */ + err = rd_kafka_message_headers(rkmessage, &hdrs); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR); + TEST_ASSERT(hdrs != dhdrs); + rd_kafka_headers_destroy(dhdrs); + + expect_check("post_detach_headers", expected, rkmessage, + is_const); + } +} + + +/** + * @brief Final (as in no more header modifications) message check. + */ +static void +msg_final_check(const char *what, rd_kafka_message_t *rkmessage, int is_const) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, {"null", NULL}, {"empty", ""}, + {"send1", "1"}, {"multi", "multi5"}, {NULL}}; + + expect_check(what, expected, rkmessage, is_const); + + exp_msgid++; +} + +/** + * @brief Handle consumed message, must be identical to dr_msg_cb + */ +static void handle_consumed_msg(rd_kafka_message_t *rkmessage) { + msg_final_check(__FUNCTION__, rkmessage, 0); +} + +/** + * @brief Delivery report callback + */ +static void +dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + TEST_ASSERT(!rkmessage->err, "Message delivery failed: %s", + rd_kafka_err2str(rkmessage->err)); + + msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1); +} + + +/** + * @brief First on_send() interceptor + */ +static rd_kafka_resp_err_t +on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, + {"multi", "multi1"}, + {"multi", "multi2"}, + {"multi", "multi3"}, + {"null", NULL}, + {"empty", ""}, + {NULL}}; + rd_kafka_headers_t *hdrs; + rd_kafka_resp_err_t err; + + expect_check(__FUNCTION__, expected, rkmessage, 0); + + err = rd_kafka_message_headers(rkmessage, &hdrs); + if (err) /* First message has no headers. */ + return RD_KAFKA_RESP_ERR_NO_ERROR; + + rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1); + rd_kafka_header_add(hdrs, "send1", -1, "1", -1); + rd_kafka_header_remove(hdrs, "multi"); + rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Second on_send() interceptor + */ +static rd_kafka_resp_err_t +on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, {"null", NULL}, {"empty", ""}, + {"send1", "1"}, {"multi", "multi5"}, {NULL}}; + + expect_check(__FUNCTION__, expected, rkmessage, 0); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief on_new() interceptor to set up message interceptors + * from rd_kafka_new(). + */ +static rd_kafka_resp_err_t on_new(rd_kafka_t *rk, + const rd_kafka_conf_t *conf, + void *ic_opaque, + char *errstr, + size_t errstr_size) { + rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL); + rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +static void do_produce(const char *topic, int msgcnt) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + int i; + rd_kafka_resp_err_t err; + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "acks", "all"); + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL); + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + /* First message is without headers (negative testing) */ + i = 0; + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err)); + exp_msgid++; + + for (i = 1; i < msgcnt; i++, exp_msgid++) { + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)), + RD_KAFKA_V_HEADER("static", "hey", -1), + RD_KAFKA_V_HEADER("multi", "multi1", -1), + RD_KAFKA_V_HEADER("multi", "multi2", 6), + RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")), + RD_KAFKA_V_HEADER("null", NULL, 0), + RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev() failed: %s", + rd_kafka_err2str(err)); + } + + /* Reset expected message id for dr */ + exp_msgid = 0; + + /* Wait for timeouts and delivery reports */ + rd_kafka_flush(rk, tmout_multip(5000)); + + rd_kafka_destroy(rk); +} + +static void do_consume(const char *topic, int msgcnt) { + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *parts; + + rk = test_create_consumer(topic, NULL, NULL, NULL); + + parts = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(parts, topic, 0)->offset = + RD_KAFKA_OFFSET_BEGINNING; + + test_consumer_assign("assign", rk, parts); + + rd_kafka_topic_partition_list_destroy(parts); + + exp_msgid = 0; + + while (exp_msgid < msgcnt) { + rd_kafka_message_t *rkm; + + rkm = rd_kafka_consumer_poll(rk, 1000); + if (!rkm) + continue; + + if (rkm->err) + TEST_FAIL( + "consume error while expecting msgid %d/%d: " + "%s", + exp_msgid, msgcnt, rd_kafka_message_errstr(rkm)); + + handle_consumed_msg(rkm); + + rd_kafka_message_destroy(rkm); + } + + test_consumer_close(rk); + rd_kafka_destroy(rk); +} + + +int main_0073_headers(int argc, char **argv) { + const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1); + const int msgcnt = 10; + + do_produce(topic, msgcnt); + do_consume(topic, msgcnt); + + return 0; +} -- cgit v1.2.3