diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c | 381 |
1 files changed, 0 insertions, 381 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c deleted file mode 100644 index e7e5c4074..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0073-headers.c +++ /dev/null @@ -1,381 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2015, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" -#include "rdkafka.h" - -/** - * Message Headers end-to-end tests - */ - - - -static int exp_msgid = 0; - -struct expect { - const char *name; - const char *value; -}; - - - -static void expect_check(const char *what, - const struct expect *expected, - rd_kafka_message_t *rkmessage, - int is_const) { - const struct expect *exp; - rd_kafka_resp_err_t err; - size_t idx = 0; - const char *name; - const char *value; - size_t size; - rd_kafka_headers_t *hdrs; - int msgid; - - if (rkmessage->len != sizeof(msgid)) - TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)", - what, rkmessage->len); - - memcpy(&msgid, rkmessage->payload, rkmessage->len); - - if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) { - if (msgid == 0) { - rd_kafka_resp_err_t err2; - TEST_SAYL(3, "%s: Msg #%d: no headers, good\n", what, - msgid); - - err2 = - rd_kafka_message_detach_headers(rkmessage, &hdrs); - TEST_ASSERT(err == err2, - "expected detach_headers() error %s " - "to match headers() error %s", - rd_kafka_err2str(err2), - rd_kafka_err2str(err)); - - return; /* No headers expected for first message */ - } - - TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid, - rd_kafka_err2str(err)); - } else { - TEST_ASSERT(msgid != 0, - "%s: first message should have no headers", what); - } - - test_headers_dump(what, 3, hdrs); - - for (idx = 0, exp = expected; !rd_kafka_header_get_all( - hdrs, idx, &name, (const void **)&value, &size); - idx++, exp++) { - - TEST_SAYL(3, - "%s: Msg #%d: " - "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n", - what, msgid, idx, name, value ? value : "(NULL)", - exp->name, exp->value ? exp->value : "(NULL)"); - - if (strcmp(name, exp->name)) - TEST_FAIL( - "%s: Msg #%d: " - "Expected header %s at idx #%" PRIusz - ", not '%s' (%" PRIusz ")", - what, msgid, exp->name, idx, name, strlen(name)); - - if (!strcmp(name, "msgid")) { - int vid; - - /* Special handling: compare msgid header value - * to message body, should be identical */ - if (size != rkmessage->len || size != sizeof(int)) - TEST_FAIL( - "%s: " - "Expected msgid/int-sized payload " - "%" PRIusz ", got %" PRIusz, - what, size, rkmessage->len); - - /* Copy to avoid unaligned access (by cast) */ - memcpy(&vid, value, size); - - if (vid != msgid) - TEST_FAIL("%s: Header msgid %d != payload %d", - what, vid, msgid); - - if (exp_msgid != vid) - TEST_FAIL("%s: Expected msgid %d, not %d", what, - exp_msgid, vid); - continue; - } - - if (!exp->value) { - /* Expected NULL value */ - TEST_ASSERT(!value, - "%s: Expected NULL value for %s, got %s", - what, exp->name, value); - - } else { - TEST_ASSERT(value, - "%s: " - "Expected non-NULL value for %s, got NULL", - what, exp->name); - - TEST_ASSERT(size == strlen(exp->value), - "%s: Expected size %" PRIusz - " for %s, " - "not %" PRIusz, - what, strlen(exp->value), exp->name, size); - - TEST_ASSERT(value[size] == '\0', - "%s: " - "Expected implicit null-terminator for %s", - what, exp->name); - - TEST_ASSERT(!strcmp(exp->value, value), - "%s: " - "Expected value %s for %s, not %s", - what, exp->value, exp->name, value); - } - } - - TEST_ASSERT(exp->name == NULL, - "%s: Expected the expected, but stuck at %s which was " - "unexpected", - what, exp->name); - - if (!strcmp(what, "handle_consumed_msg") && !is_const && - (msgid % 3) == 0) { - rd_kafka_headers_t *dhdrs; - - err = rd_kafka_message_detach_headers(rkmessage, &dhdrs); - TEST_ASSERT(!err, "detach_headers() should not fail, got %s", - rd_kafka_err2str(err)); - TEST_ASSERT(hdrs == dhdrs); - - /* Verify that a new headers object can be obtained */ - err = rd_kafka_message_headers(rkmessage, &hdrs); - TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR); - TEST_ASSERT(hdrs != dhdrs); - rd_kafka_headers_destroy(dhdrs); - - expect_check("post_detach_headers", expected, rkmessage, - is_const); - } -} - - -/** - * @brief Final (as in no more header modifications) message check. - */ -static void -msg_final_check(const char *what, rd_kafka_message_t *rkmessage, int is_const) { - const struct expect expected[] = { - {"msgid", NULL}, /* special handling */ - {"static", "hey"}, {"null", NULL}, {"empty", ""}, - {"send1", "1"}, {"multi", "multi5"}, {NULL}}; - - expect_check(what, expected, rkmessage, is_const); - - exp_msgid++; -} - -/** - * @brief Handle consumed message, must be identical to dr_msg_cb - */ -static void handle_consumed_msg(rd_kafka_message_t *rkmessage) { - msg_final_check(__FUNCTION__, rkmessage, 0); -} - -/** - * @brief Delivery report callback - */ -static void -dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { - TEST_ASSERT(!rkmessage->err, "Message delivery failed: %s", - rd_kafka_err2str(rkmessage->err)); - - msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1); -} - - -/** - * @brief First on_send() interceptor - */ -static rd_kafka_resp_err_t -on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { - const struct expect expected[] = { - {"msgid", NULL}, /* special handling */ - {"static", "hey"}, - {"multi", "multi1"}, - {"multi", "multi2"}, - {"multi", "multi3"}, - {"null", NULL}, - {"empty", ""}, - {NULL}}; - rd_kafka_headers_t *hdrs; - rd_kafka_resp_err_t err; - - expect_check(__FUNCTION__, expected, rkmessage, 0); - - err = rd_kafka_message_headers(rkmessage, &hdrs); - if (err) /* First message has no headers. */ - return RD_KAFKA_RESP_ERR_NO_ERROR; - - rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1); - rd_kafka_header_add(hdrs, "send1", -1, "1", -1); - rd_kafka_header_remove(hdrs, "multi"); - rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Second on_send() interceptor - */ -static rd_kafka_resp_err_t -on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { - const struct expect expected[] = { - {"msgid", NULL}, /* special handling */ - {"static", "hey"}, {"null", NULL}, {"empty", ""}, - {"send1", "1"}, {"multi", "multi5"}, {NULL}}; - - expect_check(__FUNCTION__, expected, rkmessage, 0); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief on_new() interceptor to set up message interceptors - * from rd_kafka_new(). - */ -static rd_kafka_resp_err_t on_new(rd_kafka_t *rk, - const rd_kafka_conf_t *conf, - void *ic_opaque, - char *errstr, - size_t errstr_size) { - rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL); - rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL); - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -static void do_produce(const char *topic, int msgcnt) { - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - int i; - rd_kafka_resp_err_t err; - - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "acks", "all"); - rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); - - rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL); - - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - /* First message is without headers (negative testing) */ - i = 0; - err = rd_kafka_producev( - rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), - RD_KAFKA_V_VALUE(&i, sizeof(i)), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END); - TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err)); - exp_msgid++; - - for (i = 1; i < msgcnt; i++, exp_msgid++) { - err = rd_kafka_producev( - rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), - RD_KAFKA_V_VALUE(&i, sizeof(i)), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)), - RD_KAFKA_V_HEADER("static", "hey", -1), - RD_KAFKA_V_HEADER("multi", "multi1", -1), - RD_KAFKA_V_HEADER("multi", "multi2", 6), - RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")), - RD_KAFKA_V_HEADER("null", NULL, 0), - RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END); - TEST_ASSERT(!err, "producev() failed: %s", - rd_kafka_err2str(err)); - } - - /* Reset expected message id for dr */ - exp_msgid = 0; - - /* Wait for timeouts and delivery reports */ - rd_kafka_flush(rk, tmout_multip(5000)); - - rd_kafka_destroy(rk); -} - -static void do_consume(const char *topic, int msgcnt) { - rd_kafka_t *rk; - rd_kafka_topic_partition_list_t *parts; - - rk = test_create_consumer(topic, NULL, NULL, NULL); - - parts = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(parts, topic, 0)->offset = - RD_KAFKA_OFFSET_BEGINNING; - - test_consumer_assign("assign", rk, parts); - - rd_kafka_topic_partition_list_destroy(parts); - - exp_msgid = 0; - - while (exp_msgid < msgcnt) { - rd_kafka_message_t *rkm; - - rkm = rd_kafka_consumer_poll(rk, 1000); - if (!rkm) - continue; - - if (rkm->err) - TEST_FAIL( - "consume error while expecting msgid %d/%d: " - "%s", - exp_msgid, msgcnt, rd_kafka_message_errstr(rkm)); - - handle_consumed_msg(rkm); - - rd_kafka_message_destroy(rkm); - } - - test_consumer_close(rk); - rd_kafka_destroy(rk); -} - - -int main_0073_headers(int argc, char **argv) { - const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1); - const int msgcnt = 10; - - do_produce(topic, msgcnt); - do_consume(topic, msgcnt); - - return 0; -} |