summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c448
1 files changed, 448 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c
new file mode 100644
index 000000000..0576d611a
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0072-headers_ut.c
@@ -0,0 +1,448 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+/**
+ * Local (no broker) unit-like tests of Message Headers
+ */
+
+
+
+static int exp_msgid = 0;
+
+struct expect {
+ const char *name;
+ const char *value;
+};
+
+/**
+ * @brief returns the message id
+ */
+static int expect_check(const char *what,
+ const struct expect *expected,
+ const rd_kafka_message_t *rkmessage) {
+ const struct expect *exp;
+ rd_kafka_resp_err_t err;
+ size_t idx = 0;
+ const char *name;
+ const char *value;
+ size_t size;
+ rd_kafka_headers_t *hdrs;
+ int msgid;
+
+ if (rkmessage->len != sizeof(msgid))
+ TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)",
+ what, rkmessage->len);
+
+ memcpy(&msgid, rkmessage->payload, rkmessage->len);
+
+ if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
+ if (msgid == 0)
+ return 0; /* No headers expected for first message */
+
+ TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
+ rd_kafka_err2str(err));
+ } else {
+ TEST_ASSERT(msgid != 0,
+ "%s: first message should have no headers", what);
+ }
+
+ /* msgid should always be first and has a variable value so hard to
+ * match with the expect struct. */
+ for (idx = 0, exp = expected; !rd_kafka_header_get_all(
+ hdrs, idx, &name, (const void **)&value, &size);
+ idx++, exp++) {
+
+ TEST_SAYL(3,
+ "%s: Msg #%d: "
+ "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n",
+ what, msgid, idx, name, value ? value : "(NULL)",
+ exp->name, exp->value ? exp->value : "(NULL)");
+
+ if (strcmp(name, exp->name))
+ TEST_FAIL("%s: Expected header %s at idx #%" PRIusz
+ ", not %s",
+ what, exp->name, idx - 1, name);
+
+ if (!strcmp(name, "msgid")) {
+ int vid;
+
+ /* Special handling: compare msgid header value
+ * to message body, should be identical */
+ if (size != rkmessage->len || size != sizeof(int))
+ TEST_FAIL(
+ "%s: "
+ "Expected msgid/int-sized payload "
+ "%" PRIusz ", got %" PRIusz,
+ what, size, rkmessage->len);
+
+ /* Copy to avoid unaligned access (by cast) */
+ memcpy(&vid, value, size);
+
+ if (vid != msgid)
+ TEST_FAIL("%s: Header msgid %d != payload %d",
+ what, vid, msgid);
+
+ if (exp_msgid != vid)
+ TEST_FAIL("%s: Expected msgid %d, not %d", what,
+ exp_msgid, vid);
+ continue;
+ }
+
+ if (!exp->value) {
+ /* Expected NULL value */
+ TEST_ASSERT(!value,
+ "%s: Expected NULL value for %s, got %s",
+ what, exp->name, value);
+
+ } else {
+ TEST_ASSERT(value,
+ "%s: "
+ "Expected non-NULL value for %s, got NULL",
+ what, exp->name);
+
+ TEST_ASSERT(size == strlen(exp->value),
+ "%s: Expected size %" PRIusz
+ " for %s, "
+ "not %" PRIusz,
+ what, strlen(exp->value), exp->name, size);
+
+ TEST_ASSERT(value[size] == '\0',
+ "%s: "
+ "Expected implicit null-terminator for %s",
+ what, exp->name);
+
+ TEST_ASSERT(!strcmp(exp->value, value),
+ "%s: "
+ "Expected value %s for %s, not %s",
+ what, exp->value, exp->name, value);
+ }
+ }
+
+ TEST_ASSERT(exp->name == NULL,
+ "%s: Expected the expected, but stuck at %s which was "
+ "unexpected",
+ what, exp->name);
+
+ return msgid;
+}
+
+
+/**
+ * @brief Delivery report callback
+ */
+static void
+dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
+ const struct expect expected[] = {
+ {"msgid", NULL}, /* special handling */
+ {"static", "hey"}, {"null", NULL}, {"empty", ""},
+ {"send1", "1"}, {"multi", "multi5"}, {NULL}};
+ const struct expect replace_expected[] = {
+ {"msgid", NULL}, {"new", "one"},
+ {"this is the", NULL}, {"replaced headers\"", ""},
+ {"new", "right?"}, {NULL}};
+ const struct expect *exp;
+ rd_kafka_headers_t *new_hdrs;
+ int msgid;
+
+ TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
+ "Expected message to fail with MSG_TIMED_OUT, not %s",
+ rd_kafka_err2str(rkmessage->err));
+
+ msgid = expect_check(__FUNCTION__, expected, rkmessage);
+
+ /* Replace entire headers list */
+ if (msgid > 0) {
+ new_hdrs = rd_kafka_headers_new(1);
+ rd_kafka_header_add(new_hdrs, "msgid", -1, &msgid,
+ sizeof(msgid));
+ for (exp = &replace_expected[1]; exp->name; exp++)
+ rd_kafka_header_add(new_hdrs, exp->name, -1, exp->value,
+ -1);
+
+ rd_kafka_message_set_headers((rd_kafka_message_t *)rkmessage,
+ new_hdrs);
+
+ expect_check(__FUNCTION__, replace_expected, rkmessage);
+ }
+
+ exp_msgid++;
+}
+
+static void expect_iter(const char *what,
+ const rd_kafka_headers_t *hdrs,
+ const char *name,
+ const char **expected,
+ size_t cnt) {
+ size_t idx;
+ rd_kafka_resp_err_t err;
+ const void *value;
+ size_t size;
+
+ for (idx = 0;
+ !(err = rd_kafka_header_get(hdrs, idx, name, &value, &size));
+ idx++) {
+ TEST_ASSERT(idx < cnt,
+ "%s: too many headers matching '%s', "
+ "expected %" PRIusz,
+ what, name, cnt);
+ TEST_SAYL(3,
+ "%s: get(%" PRIusz
+ ", '%s') "
+ "expecting '%s' =? '%s'\n",
+ what, idx, name, expected[idx], (const char *)value);
+
+
+ TEST_ASSERT(
+ !strcmp((const char *)value, expected[idx]),
+ "%s: get(%" PRIusz ", '%s') expected '%s', not '%s'", what,
+ idx, name, expected[idx], (const char *)value);
+ }
+
+ TEST_ASSERT(idx == cnt,
+ "%s: expected %" PRIusz
+ " headers matching '%s', not %" PRIusz,
+ what, cnt, name, idx);
+}
+
+
+
+/**
+ * @brief First on_send() interceptor
+ */
+static rd_kafka_resp_err_t
+on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
+ const struct expect expected[] = {
+ {"msgid", NULL}, /* special handling */
+ {"static", "hey"},
+ {"multi", "multi1"},
+ {"multi", "multi2"},
+ {"multi", "multi3"},
+ {"null", NULL},
+ {"empty", ""},
+ {NULL}};
+ const char *expect_iter_multi[4] = {
+ "multi1", "multi2", "multi3", "multi4" /* added below */
+ };
+ const char *expect_iter_static[1] = {"hey"};
+ rd_kafka_headers_t *hdrs;
+ size_t header_cnt;
+ rd_kafka_resp_err_t err;
+ const void *value;
+ size_t size;
+
+ expect_check(__FUNCTION__, expected, rkmessage);
+
+ err = rd_kafka_message_headers(rkmessage, &hdrs);
+ if (err) /* First message has no headers. */
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 7, "Expected 7 length got %" PRIusz "",
+ header_cnt);
+
+ rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 8, "Expected 8 length got %" PRIusz "",
+ header_cnt);
+
+ /* test iter() */
+ expect_iter(__FUNCTION__, hdrs, "multi", expect_iter_multi, 4);
+ expect_iter(__FUNCTION__, hdrs, "static", expect_iter_static, 1);
+ expect_iter(__FUNCTION__, hdrs, "notexists", NULL, 0);
+
+ rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 9, "Expected 9 length got %" PRIusz "",
+ header_cnt);
+
+ rd_kafka_header_remove(hdrs, "multi");
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 5, "Expected 5 length got %" PRIusz "",
+ header_cnt);
+
+ rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 6, "Expected 6 length got %" PRIusz "",
+ header_cnt);
+
+ /* test get_last() */
+ err = rd_kafka_header_get_last(hdrs, "multi", &value, &size);
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ TEST_ASSERT(size == strlen("multi5") &&
+ !strcmp((const char *)value, "multi5"),
+ "expected 'multi5', not '%s'", (const char *)value);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Second on_send() interceptor
+ */
+static rd_kafka_resp_err_t
+on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
+ const struct expect expected[] = {
+ {"msgid", NULL}, /* special handling */
+ {"static", "hey"}, {"null", NULL}, {"empty", ""},
+ {"send1", "1"}, {"multi", "multi5"}, {NULL}};
+
+ expect_check(__FUNCTION__, expected, rkmessage);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief on_new() interceptor to set up message interceptors
+ * from rd_kafka_new().
+ */
+static rd_kafka_resp_err_t on_new(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf,
+ void *ic_opaque,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
+ rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+int main_0072_headers_ut(int argc, char **argv) {
+ const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 0);
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ int i;
+ size_t header_cnt;
+ const int msgcnt = 10;
+ rd_kafka_resp_err_t err;
+
+ conf = rd_kafka_conf_new();
+ test_conf_set(conf, "message.timeout.ms", "1");
+ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
+
+ rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
+
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ /* First message is without headers (negative testing) */
+ i = 0;
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_VALUE(&i, sizeof(i)),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err));
+ exp_msgid++;
+
+ for (i = 1; i < msgcnt; i++, exp_msgid++) {
+ /* Use headers list on one message */
+ if (i == 3) {
+ rd_kafka_headers_t *hdrs = rd_kafka_headers_new(4);
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 0,
+ "Expected 0 length got %" PRIusz "",
+ header_cnt);
+
+ rd_kafka_headers_t *copied;
+
+ rd_kafka_header_add(hdrs, "msgid", -1, &i, sizeof(i));
+ rd_kafka_header_add(hdrs, "static", -1, "hey", -1);
+ rd_kafka_header_add(hdrs, "multi", -1, "multi1", -1);
+ rd_kafka_header_add(hdrs, "multi", -1, "multi2", 6);
+ rd_kafka_header_add(hdrs, "multi", -1, "multi3",
+ strlen("multi3"));
+ rd_kafka_header_add(hdrs, "null", -1, NULL, 0);
+
+ /* Make a copy of the headers to verify copy() */
+ copied = rd_kafka_headers_copy(hdrs);
+
+ header_cnt = rd_kafka_header_cnt(hdrs);
+ TEST_ASSERT(header_cnt == 6,
+ "Expected 6 length got %" PRIusz "",
+ header_cnt);
+
+ rd_kafka_headers_destroy(hdrs);
+
+ /* Last header ("empty") is added below */
+
+ /* Try unsupported _V_HEADER() and _V_HEADERS() mix,
+ * must fail with CONFLICT */
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_VALUE(&i, sizeof(i)),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_HEADER("will_be_removed", "yep", -1),
+ RD_KAFKA_V_HEADERS(copied),
+ RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__CONFLICT,
+ "producev(): expected CONFLICT, got %s",
+ rd_kafka_err2str(err));
+
+ /* Proper call using only _V_HEADERS() */
+ rd_kafka_header_add(copied, "empty", -1, "", -1);
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_VALUE(&i, sizeof(i)),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_HEADERS(copied), RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev() failed: %s",
+ rd_kafka_err2str(err));
+
+ } else {
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_VALUE(&i, sizeof(i)),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
+ RD_KAFKA_V_HEADER("static", "hey", -1),
+ RD_KAFKA_V_HEADER("multi", "multi1", -1),
+ RD_KAFKA_V_HEADER("multi", "multi2", 6),
+ RD_KAFKA_V_HEADER("multi", "multi3",
+ strlen("multi3")),
+ RD_KAFKA_V_HEADER("null", NULL, 0),
+ RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev() failed: %s",
+ rd_kafka_err2str(err));
+ }
+ }
+
+ /* Reset expected message id for dr */
+ exp_msgid = 0;
+
+ /* Wait for timeouts and delivery reports */
+ rd_kafka_flush(rk, 5000);
+
+ rd_kafka_destroy(rk);
+
+ return 0;
+}