summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c381
1 files changed, 0 insertions, 381 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c
deleted file mode 100644
index e7e5c4074..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "test.h"
-#include "rdkafka.h"
-
-/**
- * Message Headers end-to-end tests
- */
-
-
-
-static int exp_msgid = 0;
-
-struct expect {
- const char *name;
- const char *value;
-};
-
-
-
-static void expect_check(const char *what,
- const struct expect *expected,
- rd_kafka_message_t *rkmessage,
- int is_const) {
- const struct expect *exp;
- rd_kafka_resp_err_t err;
- size_t idx = 0;
- const char *name;
- const char *value;
- size_t size;
- rd_kafka_headers_t *hdrs;
- int msgid;
-
- if (rkmessage->len != sizeof(msgid))
- TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)",
- what, rkmessage->len);
-
- memcpy(&msgid, rkmessage->payload, rkmessage->len);
-
- if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
- if (msgid == 0) {
- rd_kafka_resp_err_t err2;
- TEST_SAYL(3, "%s: Msg #%d: no headers, good\n", what,
- msgid);
-
- err2 =
- rd_kafka_message_detach_headers(rkmessage, &hdrs);
- TEST_ASSERT(err == err2,
- "expected detach_headers() error %s "
- "to match headers() error %s",
- rd_kafka_err2str(err2),
- rd_kafka_err2str(err));
-
- return; /* No headers expected for first message */
- }
-
- TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
- rd_kafka_err2str(err));
- } else {
- TEST_ASSERT(msgid != 0,
- "%s: first message should have no headers", what);
- }
-
- test_headers_dump(what, 3, hdrs);
-
- for (idx = 0, exp = expected; !rd_kafka_header_get_all(
- hdrs, idx, &name, (const void **)&value, &size);
- idx++, exp++) {
-
- TEST_SAYL(3,
- "%s: Msg #%d: "
- "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n",
- what, msgid, idx, name, value ? value : "(NULL)",
- exp->name, exp->value ? exp->value : "(NULL)");
-
- if (strcmp(name, exp->name))
- TEST_FAIL(
- "%s: Msg #%d: "
- "Expected header %s at idx #%" PRIusz
- ", not '%s' (%" PRIusz ")",
- what, msgid, exp->name, idx, name, strlen(name));
-
- if (!strcmp(name, "msgid")) {
- int vid;
-
- /* Special handling: compare msgid header value
- * to message body, should be identical */
- if (size != rkmessage->len || size != sizeof(int))
- TEST_FAIL(
- "%s: "
- "Expected msgid/int-sized payload "
- "%" PRIusz ", got %" PRIusz,
- what, size, rkmessage->len);
-
- /* Copy to avoid unaligned access (by cast) */
- memcpy(&vid, value, size);
-
- if (vid != msgid)
- TEST_FAIL("%s: Header msgid %d != payload %d",
- what, vid, msgid);
-
- if (exp_msgid != vid)
- TEST_FAIL("%s: Expected msgid %d, not %d", what,
- exp_msgid, vid);
- continue;
- }
-
- if (!exp->value) {
- /* Expected NULL value */
- TEST_ASSERT(!value,
- "%s: Expected NULL value for %s, got %s",
- what, exp->name, value);
-
- } else {
- TEST_ASSERT(value,
- "%s: "
- "Expected non-NULL value for %s, got NULL",
- what, exp->name);
-
- TEST_ASSERT(size == strlen(exp->value),
- "%s: Expected size %" PRIusz
- " for %s, "
- "not %" PRIusz,
- what, strlen(exp->value), exp->name, size);
-
- TEST_ASSERT(value[size] == '\0',
- "%s: "
- "Expected implicit null-terminator for %s",
- what, exp->name);
-
- TEST_ASSERT(!strcmp(exp->value, value),
- "%s: "
- "Expected value %s for %s, not %s",
- what, exp->value, exp->name, value);
- }
- }
-
- TEST_ASSERT(exp->name == NULL,
- "%s: Expected the expected, but stuck at %s which was "
- "unexpected",
- what, exp->name);
-
- if (!strcmp(what, "handle_consumed_msg") && !is_const &&
- (msgid % 3) == 0) {
- rd_kafka_headers_t *dhdrs;
-
- err = rd_kafka_message_detach_headers(rkmessage, &dhdrs);
- TEST_ASSERT(!err, "detach_headers() should not fail, got %s",
- rd_kafka_err2str(err));
- TEST_ASSERT(hdrs == dhdrs);
-
- /* Verify that a new headers object can be obtained */
- err = rd_kafka_message_headers(rkmessage, &hdrs);
- TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR);
- TEST_ASSERT(hdrs != dhdrs);
- rd_kafka_headers_destroy(dhdrs);
-
- expect_check("post_detach_headers", expected, rkmessage,
- is_const);
- }
-}
-
-
-/**
- * @brief Final (as in no more header modifications) message check.
- */
-static void
-msg_final_check(const char *what, rd_kafka_message_t *rkmessage, int is_const) {
- const struct expect expected[] = {
- {"msgid", NULL}, /* special handling */
- {"static", "hey"}, {"null", NULL}, {"empty", ""},
- {"send1", "1"}, {"multi", "multi5"}, {NULL}};
-
- expect_check(what, expected, rkmessage, is_const);
-
- exp_msgid++;
-}
-
-/**
- * @brief Handle consumed message, must be identical to dr_msg_cb
- */
-static void handle_consumed_msg(rd_kafka_message_t *rkmessage) {
- msg_final_check(__FUNCTION__, rkmessage, 0);
-}
-
-/**
- * @brief Delivery report callback
- */
-static void
-dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
- TEST_ASSERT(!rkmessage->err, "Message delivery failed: %s",
- rd_kafka_err2str(rkmessage->err));
-
- msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1);
-}
-
-
-/**
- * @brief First on_send() interceptor
- */
-static rd_kafka_resp_err_t
-on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
- const struct expect expected[] = {
- {"msgid", NULL}, /* special handling */
- {"static", "hey"},
- {"multi", "multi1"},
- {"multi", "multi2"},
- {"multi", "multi3"},
- {"null", NULL},
- {"empty", ""},
- {NULL}};
- rd_kafka_headers_t *hdrs;
- rd_kafka_resp_err_t err;
-
- expect_check(__FUNCTION__, expected, rkmessage, 0);
-
- err = rd_kafka_message_headers(rkmessage, &hdrs);
- if (err) /* First message has no headers. */
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
- rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
- rd_kafka_header_remove(hdrs, "multi");
- rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Second on_send() interceptor
- */
-static rd_kafka_resp_err_t
-on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) {
- const struct expect expected[] = {
- {"msgid", NULL}, /* special handling */
- {"static", "hey"}, {"null", NULL}, {"empty", ""},
- {"send1", "1"}, {"multi", "multi5"}, {NULL}};
-
- expect_check(__FUNCTION__, expected, rkmessage, 0);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief on_new() interceptor to set up message interceptors
- * from rd_kafka_new().
- */
-static rd_kafka_resp_err_t on_new(rd_kafka_t *rk,
- const rd_kafka_conf_t *conf,
- void *ic_opaque,
- char *errstr,
- size_t errstr_size) {
- rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
- rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-static void do_produce(const char *topic, int msgcnt) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf;
- int i;
- rd_kafka_resp_err_t err;
-
- test_conf_init(&conf, NULL, 0);
- test_conf_set(conf, "acks", "all");
- rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
-
- rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
-
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- /* First message is without headers (negative testing) */
- i = 0;
- err = rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE(&i, sizeof(i)),
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err));
- exp_msgid++;
-
- for (i = 1; i < msgcnt; i++, exp_msgid++) {
- err = rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE(&i, sizeof(i)),
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
- RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
- RD_KAFKA_V_HEADER("static", "hey", -1),
- RD_KAFKA_V_HEADER("multi", "multi1", -1),
- RD_KAFKA_V_HEADER("multi", "multi2", 6),
- RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
- RD_KAFKA_V_HEADER("null", NULL, 0),
- RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "producev() failed: %s",
- rd_kafka_err2str(err));
- }
-
- /* Reset expected message id for dr */
- exp_msgid = 0;
-
- /* Wait for timeouts and delivery reports */
- rd_kafka_flush(rk, tmout_multip(5000));
-
- rd_kafka_destroy(rk);
-}
-
-static void do_consume(const char *topic, int msgcnt) {
- rd_kafka_t *rk;
- rd_kafka_topic_partition_list_t *parts;
-
- rk = test_create_consumer(topic, NULL, NULL, NULL);
-
- parts = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(parts, topic, 0)->offset =
- RD_KAFKA_OFFSET_BEGINNING;
-
- test_consumer_assign("assign", rk, parts);
-
- rd_kafka_topic_partition_list_destroy(parts);
-
- exp_msgid = 0;
-
- while (exp_msgid < msgcnt) {
- rd_kafka_message_t *rkm;
-
- rkm = rd_kafka_consumer_poll(rk, 1000);
- if (!rkm)
- continue;
-
- if (rkm->err)
- TEST_FAIL(
- "consume error while expecting msgid %d/%d: "
- "%s",
- exp_msgid, msgcnt, rd_kafka_message_errstr(rkm));
-
- handle_consumed_msg(rkm);
-
- rd_kafka_message_destroy(rkm);
- }
-
- test_consumer_close(rk);
- rd_kafka_destroy(rk);
-}
-
-
-int main_0073_headers(int argc, char **argv) {
- const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1);
- const int msgcnt = 10;
-
- do_produce(topic, msgcnt);
- do_consume(topic, msgcnt);
-
- return 0;
-}