diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c new file mode 100644 index 00000000..0576d611 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c @@ -0,0 +1,448 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +/** + * Local (no broker) unit-like tests of Message Headers + */ + + + +static int exp_msgid = 0; + +struct expect { + const char *name; + const char *value; +}; + +/** + * @brief returns the message id + */ +static int expect_check(const char *what, + const struct expect *expected, + const rd_kafka_message_t *rkmessage) { + const struct expect *exp; + rd_kafka_resp_err_t err; + size_t idx = 0; + const char *name; + const char *value; + size_t size; + rd_kafka_headers_t *hdrs; + int msgid; + + if (rkmessage->len != sizeof(msgid)) + TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)", + what, rkmessage->len); + + memcpy(&msgid, rkmessage->payload, rkmessage->len); + + if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) { + if (msgid == 0) + return 0; /* No headers expected for first message */ + + TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid, + rd_kafka_err2str(err)); + } else { + TEST_ASSERT(msgid != 0, + "%s: first message should have no headers", what); + } + + /* msgid should always be first and has a variable value so hard to + * match with the expect struct. */ + for (idx = 0, exp = expected; !rd_kafka_header_get_all( + hdrs, idx, &name, (const void **)&value, &size); + idx++, exp++) { + + TEST_SAYL(3, + "%s: Msg #%d: " + "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n", + what, msgid, idx, name, value ? value : "(NULL)", + exp->name, exp->value ? exp->value : "(NULL)"); + + if (strcmp(name, exp->name)) + TEST_FAIL("%s: Expected header %s at idx #%" PRIusz + ", not %s", + what, exp->name, idx - 1, name); + + if (!strcmp(name, "msgid")) { + int vid; + + /* Special handling: compare msgid header value + * to message body, should be identical */ + if (size != rkmessage->len || size != sizeof(int)) + TEST_FAIL( + "%s: " + "Expected msgid/int-sized payload " + "%" PRIusz ", got %" PRIusz, + what, size, rkmessage->len); + + /* Copy to avoid unaligned access (by cast) */ + memcpy(&vid, value, size); + + if (vid != msgid) + TEST_FAIL("%s: Header msgid %d != payload %d", + what, vid, msgid); + + if (exp_msgid != vid) + TEST_FAIL("%s: Expected msgid %d, not %d", what, + exp_msgid, vid); + continue; + } + + if (!exp->value) { + /* Expected NULL value */ + TEST_ASSERT(!value, + "%s: Expected NULL value for %s, got %s", + what, exp->name, value); + + } else { + TEST_ASSERT(value, + "%s: " + "Expected non-NULL value for %s, got NULL", + what, exp->name); + + TEST_ASSERT(size == strlen(exp->value), + "%s: Expected size %" PRIusz + " for %s, " + "not %" PRIusz, + what, strlen(exp->value), exp->name, size); + + TEST_ASSERT(value[size] == '\0', + "%s: " + "Expected implicit null-terminator for %s", + what, exp->name); + + TEST_ASSERT(!strcmp(exp->value, value), + "%s: " + "Expected value %s for %s, not %s", + what, exp->value, exp->name, value); + } + } + + TEST_ASSERT(exp->name == NULL, + "%s: Expected the expected, but stuck at %s which was " + "unexpected", + what, exp->name); + + return msgid; +} + + +/** + * @brief Delivery report callback + */ +static void +dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, {"null", NULL}, {"empty", ""}, + {"send1", "1"}, {"multi", "multi5"}, {NULL}}; + const struct expect replace_expected[] = { + {"msgid", NULL}, {"new", "one"}, + {"this is the", NULL}, {"replaced headers\"", ""}, + {"new", "right?"}, {NULL}}; + const struct expect *exp; + rd_kafka_headers_t *new_hdrs; + int msgid; + + TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, + "Expected message to fail with MSG_TIMED_OUT, not %s", + rd_kafka_err2str(rkmessage->err)); + + msgid = expect_check(__FUNCTION__, expected, rkmessage); + + /* Replace entire headers list */ + if (msgid > 0) { + new_hdrs = rd_kafka_headers_new(1); + rd_kafka_header_add(new_hdrs, "msgid", -1, &msgid, + sizeof(msgid)); + for (exp = &replace_expected[1]; exp->name; exp++) + rd_kafka_header_add(new_hdrs, exp->name, -1, exp->value, + -1); + + rd_kafka_message_set_headers((rd_kafka_message_t *)rkmessage, + new_hdrs); + + expect_check(__FUNCTION__, replace_expected, rkmessage); + } + + exp_msgid++; +} + +static void expect_iter(const char *what, + const rd_kafka_headers_t *hdrs, + const char *name, + const char **expected, + size_t cnt) { + size_t idx; + rd_kafka_resp_err_t err; + const void *value; + size_t size; + + for (idx = 0; + !(err = rd_kafka_header_get(hdrs, idx, name, &value, &size)); + idx++) { + TEST_ASSERT(idx < cnt, + "%s: too many headers matching '%s', " + "expected %" PRIusz, + what, name, cnt); + TEST_SAYL(3, + "%s: get(%" PRIusz + ", '%s') " + "expecting '%s' =? '%s'\n", + what, idx, name, expected[idx], (const char *)value); + + + TEST_ASSERT( + !strcmp((const char *)value, expected[idx]), + "%s: get(%" PRIusz ", '%s') expected '%s', not '%s'", what, + idx, name, expected[idx], (const char *)value); + } + + TEST_ASSERT(idx == cnt, + "%s: expected %" PRIusz + " headers matching '%s', not %" PRIusz, + what, cnt, name, idx); +} + + + +/** + * @brief First on_send() interceptor + */ +static rd_kafka_resp_err_t +on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, + {"multi", "multi1"}, + {"multi", "multi2"}, + {"multi", "multi3"}, + {"null", NULL}, + {"empty", ""}, + {NULL}}; + const char *expect_iter_multi[4] = { + "multi1", "multi2", "multi3", "multi4" /* added below */ + }; + const char *expect_iter_static[1] = {"hey"}; + rd_kafka_headers_t *hdrs; + size_t header_cnt; + rd_kafka_resp_err_t err; + const void *value; + size_t size; + + expect_check(__FUNCTION__, expected, rkmessage); + + err = rd_kafka_message_headers(rkmessage, &hdrs); + if (err) /* First message has no headers. */ + return RD_KAFKA_RESP_ERR_NO_ERROR; + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 7, "Expected 7 length got %" PRIusz "", + header_cnt); + + rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 8, "Expected 8 length got %" PRIusz "", + header_cnt); + + /* test iter() */ + expect_iter(__FUNCTION__, hdrs, "multi", expect_iter_multi, 4); + expect_iter(__FUNCTION__, hdrs, "static", expect_iter_static, 1); + expect_iter(__FUNCTION__, hdrs, "notexists", NULL, 0); + + rd_kafka_header_add(hdrs, "send1", -1, "1", -1); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 9, "Expected 9 length got %" PRIusz "", + header_cnt); + + rd_kafka_header_remove(hdrs, "multi"); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 5, "Expected 5 length got %" PRIusz "", + header_cnt); + + rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 6, "Expected 6 length got %" PRIusz "", + header_cnt); + + /* test get_last() */ + err = rd_kafka_header_get_last(hdrs, "multi", &value, &size); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + TEST_ASSERT(size == strlen("multi5") && + !strcmp((const char *)value, "multi5"), + "expected 'multi5', not '%s'", (const char *)value); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Second on_send() interceptor + */ +static rd_kafka_resp_err_t +on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { + const struct expect expected[] = { + {"msgid", NULL}, /* special handling */ + {"static", "hey"}, {"null", NULL}, {"empty", ""}, + {"send1", "1"}, {"multi", "multi5"}, {NULL}}; + + expect_check(__FUNCTION__, expected, rkmessage); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief on_new() interceptor to set up message interceptors + * from rd_kafka_new(). + */ +static rd_kafka_resp_err_t on_new(rd_kafka_t *rk, + const rd_kafka_conf_t *conf, + void *ic_opaque, + char *errstr, + size_t errstr_size) { + rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL); + rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +int main_0072_headers_ut(int argc, char **argv) { + const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 0); + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + int i; + size_t header_cnt; + const int msgcnt = 10; + rd_kafka_resp_err_t err; + + conf = rd_kafka_conf_new(); + test_conf_set(conf, "message.timeout.ms", "1"); + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL); + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + /* First message is without headers (negative testing) */ + i = 0; + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err)); + exp_msgid++; + + for (i = 1; i < msgcnt; i++, exp_msgid++) { + /* Use headers list on one message */ + if (i == 3) { + rd_kafka_headers_t *hdrs = rd_kafka_headers_new(4); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 0, + "Expected 0 length got %" PRIusz "", + header_cnt); + + rd_kafka_headers_t *copied; + + rd_kafka_header_add(hdrs, "msgid", -1, &i, sizeof(i)); + rd_kafka_header_add(hdrs, "static", -1, "hey", -1); + rd_kafka_header_add(hdrs, "multi", -1, "multi1", -1); + rd_kafka_header_add(hdrs, "multi", -1, "multi2", 6); + rd_kafka_header_add(hdrs, "multi", -1, "multi3", + strlen("multi3")); + rd_kafka_header_add(hdrs, "null", -1, NULL, 0); + + /* Make a copy of the headers to verify copy() */ + copied = rd_kafka_headers_copy(hdrs); + + header_cnt = rd_kafka_header_cnt(hdrs); + TEST_ASSERT(header_cnt == 6, + "Expected 6 length got %" PRIusz "", + header_cnt); + + rd_kafka_headers_destroy(hdrs); + + /* Last header ("empty") is added below */ + + /* Try unsupported _V_HEADER() and _V_HEADERS() mix, + * must fail with CONFLICT */ + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_HEADER("will_be_removed", "yep", -1), + RD_KAFKA_V_HEADERS(copied), + RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__CONFLICT, + "producev(): expected CONFLICT, got %s", + rd_kafka_err2str(err)); + + /* Proper call using only _V_HEADERS() */ + rd_kafka_header_add(copied, "empty", -1, "", -1); + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_HEADERS(copied), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev() failed: %s", + rd_kafka_err2str(err)); + + } else { + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_VALUE(&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)), + RD_KAFKA_V_HEADER("static", "hey", -1), + RD_KAFKA_V_HEADER("multi", "multi1", -1), + RD_KAFKA_V_HEADER("multi", "multi2", 6), + RD_KAFKA_V_HEADER("multi", "multi3", + strlen("multi3")), + RD_KAFKA_V_HEADER("null", NULL, 0), + RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev() failed: %s", + rd_kafka_err2str(err)); + } + } + + /* Reset expected message id for dr */ + exp_msgid = 0; + + /* Wait for timeouts and delivery reports */ + rd_kafka_flush(rk, 5000); + + rd_kafka_destroy(rk); + + return 0; +} |