diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c | 473 |
1 files changed, 0 insertions, 473 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c deleted file mode 100644 index 26a7ac070..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c +++ /dev/null @@ -1,473 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2013, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * Produce NULL payload messages, then consume them. - */ - -#include "test.h" - -/* Typical include path would be <librdkafka/rdkafka.h>, but this program - * is built from within the librdkafka source tree and thus differs. */ -#include "rdkafka.h" /* for Kafka driver */ - - -static int prod_msg_remains = 0; -static int fails = 0; - -/** - * Delivery reported callback. - * Called for each message once to signal its delivery status. - */ -static void dr_cb(rd_kafka_t *rk, - void *payload, - size_t len, - rd_kafka_resp_err_t err, - void *opaque, - void *msg_opaque) { - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) - TEST_FAIL("Message delivery failed: %s\n", - rd_kafka_err2str(err)); - - if (prod_msg_remains == 0) - TEST_FAIL("Too many messages delivered (prod_msg_remains %i)", - prod_msg_remains); - - prod_msg_remains--; -} - - -/** - * Produces 'msgcnt' messages split over 'partition_cnt' partitions. - */ -static void produce_null_messages(uint64_t testid, - const char *topic, - int partition_cnt, - int msgcnt) { - int r; - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char errstr[512]; - int i; - int32_t partition; - int msgid = 0; - - test_conf_init(&conf, &topic_conf, 20); - - rd_kafka_conf_set_dr_cb(conf, dr_cb); - - /* Make sure all replicas are in-sync after producing - * so that consume test wont fail. */ - rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", - errstr, sizeof(errstr)); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - /* Produce messages */ - prod_msg_remains = msgcnt; - for (partition = 0; partition < partition_cnt; partition++) { - int batch_cnt = msgcnt / partition_cnt; - - for (i = 0; i < batch_cnt; i++) { - char key[128]; - rd_snprintf(key, sizeof(key), - "testid=%" PRIu64 ", partition=%i, msg=%i", - testid, (int)partition, msgid); - r = rd_kafka_produce(rkt, partition, 0, NULL, 0, key, - strlen(key), NULL); - if (r == -1) - TEST_FAIL( - "Failed to produce message %i " - "to partition %i: %s", - msgid, (int)partition, - rd_kafka_err2str(rd_kafka_last_error())); - msgid++; - } - } - - - TEST_SAY( - "Produced %d messages to %d partition(s), " - "waiting for deliveries\n", - msgcnt, partition_cnt); - /* Wait for messages to be delivered */ - while (rd_kafka_outq_len(rk) > 0) - rd_kafka_poll(rk, 100); - - if (fails) - TEST_FAIL("%i failures, see previous errors", fails); - - if (prod_msg_remains != 0) - TEST_FAIL("Still waiting for %i messages to be produced", - prod_msg_remains); - else - TEST_SAY("All messages delivered\n"); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); -} - - - -static int *cons_msgs; -static int cons_msgs_size; -static int cons_msgs_cnt; - -static void verify_consumed_msg_reset(int msgcnt) { - if (cons_msgs) { - free(cons_msgs); - cons_msgs = NULL; - } - - if (msgcnt) { - int i; - - cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt); - for (i = 0; i < msgcnt; i++) - cons_msgs[i] = -1; - } - - cons_msgs_size = msgcnt; - cons_msgs_cnt = 0; -} - - -static int int_cmp(const void *_a, const void *_b) { - int a = *(int *)_a; - int b = *(int *)_b; - return RD_CMP(a, b); -} - -static void verify_consumed_msg_check0(const char *func, int line) { - int i; - int fails = 0; - - if (cons_msgs_cnt < cons_msgs_size) { - TEST_SAY("Missing %i messages in consumer\n", - cons_msgs_size - cons_msgs_cnt); - fails++; - } - - qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp); - - for (i = 0; i < cons_msgs_size; i++) { - if (cons_msgs[i] != i) { - TEST_SAY( - "Consumed message #%i is wrong, " - "expected #%i\n", - cons_msgs[i], i); - fails++; - } - } - - if (fails) - TEST_FAIL("See above error(s)"); - - verify_consumed_msg_reset(0); -} - - -#define verify_consumed_msg_check() \ - verify_consumed_msg_check0(__FUNCTION__, __LINE__) - - - -static void verify_consumed_msg0(const char *func, - int line, - uint64_t testid, - int32_t partition, - int msgnum, - rd_kafka_message_t *rkmessage) { - uint64_t in_testid; - int in_part; - int in_msgnum; - char buf[128]; - - if (rkmessage->len != 0) - TEST_FAIL("Incoming message not NULL: %i bytes", - (int)rkmessage->len); - - if (rkmessage->key_len + 1 >= sizeof(buf)) - TEST_FAIL( - "Incoming message key too large (%i): " - "not sourced by this test", - (int)rkmessage->key_len); - - rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->key_len, - (char *)rkmessage->key); - - if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i", &in_testid, - &in_part, &in_msgnum) != 3) - TEST_FAIL("Incorrect key format: %s", buf); - - if (testid != in_testid || (partition != -1 && partition != in_part) || - (msgnum != -1 && msgnum != in_msgnum) || - (in_msgnum < 0 || in_msgnum > cons_msgs_size)) - goto fail_match; - - if (test_level > 2) { - TEST_SAY("%s:%i: Our testid %" PRIu64 - ", part %i (%i), " - "msg %i/%i did " - ", key's: \"%s\"\n", - func, line, testid, (int)partition, - (int)rkmessage->partition, msgnum, cons_msgs_size, - buf); - } - - if (cons_msgs_cnt == cons_msgs_size) { - TEST_SAY( - "Too many messages in cons_msgs (%i) while reading " - "message key \"%s\"\n", - cons_msgs_cnt, buf); - verify_consumed_msg_check(); - TEST_FAIL("See above error(s)"); - } - - cons_msgs[cons_msgs_cnt++] = in_msgnum; - - return; - -fail_match: - TEST_FAIL("%s:%i: Our testid %" PRIu64 - ", part %i, msg %i/%i did " - "not match message's key: \"%s\"\n", - func, line, testid, (int)partition, msgnum, cons_msgs_size, - buf); -} - -#define verify_consumed_msg(testid, part, msgnum, rkmessage) \ - verify_consumed_msg0(__FUNCTION__, __LINE__, testid, part, msgnum, \ - rkmessage) - - -static void consume_messages(uint64_t testid, - const char *topic, - int32_t partition, - int msg_base, - int batch_cnt, - int msgcnt) { - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - int i; - - test_conf_init(&conf, &topic_conf, 20); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_CONSUMER, conf); - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - TEST_SAY("Consuming %i messages from partition %i\n", batch_cnt, - partition); - - /* Consume messages */ - if (rd_kafka_consume_start(rkt, partition, - RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1) - TEST_FAIL("consume_start(%i, -%i) failed: %s", (int)partition, - batch_cnt, rd_kafka_err2str(rd_kafka_last_error())); - - for (i = 0; i < batch_cnt; i++) { - rd_kafka_message_t *rkmessage; - - rkmessage = - rd_kafka_consume(rkt, partition, tmout_multip(5000)); - if (!rkmessage) - TEST_FAIL( - "Failed to consume message %i/%i from " - "partition %i: %s", - i, batch_cnt, (int)partition, - rd_kafka_err2str(rd_kafka_last_error())); - if (rkmessage->err) - TEST_FAIL( - "Consume message %i/%i from partition %i " - "has error: %s", - i, batch_cnt, (int)partition, - rd_kafka_err2str(rkmessage->err)); - - verify_consumed_msg(testid, partition, msg_base + i, rkmessage); - - rd_kafka_message_destroy(rkmessage); - } - - rd_kafka_consume_stop(rkt, partition); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); -} - - -static void consume_messages_with_queues(uint64_t testid, - const char *topic, - int partition_cnt, - int msgcnt) { - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - rd_kafka_queue_t *rkqu; - int i; - int32_t partition; - int batch_cnt = msgcnt / partition_cnt; - - test_conf_init(&conf, &topic_conf, 20); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_CONSUMER, conf); - - /* Create queue */ - rkqu = rd_kafka_queue_new(rk); - - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - TEST_SAY("Consuming %i messages from one queue serving %i partitions\n", - msgcnt, partition_cnt); - - /* Start consuming each partition */ - for (partition = 0; partition < partition_cnt; partition++) { - /* Consume messages */ - TEST_SAY("Start consuming partition %i at tail offset -%i\n", - partition, batch_cnt); - if (rd_kafka_consume_start_queue( - rkt, partition, RD_KAFKA_OFFSET_TAIL(batch_cnt), - rkqu) == -1) - TEST_FAIL("consume_start_queue(%i) failed: %s", - (int)partition, - rd_kafka_err2str(rd_kafka_last_error())); - } - - - /* Consume messages from queue */ - for (i = 0; i < msgcnt; i++) { - rd_kafka_message_t *rkmessage; - - rkmessage = rd_kafka_consume_queue(rkqu, tmout_multip(5000)); - if (!rkmessage) - TEST_FAIL( - "Failed to consume message %i/%i from " - "queue: %s", - i, msgcnt, rd_kafka_err2str(rd_kafka_last_error())); - if (rkmessage->err) - TEST_FAIL( - "Consume message %i/%i from queue " - "has error (partition %" PRId32 "): %s", - i, msgcnt, rkmessage->partition, - rd_kafka_err2str(rkmessage->err)); - - verify_consumed_msg(testid, -1, -1, rkmessage); - - rd_kafka_message_destroy(rkmessage); - } - - /* Stop consuming each partition */ - for (partition = 0; partition < partition_cnt; partition++) - rd_kafka_consume_stop(rkt, partition); - - /* Destroy queue */ - rd_kafka_queue_destroy(rkqu); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); -} - - -static void test_produce_consume(void) { - int msgcnt = test_quick ? 100 : 1000; - int partition_cnt = 1; - int i; - uint64_t testid; - int msg_base = 0; - const char *topic; - - /* Generate a testid so we can differentiate messages - * from other tests */ - testid = test_id_generate(); - - /* Read test.conf to configure topic name */ - test_conf_init(NULL, NULL, 20); - topic = test_mk_topic_name("0013", 0); - - TEST_SAY("Topic %s, testid %" PRIu64 "\n", topic, testid); - - /* Produce messages */ - produce_null_messages(testid, topic, partition_cnt, msgcnt); - - - /* Consume messages with standard interface */ - verify_consumed_msg_reset(msgcnt); - for (i = 0; i < partition_cnt; i++) { - consume_messages(testid, topic, i, msg_base, - msgcnt / partition_cnt, msgcnt); - msg_base += msgcnt / partition_cnt; - } - verify_consumed_msg_check(); - - /* Consume messages with queue interface */ - verify_consumed_msg_reset(msgcnt); - consume_messages_with_queues(testid, topic, partition_cnt, msgcnt); - verify_consumed_msg_check(); - - return; -} - - - -int main_0013_null_msgs(int argc, char **argv) { - test_produce_consume(); - return 0; -} |