diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:07:37 +0000 |
commit | b485aab7e71c1625cfc27e0f92c9509f42378458 (patch) | |
tree | ae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip |
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c new file mode 100644 index 000000000..9fb8d2350 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0130-store_offsets.c @@ -0,0 +1,127 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + + +/** + * Verify that offsets_store() is not allowed for unassigned partitions, + * and that those offsets are not committed. + */ +static void do_test_store_unassigned(void) { + const char *topic = test_mk_topic_name("0130_store_unassigned", 1); + rd_kafka_conf_t *conf; + rd_kafka_t *c; + rd_kafka_topic_partition_list_t *parts; + rd_kafka_resp_err_t err; + rd_kafka_message_t *rkmessage; + const int64_t proper_offset = 900, bad_offset = 300; + + SUB_TEST_QUICK(); + + test_produce_msgs_easy(topic, 0, 0, 1000); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.offset.store", "false"); + test_conf_set(conf, "enable.partition.eof", "true"); + + c = test_create_consumer(topic, NULL, conf, NULL); + + parts = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(parts, topic, 0); + TEST_CALL_ERR__(rd_kafka_assign(c, parts)); + + TEST_SAY("Consume one message\n"); + test_consumer_poll_once(c, NULL, tmout_multip(3000)); + + parts->elems[0].offset = proper_offset; + TEST_SAY("Storing offset %" PRId64 " while assigned: should succeed\n", + parts->elems[0].offset); + TEST_CALL_ERR__(rd_kafka_offsets_store(c, parts)); + + TEST_SAY("Committing\n"); + TEST_CALL_ERR__(rd_kafka_commit(c, NULL, rd_false /*sync*/)); + + TEST_SAY("Unassigning partitions and trying to store again\n"); + TEST_CALL_ERR__(rd_kafka_assign(c, NULL)); + + parts->elems[0].offset = bad_offset; + TEST_SAY("Storing offset %" PRId64 " while unassigned: should fail\n", + parts->elems[0].offset); + err = rd_kafka_offsets_store(c, parts); + TEST_ASSERT_LATER(err != RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected offsets_store() to fail"); + TEST_ASSERT(parts->cnt == 1); + + TEST_ASSERT(parts->elems[0].err == RD_KAFKA_RESP_ERR__STATE, + "Expected %s [%" PRId32 + "] to fail with " + "_STATE, not %s", + parts->elems[0].topic, parts->elems[0].partition, + rd_kafka_err2name(parts->elems[0].err)); + + TEST_SAY("Committing: should fail\n"); + err = rd_kafka_commit(c, NULL, rd_false /*sync*/); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__NO_OFFSET, + "Expected commit() to fail with NO_OFFSET, not %s", + rd_kafka_err2name(err)); + + TEST_SAY("Assigning partition again\n"); + parts->elems[0].offset = RD_KAFKA_OFFSET_INVALID; /* Use committed */ + TEST_CALL_ERR__(rd_kafka_assign(c, parts)); + + TEST_SAY("Consuming message to verify committed offset\n"); + rkmessage = rd_kafka_consumer_poll(c, tmout_multip(3000)); + TEST_ASSERT(rkmessage != NULL, "Expected message"); + TEST_SAY("Consumed message with offset %" PRId64 "\n", + rkmessage->offset); + TEST_ASSERT(!rkmessage->err, "Expected proper message, not error %s", + rd_kafka_message_errstr(rkmessage)); + TEST_ASSERT(rkmessage->offset == proper_offset, + "Expected first message to be properly stored " + "offset %" PRId64 ", not %" PRId64, + proper_offset, rkmessage->offset); + + rd_kafka_message_destroy(rkmessage); + + rd_kafka_topic_partition_list_destroy(parts); + + rd_kafka_consumer_close(c); + rd_kafka_destroy(c); + + SUB_TEST_PASS(); +} + + +int main_0130_store_offsets(int argc, char **argv) { + + do_test_store_unassigned(); + + return 0; +} |