/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2015, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "test.h" #include "rdkafka.h" /** * Message Headers end-to-end tests */ static int exp_msgid = 0; struct expect { const char *name; const char *value; }; static void expect_check(const char *what, const struct expect *expected, rd_kafka_message_t *rkmessage, int is_const) { const struct expect *exp; rd_kafka_resp_err_t err; size_t idx = 0; const char *name; const char *value; size_t size; rd_kafka_headers_t *hdrs; int msgid; if (rkmessage->len != sizeof(msgid)) TEST_FAIL("%s: expected message len %" PRIusz " == sizeof(int)", what, rkmessage->len); memcpy(&msgid, rkmessage->payload, rkmessage->len); if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) { if (msgid == 0) { rd_kafka_resp_err_t err2; TEST_SAYL(3, "%s: Msg #%d: no headers, good\n", what, msgid); err2 = rd_kafka_message_detach_headers(rkmessage, &hdrs); TEST_ASSERT(err == err2, "expected detach_headers() error %s " "to match headers() error %s", rd_kafka_err2str(err2), rd_kafka_err2str(err)); return; /* No headers expected for first message */ } TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid, rd_kafka_err2str(err)); } else { TEST_ASSERT(msgid != 0, "%s: first message should have no headers", what); } test_headers_dump(what, 3, hdrs); for (idx = 0, exp = expected; !rd_kafka_header_get_all( hdrs, idx, &name, (const void **)&value, &size); idx++, exp++) { TEST_SAYL(3, "%s: Msg #%d: " "Header #%" PRIusz ": %s='%s' (expecting %s='%s')\n", what, msgid, idx, name, value ? value : "(NULL)", exp->name, exp->value ? exp->value : "(NULL)"); if (strcmp(name, exp->name)) TEST_FAIL( "%s: Msg #%d: " "Expected header %s at idx #%" PRIusz ", not '%s' (%" PRIusz ")", what, msgid, exp->name, idx, name, strlen(name)); if (!strcmp(name, "msgid")) { int vid; /* Special handling: compare msgid header value * to message body, should be identical */ if (size != rkmessage->len || size != sizeof(int)) TEST_FAIL( "%s: " "Expected msgid/int-sized payload " "%" PRIusz ", got %" PRIusz, what, size, rkmessage->len); /* Copy to avoid unaligned access (by cast) */ memcpy(&vid, value, size); if (vid != msgid) TEST_FAIL("%s: Header msgid %d != payload %d", what, vid, msgid); if (exp_msgid != vid) TEST_FAIL("%s: Expected msgid %d, not %d", what, exp_msgid, vid); continue; } if (!exp->value) { /* Expected NULL value */ TEST_ASSERT(!value, "%s: Expected NULL value for %s, got %s", what, exp->name, value); } else { TEST_ASSERT(value, "%s: " "Expected non-NULL value for %s, got NULL", what, exp->name); TEST_ASSERT(size == strlen(exp->value), "%s: Expected size %" PRIusz " for %s, " "not %" PRIusz, what, strlen(exp->value), exp->name, size); TEST_ASSERT(value[size] == '\0', "%s: " "Expected implicit null-terminator for %s", what, exp->name); TEST_ASSERT(!strcmp(exp->value, value), "%s: " "Expected value %s for %s, not %s", what, exp->value, exp->name, value); } } TEST_ASSERT(exp->name == NULL, "%s: Expected the expected, but stuck at %s which was " "unexpected", what, exp->name); if (!strcmp(what, "handle_consumed_msg") && !is_const && (msgid % 3) == 0) { rd_kafka_headers_t *dhdrs; err = rd_kafka_message_detach_headers(rkmessage, &dhdrs); TEST_ASSERT(!err, "detach_headers() should not fail, got %s", rd_kafka_err2str(err)); TEST_ASSERT(hdrs == dhdrs); /* Verify that a new headers object can be obtained */ err = rd_kafka_message_headers(rkmessage, &hdrs); TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR); TEST_ASSERT(hdrs != dhdrs); rd_kafka_headers_destroy(dhdrs); expect_check("post_detach_headers", expected, rkmessage, is_const); } } /** * @brief Final (as in no more header modifications) message check. */ static void msg_final_check(const char *what, rd_kafka_message_t *rkmessage, int is_const) { const struct expect expected[] = { {"msgid", NULL}, /* special handling */ {"static", "hey"}, {"null", NULL}, {"empty", ""}, {"send1", "1"}, {"multi", "multi5"}, {NULL}}; expect_check(what, expected, rkmessage, is_const); exp_msgid++; } /** * @brief Handle consumed message, must be identical to dr_msg_cb */ static void handle_consumed_msg(rd_kafka_message_t *rkmessage) { msg_final_check(__FUNCTION__, rkmessage, 0); } /** * @brief Delivery report callback */ static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { TEST_ASSERT(!rkmessage->err, "Message delivery failed: %s", rd_kafka_err2str(rkmessage->err)); msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1); } /** * @brief First on_send() interceptor */ static rd_kafka_resp_err_t on_send1(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { const struct expect expected[] = { {"msgid", NULL}, /* special handling */ {"static", "hey"}, {"multi", "multi1"}, {"multi", "multi2"}, {"multi", "multi3"}, {"null", NULL}, {"empty", ""}, {NULL}}; rd_kafka_headers_t *hdrs; rd_kafka_resp_err_t err; expect_check(__FUNCTION__, expected, rkmessage, 0); err = rd_kafka_message_headers(rkmessage, &hdrs); if (err) /* First message has no headers. */ return RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1); rd_kafka_header_add(hdrs, "send1", -1, "1", -1); rd_kafka_header_remove(hdrs, "multi"); rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Second on_send() interceptor */ static rd_kafka_resp_err_t on_send2(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque) { const struct expect expected[] = { {"msgid", NULL}, /* special handling */ {"static", "hey"}, {"null", NULL}, {"empty", ""}, {"send1", "1"}, {"multi", "multi5"}, {NULL}}; expect_check(__FUNCTION__, expected, rkmessage, 0); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief on_new() interceptor to set up message interceptors * from rd_kafka_new(). */ static rd_kafka_resp_err_t on_new(rd_kafka_t *rk, const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr, size_t errstr_size) { rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL); rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL); return RD_KAFKA_RESP_ERR_NO_ERROR; } static void do_produce(const char *topic, int msgcnt) { rd_kafka_t *rk; rd_kafka_conf_t *conf; int i; rd_kafka_resp_err_t err; test_conf_init(&conf, NULL, 0); test_conf_set(conf, "acks", "all"); rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); /* First message is without headers (negative testing) */ i = 0; err = rd_kafka_producev( rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_VALUE(&i, sizeof(i)), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END); TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err)); exp_msgid++; for (i = 1; i < msgcnt; i++, exp_msgid++) { err = rd_kafka_producev( rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_VALUE(&i, sizeof(i)), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)), RD_KAFKA_V_HEADER("static", "hey", -1), RD_KAFKA_V_HEADER("multi", "multi1", -1), RD_KAFKA_V_HEADER("multi", "multi2", 6), RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")), RD_KAFKA_V_HEADER("null", NULL, 0), RD_KAFKA_V_HEADER("empty", "", 0), RD_KAFKA_V_END); TEST_ASSERT(!err, "producev() failed: %s", rd_kafka_err2str(err)); } /* Reset expected message id for dr */ exp_msgid = 0; /* Wait for timeouts and delivery reports */ rd_kafka_flush(rk, tmout_multip(5000)); rd_kafka_destroy(rk); } static void do_consume(const char *topic, int msgcnt) { rd_kafka_t *rk; rd_kafka_topic_partition_list_t *parts; rk = test_create_consumer(topic, NULL, NULL, NULL); parts = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(parts, topic, 0)->offset = RD_KAFKA_OFFSET_BEGINNING; test_consumer_assign("assign", rk, parts); rd_kafka_topic_partition_list_destroy(parts); exp_msgid = 0; while (exp_msgid < msgcnt) { rd_kafka_message_t *rkm; rkm = rd_kafka_consumer_poll(rk, 1000); if (!rkm) continue; if (rkm->err) TEST_FAIL( "consume error while expecting msgid %d/%d: " "%s", exp_msgid, msgcnt, rd_kafka_message_errstr(rkm)); handle_consumed_msg(rkm); rd_kafka_message_destroy(rkm); } test_consumer_close(rk); rd_kafka_destroy(rk); } int main_0073_headers(int argc, char **argv) { const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1); const int msgcnt = 10; do_produce(topic, msgcnt); do_consume(topic, msgcnt); return 0; }