summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c473
1 files changed, 0 insertions, 473 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c
deleted file mode 100644
index 26a7ac070..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0013-null-msgs.c
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Produce NULL payload messages, then consume them.
- */
-
-#include "test.h"
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is built from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h" /* for Kafka driver */
-
-
-static int prod_msg_remains = 0;
-static int fails = 0;
-
-/**
- * Delivery reported callback.
- * Called for each message once to signal its delivery status.
- */
-static void dr_cb(rd_kafka_t *rk,
- void *payload,
- size_t len,
- rd_kafka_resp_err_t err,
- void *opaque,
- void *msg_opaque) {
-
- if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
- TEST_FAIL("Message delivery failed: %s\n",
- rd_kafka_err2str(err));
-
- if (prod_msg_remains == 0)
- TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
- prod_msg_remains);
-
- prod_msg_remains--;
-}
-
-
-/**
- * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
- */
-static void produce_null_messages(uint64_t testid,
- const char *topic,
- int partition_cnt,
- int msgcnt) {
- int r;
- rd_kafka_t *rk;
- rd_kafka_topic_t *rkt;
- rd_kafka_conf_t *conf;
- rd_kafka_topic_conf_t *topic_conf;
- char errstr[512];
- int i;
- int32_t partition;
- int msgid = 0;
-
- test_conf_init(&conf, &topic_conf, 20);
-
- rd_kafka_conf_set_dr_cb(conf, dr_cb);
-
- /* Make sure all replicas are in-sync after producing
- * so that consume test wont fail. */
- rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
- errstr, sizeof(errstr));
-
- /* Create kafka instance */
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- /* Produce messages */
- prod_msg_remains = msgcnt;
- for (partition = 0; partition < partition_cnt; partition++) {
- int batch_cnt = msgcnt / partition_cnt;
-
- for (i = 0; i < batch_cnt; i++) {
- char key[128];
- rd_snprintf(key, sizeof(key),
- "testid=%" PRIu64 ", partition=%i, msg=%i",
- testid, (int)partition, msgid);
- r = rd_kafka_produce(rkt, partition, 0, NULL, 0, key,
- strlen(key), NULL);
- if (r == -1)
- TEST_FAIL(
- "Failed to produce message %i "
- "to partition %i: %s",
- msgid, (int)partition,
- rd_kafka_err2str(rd_kafka_last_error()));
- msgid++;
- }
- }
-
-
- TEST_SAY(
- "Produced %d messages to %d partition(s), "
- "waiting for deliveries\n",
- msgcnt, partition_cnt);
- /* Wait for messages to be delivered */
- while (rd_kafka_outq_len(rk) > 0)
- rd_kafka_poll(rk, 100);
-
- if (fails)
- TEST_FAIL("%i failures, see previous errors", fails);
-
- if (prod_msg_remains != 0)
- TEST_FAIL("Still waiting for %i messages to be produced",
- prod_msg_remains);
- else
- TEST_SAY("All messages delivered\n");
-
- /* Destroy topic */
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy rdkafka instance */
- TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
- rd_kafka_destroy(rk);
-}
-
-
-
-static int *cons_msgs;
-static int cons_msgs_size;
-static int cons_msgs_cnt;
-
-static void verify_consumed_msg_reset(int msgcnt) {
- if (cons_msgs) {
- free(cons_msgs);
- cons_msgs = NULL;
- }
-
- if (msgcnt) {
- int i;
-
- cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
- for (i = 0; i < msgcnt; i++)
- cons_msgs[i] = -1;
- }
-
- cons_msgs_size = msgcnt;
- cons_msgs_cnt = 0;
-}
-
-
-static int int_cmp(const void *_a, const void *_b) {
- int a = *(int *)_a;
- int b = *(int *)_b;
- return RD_CMP(a, b);
-}
-
-static void verify_consumed_msg_check0(const char *func, int line) {
- int i;
- int fails = 0;
-
- if (cons_msgs_cnt < cons_msgs_size) {
- TEST_SAY("Missing %i messages in consumer\n",
- cons_msgs_size - cons_msgs_cnt);
- fails++;
- }
-
- qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);
-
- for (i = 0; i < cons_msgs_size; i++) {
- if (cons_msgs[i] != i) {
- TEST_SAY(
- "Consumed message #%i is wrong, "
- "expected #%i\n",
- cons_msgs[i], i);
- fails++;
- }
- }
-
- if (fails)
- TEST_FAIL("See above error(s)");
-
- verify_consumed_msg_reset(0);
-}
-
-
-#define verify_consumed_msg_check() \
- verify_consumed_msg_check0(__FUNCTION__, __LINE__)
-
-
-
-static void verify_consumed_msg0(const char *func,
- int line,
- uint64_t testid,
- int32_t partition,
- int msgnum,
- rd_kafka_message_t *rkmessage) {
- uint64_t in_testid;
- int in_part;
- int in_msgnum;
- char buf[128];
-
- if (rkmessage->len != 0)
- TEST_FAIL("Incoming message not NULL: %i bytes",
- (int)rkmessage->len);
-
- if (rkmessage->key_len + 1 >= sizeof(buf))
- TEST_FAIL(
- "Incoming message key too large (%i): "
- "not sourced by this test",
- (int)rkmessage->key_len);
-
- rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->key_len,
- (char *)rkmessage->key);
-
- if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i", &in_testid,
- &in_part, &in_msgnum) != 3)
- TEST_FAIL("Incorrect key format: %s", buf);
-
- if (testid != in_testid || (partition != -1 && partition != in_part) ||
- (msgnum != -1 && msgnum != in_msgnum) ||
- (in_msgnum < 0 || in_msgnum > cons_msgs_size))
- goto fail_match;
-
- if (test_level > 2) {
- TEST_SAY("%s:%i: Our testid %" PRIu64
- ", part %i (%i), "
- "msg %i/%i did "
- ", key's: \"%s\"\n",
- func, line, testid, (int)partition,
- (int)rkmessage->partition, msgnum, cons_msgs_size,
- buf);
- }
-
- if (cons_msgs_cnt == cons_msgs_size) {
- TEST_SAY(
- "Too many messages in cons_msgs (%i) while reading "
- "message key \"%s\"\n",
- cons_msgs_cnt, buf);
- verify_consumed_msg_check();
- TEST_FAIL("See above error(s)");
- }
-
- cons_msgs[cons_msgs_cnt++] = in_msgnum;
-
- return;
-
-fail_match:
- TEST_FAIL("%s:%i: Our testid %" PRIu64
- ", part %i, msg %i/%i did "
- "not match message's key: \"%s\"\n",
- func, line, testid, (int)partition, msgnum, cons_msgs_size,
- buf);
-}
-
-#define verify_consumed_msg(testid, part, msgnum, rkmessage) \
- verify_consumed_msg0(__FUNCTION__, __LINE__, testid, part, msgnum, \
- rkmessage)
-
-
-static void consume_messages(uint64_t testid,
- const char *topic,
- int32_t partition,
- int msg_base,
- int batch_cnt,
- int msgcnt) {
- rd_kafka_t *rk;
- rd_kafka_topic_t *rkt;
- rd_kafka_conf_t *conf;
- rd_kafka_topic_conf_t *topic_conf;
- int i;
-
- test_conf_init(&conf, &topic_conf, 20);
-
- /* Create kafka instance */
- rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
-
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- TEST_SAY("Consuming %i messages from partition %i\n", batch_cnt,
- partition);
-
- /* Consume messages */
- if (rd_kafka_consume_start(rkt, partition,
- RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1)
- TEST_FAIL("consume_start(%i, -%i) failed: %s", (int)partition,
- batch_cnt, rd_kafka_err2str(rd_kafka_last_error()));
-
- for (i = 0; i < batch_cnt; i++) {
- rd_kafka_message_t *rkmessage;
-
- rkmessage =
- rd_kafka_consume(rkt, partition, tmout_multip(5000));
- if (!rkmessage)
- TEST_FAIL(
- "Failed to consume message %i/%i from "
- "partition %i: %s",
- i, batch_cnt, (int)partition,
- rd_kafka_err2str(rd_kafka_last_error()));
- if (rkmessage->err)
- TEST_FAIL(
- "Consume message %i/%i from partition %i "
- "has error: %s",
- i, batch_cnt, (int)partition,
- rd_kafka_err2str(rkmessage->err));
-
- verify_consumed_msg(testid, partition, msg_base + i, rkmessage);
-
- rd_kafka_message_destroy(rkmessage);
- }
-
- rd_kafka_consume_stop(rkt, partition);
-
- /* Destroy topic */
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy rdkafka instance */
- TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
- rd_kafka_destroy(rk);
-}
-
-
-static void consume_messages_with_queues(uint64_t testid,
- const char *topic,
- int partition_cnt,
- int msgcnt) {
- rd_kafka_t *rk;
- rd_kafka_topic_t *rkt;
- rd_kafka_conf_t *conf;
- rd_kafka_topic_conf_t *topic_conf;
- rd_kafka_queue_t *rkqu;
- int i;
- int32_t partition;
- int batch_cnt = msgcnt / partition_cnt;
-
- test_conf_init(&conf, &topic_conf, 20);
-
- /* Create kafka instance */
- rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
-
- /* Create queue */
- rkqu = rd_kafka_queue_new(rk);
-
-
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- TEST_SAY("Consuming %i messages from one queue serving %i partitions\n",
- msgcnt, partition_cnt);
-
- /* Start consuming each partition */
- for (partition = 0; partition < partition_cnt; partition++) {
- /* Consume messages */
- TEST_SAY("Start consuming partition %i at tail offset -%i\n",
- partition, batch_cnt);
- if (rd_kafka_consume_start_queue(
- rkt, partition, RD_KAFKA_OFFSET_TAIL(batch_cnt),
- rkqu) == -1)
- TEST_FAIL("consume_start_queue(%i) failed: %s",
- (int)partition,
- rd_kafka_err2str(rd_kafka_last_error()));
- }
-
-
- /* Consume messages from queue */
- for (i = 0; i < msgcnt; i++) {
- rd_kafka_message_t *rkmessage;
-
- rkmessage = rd_kafka_consume_queue(rkqu, tmout_multip(5000));
- if (!rkmessage)
- TEST_FAIL(
- "Failed to consume message %i/%i from "
- "queue: %s",
- i, msgcnt, rd_kafka_err2str(rd_kafka_last_error()));
- if (rkmessage->err)
- TEST_FAIL(
- "Consume message %i/%i from queue "
- "has error (partition %" PRId32 "): %s",
- i, msgcnt, rkmessage->partition,
- rd_kafka_err2str(rkmessage->err));
-
- verify_consumed_msg(testid, -1, -1, rkmessage);
-
- rd_kafka_message_destroy(rkmessage);
- }
-
- /* Stop consuming each partition */
- for (partition = 0; partition < partition_cnt; partition++)
- rd_kafka_consume_stop(rkt, partition);
-
- /* Destroy queue */
- rd_kafka_queue_destroy(rkqu);
-
- /* Destroy topic */
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy rdkafka instance */
- TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
- rd_kafka_destroy(rk);
-}
-
-
-static void test_produce_consume(void) {
- int msgcnt = test_quick ? 100 : 1000;
- int partition_cnt = 1;
- int i;
- uint64_t testid;
- int msg_base = 0;
- const char *topic;
-
- /* Generate a testid so we can differentiate messages
- * from other tests */
- testid = test_id_generate();
-
- /* Read test.conf to configure topic name */
- test_conf_init(NULL, NULL, 20);
- topic = test_mk_topic_name("0013", 0);
-
- TEST_SAY("Topic %s, testid %" PRIu64 "\n", topic, testid);
-
- /* Produce messages */
- produce_null_messages(testid, topic, partition_cnt, msgcnt);
-
-
- /* Consume messages with standard interface */
- verify_consumed_msg_reset(msgcnt);
- for (i = 0; i < partition_cnt; i++) {
- consume_messages(testid, topic, i, msg_base,
- msgcnt / partition_cnt, msgcnt);
- msg_base += msgcnt / partition_cnt;
- }
- verify_consumed_msg_check();
-
- /* Consume messages with queue interface */
- verify_consumed_msg_reset(msgcnt);
- consume_messages_with_queues(testid, topic, partition_cnt, msgcnt);
- verify_consumed_msg_check();
-
- return;
-}
-
-
-
-int main_0013_null_msgs(int argc, char **argv) {
- test_produce_consume();
- return 0;
-}