summary refs log tree commit diff stats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c  541
1 file changed, 541 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c
new file mode 100644
index 000000000..c8adc3885
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0026-consume_pause.c
@@ -0,0 +1,541 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+/**
+ * Consumer: pause and resume.
+ * Make sure no messages are lost or duplicated.
+ */
+
+
+
+static void consume_pause(void) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const int partition_cnt = 3;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_topic_partition_list_t *topics;
+ rd_kafka_resp_err_t err;
+ const int msgcnt = 1000;
+ uint64_t testid;
+ int it, iterations = 3;
+ int msg_base = 0;
+ int fails = 0;
+ char group_id[32];
+
+ SUB_TEST();
+
+ test_conf_init(&conf, &tconf,
+ 60 + (test_session_timeout_ms * 3 / 1000));
+ test_conf_set(conf, "enable.partition.eof", "true");
+ test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
+
+ test_create_topic(NULL, topic, partition_cnt, 1);
+
+ /* Produce messages */
+ testid =
+ test_produce_msgs_easy(topic, 0, RD_KAFKA_PARTITION_UA, msgcnt);
+
+ topics = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(topics, topic, -1);
+
+ for (it = 0; it < iterations; it++) {
+ const int pause_cnt = 5;
+ int per_pause_msg_cnt = msgcnt / pause_cnt;
+ const int pause_time = 1200 /* 1.2s */;
+ int eof_cnt = -1;
+ int pause;
+ rd_kafka_topic_partition_list_t *parts;
+ test_msgver_t mv_all;
+ int j;
+
+ test_msgver_init(&mv_all, testid); /* All messages */
+
+ /* On the last iteration reuse the previous group.id
+ * to make consumer start at committed offsets which should
+ * also be EOF. This to trigger #1307. */
+ if (it < iterations - 1)
+ test_str_id_generate(group_id, sizeof(group_id));
+ else {
+ TEST_SAY("Reusing previous group.id %s\n", group_id);
+ per_pause_msg_cnt = 0;
+ eof_cnt = partition_cnt;
+ }
+
+ TEST_SAY(
+ "Iteration %d/%d, using group.id %s, "
+ "expecting %d messages/pause and %d EOFs\n",
+ it, iterations - 1, group_id, per_pause_msg_cnt, eof_cnt);
+
+ rk = test_create_consumer(group_id, NULL,
+ rd_kafka_conf_dup(conf),
+ rd_kafka_topic_conf_dup(tconf));
+
+
+ TEST_SAY("Subscribing to %d topic(s): %s\n", topics->cnt,
+ topics->elems[0].topic);
+ if ((err = rd_kafka_subscribe(rk, topics)))
+ TEST_FAIL("Failed to subscribe: %s\n",
+ rd_kafka_err2str(err));
+
+
+ for (pause = 0; pause < pause_cnt; pause++) {
+ int rcnt;
+ test_timing_t t_assignment;
+ test_msgver_t mv;
+
+ test_msgver_init(&mv, testid);
+ mv.fwd = &mv_all;
+
+ /* Consume sub-part of the messages. */
+ TEST_SAY(
+ "Pause-Iteration #%d: Consume %d messages at "
+ "msg_base %d\n",
+ pause, per_pause_msg_cnt, msg_base);
+ rcnt = test_consumer_poll(
+ "consume.part", rk, testid, eof_cnt, msg_base,
+ per_pause_msg_cnt == 0 ? -1 : per_pause_msg_cnt,
+ &mv);
+
+ TEST_ASSERT(rcnt == per_pause_msg_cnt,
+ "expected %d messages, got %d",
+ per_pause_msg_cnt, rcnt);
+
+ test_msgver_verify("pause.iteration", &mv,
+ TEST_MSGVER_PER_PART, msg_base,
+ per_pause_msg_cnt);
+ test_msgver_clear(&mv);
+
+ msg_base += per_pause_msg_cnt;
+
+ TIMING_START(&t_assignment, "rd_kafka_assignment()");
+ if ((err = rd_kafka_assignment(rk, &parts)))
+ TEST_FAIL("failed to get assignment: %s\n",
+ rd_kafka_err2str(err));
+ TIMING_STOP(&t_assignment);
+
+ TEST_ASSERT(parts->cnt > 0,
+ "parts->cnt %d, expected > 0", parts->cnt);
+
+ TEST_SAY("Now pausing %d partition(s) for %dms\n",
+ parts->cnt, pause_time);
+ if ((err = rd_kafka_pause_partitions(rk, parts)))
+ TEST_FAIL("Failed to pause: %s\n",
+ rd_kafka_err2str(err));
+
+ /* Check per-partition errors */
+ for (j = 0; j < parts->cnt; j++) {
+ if (parts->elems[j].err) {
+ TEST_WARN(
+ "pause failure for "
+ "%s %" PRId32 "]: %s\n",
+ parts->elems[j].topic,
+ parts->elems[j].partition,
+ rd_kafka_err2str(
+ parts->elems[j].err));
+ fails++;
+ }
+ }
+ TEST_ASSERT(fails == 0, "See previous warnings\n");
+
+ TEST_SAY(
+ "Waiting for %dms, should not receive any "
+ "messages during this time\n",
+ pause_time);
+
+ test_consumer_poll_no_msgs("silence.while.paused", rk,
+ testid, pause_time);
+
+ TEST_SAY("Resuming %d partitions\n", parts->cnt);
+ if ((err = rd_kafka_resume_partitions(rk, parts)))
+ TEST_FAIL("Failed to resume: %s\n",
+ rd_kafka_err2str(err));
+
+ /* Check per-partition errors */
+ for (j = 0; j < parts->cnt; j++) {
+ if (parts->elems[j].err) {
+ TEST_WARN(
+ "resume failure for "
+ "%s %" PRId32 "]: %s\n",
+ parts->elems[j].topic,
+ parts->elems[j].partition,
+ rd_kafka_err2str(
+ parts->elems[j].err));
+ fails++;
+ }
+ }
+ TEST_ASSERT(fails == 0, "See previous warnings\n");
+
+ rd_kafka_topic_partition_list_destroy(parts);
+ }
+
+ if (per_pause_msg_cnt > 0)
+ test_msgver_verify("all.msgs", &mv_all,
+ TEST_MSGVER_ALL_PART, 0, msgcnt);
+ else
+ test_msgver_verify("all.msgs", &mv_all,
+ TEST_MSGVER_ALL_PART, 0, 0);
+ test_msgver_clear(&mv_all);
+
+ /* Should now not see any more messages. */
+ test_consumer_poll_no_msgs("end.exp.no.msgs", rk, testid, 3000);
+
+ test_consumer_close(rk);
+
+ /* Hangs if bug isn't fixed */
+ rd_kafka_destroy(rk);
+ }
+
+ rd_kafka_topic_partition_list_destroy(topics);
+ rd_kafka_conf_destroy(conf);
+ rd_kafka_topic_conf_destroy(tconf);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Verify that the paused partition state is not used after
+ * the partition has been re-assigned.
+ *
+ * 1. Produce N messages
+ * 2. Consume N/4 messages
+ * 3. Pause partitions
+ * 4. Manually commit offset N/2
+ * 5. Unassign partitions
+ * 6. Assign partitions again
+ * 7. Verify that consumption starts at N/2 and not N/4
+ */
+static void consume_pause_resume_after_reassign(void) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const int32_t partition = 0;
+ const int msgcnt = 4000;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_partition_list_t *partitions, *pos;
+ rd_kafka_resp_err_t err;
+ int exp_msg_cnt;
+ uint64_t testid;
+ int r;
+ int msg_base = 0;
+ test_msgver_t mv;
+ rd_kafka_topic_partition_t *toppar;
+
+ SUB_TEST();
+
+ test_conf_init(&conf, NULL, 60);
+
+ test_create_topic(NULL, topic, (int)partition + 1, 1);
+
+ /* Produce messages */
+ testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
+
+ /* Set start offset to beginning */
+ partitions = rd_kafka_topic_partition_list_new(1);
+ toppar =
+ rd_kafka_topic_partition_list_add(partitions, topic, partition);
+ toppar->offset = RD_KAFKA_OFFSET_BEGINNING;
+
+
+ /**
+ * Create consumer.
+ */
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_conf_set(conf, "enable.partition.eof", "true");
+ rk = test_create_consumer(topic, NULL, conf, NULL);
+
+ test_consumer_assign("assign", rk, partitions);
+
+
+ exp_msg_cnt = msgcnt / 4;
+ TEST_SAY("Consuming first quarter (%d) of messages\n", exp_msg_cnt);
+ test_msgver_init(&mv, testid);
+ r = test_consumer_poll("consume.first.quarter", rk, testid, 0, msg_base,
+ exp_msg_cnt, &mv);
+ TEST_ASSERT(r == exp_msg_cnt, "expected %d messages, got %d",
+ exp_msg_cnt, r);
+
+
+ TEST_SAY("Pausing partitions\n");
+ if ((err = rd_kafka_pause_partitions(rk, partitions)))
+ TEST_FAIL("Failed to pause: %s", rd_kafka_err2str(err));
+
+ TEST_SAY("Verifying pause, should see no new messages...\n");
+ test_consumer_poll_no_msgs("silence.while.paused", rk, testid, 3000);
+
+ test_msgver_verify("first.quarter", &mv, TEST_MSGVER_ALL_PART, msg_base,
+ exp_msg_cnt);
+ test_msgver_clear(&mv);
+
+
+ /* Check position */
+ pos = rd_kafka_topic_partition_list_copy(partitions);
+ if ((err = rd_kafka_position(rk, pos)))
+ TEST_FAIL("position() failed: %s", rd_kafka_err2str(err));
+
+ TEST_ASSERT(!pos->elems[0].err,
+ "position() returned error for our partition: %s",
+ rd_kafka_err2str(pos->elems[0].err));
+ TEST_SAY("Current application consume position is %" PRId64 "\n",
+ pos->elems[0].offset);
+ TEST_ASSERT(pos->elems[0].offset == (int64_t)exp_msg_cnt,
+ "expected position %" PRId64 ", not %" PRId64,
+ (int64_t)exp_msg_cnt, pos->elems[0].offset);
+ rd_kafka_topic_partition_list_destroy(pos);
+
+
+ toppar->offset = (int64_t)(msgcnt / 2);
+ TEST_SAY("Committing (yet unread) offset %" PRId64 "\n",
+ toppar->offset);
+ if ((err = rd_kafka_commit(rk, partitions, 0 /*sync*/)))
+ TEST_FAIL("Commit failed: %s", rd_kafka_err2str(err));
+
+
+ TEST_SAY("Unassigning\n");
+ test_consumer_unassign("Unassign", rk);
+
+ /* Set start offset to INVALID so that the standard start offset
+ * logic kicks in. */
+ toppar->offset = RD_KAFKA_OFFSET_INVALID;
+
+ TEST_SAY("Reassigning\n");
+ test_consumer_assign("Reassign", rk, partitions);
+
+
+ TEST_SAY("Resuming partitions\n");
+ if ((err = rd_kafka_resume_partitions(rk, partitions)))
+ TEST_FAIL("Failed to resume: %s", rd_kafka_err2str(err));
+
+ msg_base = msgcnt / 2;
+ exp_msg_cnt = msgcnt / 2;
+ TEST_SAY("Consuming second half (%d) of messages at msg_base %d\n",
+ exp_msg_cnt, msg_base);
+ test_msgver_init(&mv, testid);
+ r = test_consumer_poll("consume.second.half", rk, testid, 1 /*exp eof*/,
+ msg_base, exp_msg_cnt, &mv);
+ TEST_ASSERT(r == exp_msg_cnt, "expected %d messages, got %d",
+ exp_msg_cnt, r);
+
+ test_msgver_verify("second.half", &mv, TEST_MSGVER_ALL_PART, msg_base,
+ exp_msg_cnt);
+ test_msgver_clear(&mv);
+
+
+ rd_kafka_topic_partition_list_destroy(partitions);
+
+ test_consumer_close(rk);
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+ rd_kafka_resp_err_t err2;
+
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+ /* Set start offset to beginning,
+ * while auto.offset.reset is default at `latest`. */
+
+ parts->elems[0].offset = RD_KAFKA_OFFSET_BEGINNING;
+ test_consumer_assign("rebalance", rk, parts);
+ TEST_SAY("Pausing partitions\n");
+ if ((err2 = rd_kafka_pause_partitions(rk, parts)))
+ TEST_FAIL("Failed to pause: %s",
+ rd_kafka_err2str(err2));
+ TEST_SAY("Resuming partitions\n");
+ if ((err2 = rd_kafka_resume_partitions(rk, parts)))
+ TEST_FAIL("Failed to pause: %s",
+ rd_kafka_err2str(err2));
+ break;
+ default:
+ test_consumer_unassign("rebalance", rk);
+ break;
+ }
+}
+
+
+/**
+ * @brief Verify that the assigned offset is used after pause+resume
+ * if no messages were consumed prior to pause. #2105
+ *
+ * We do this by setting the start offset to BEGINNING in the rebalance_cb
+ * and relying on auto.offset.reset=latest (default) to catch the failure case
+ * where the assigned offset was not honoured.
+ */
+static void consume_subscribe_assign_pause_resume(void) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const int32_t partition = 0;
+ const int msgcnt = 1;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ uint64_t testid;
+ int r;
+ test_msgver_t mv;
+
+ SUB_TEST();
+
+ test_conf_init(&conf, NULL, 20);
+
+ test_create_topic(NULL, topic, (int)partition + 1, 1);
+
+ /* Produce messages */
+ testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
+
+ /**
+ * Create consumer.
+ */
+ rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+ test_conf_set(conf, "session.timeout.ms", "6000");
+ test_conf_set(conf, "enable.partition.eof", "true");
+ rk = test_create_consumer(topic, NULL, conf, NULL);
+
+ test_consumer_subscribe(rk, topic);
+
+ test_msgver_init(&mv, testid);
+ r = test_consumer_poll("consume", rk, testid, 1 /*exp eof*/, 0, msgcnt,
+ &mv);
+ TEST_ASSERT(r == msgcnt, "expected %d messages, got %d", msgcnt, r);
+
+ test_msgver_verify("consumed", &mv, TEST_MSGVER_ALL_PART, 0, msgcnt);
+ test_msgver_clear(&mv);
+
+
+ test_consumer_close(rk);
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
/**
 * @brief seek() prior to pause() may overwrite the seek()ed offset
 *        when later resume()ing. #3471
 *
 * Consumes the first 100 messages, seeks to offset 500, pauses,
 * waits, resumes, and verifies that consumption continues at the
 * seeked offset (500) rather than the pre-seek position (100).
 */
static void consume_seek_pause_resume(void) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        const int32_t partition = 0;
        const int msgcnt = 1000;
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        uint64_t testid;
        int r;
        test_msgver_t mv;
        rd_kafka_topic_partition_list_t *parts;

        SUB_TEST();

        test_conf_init(&conf, NULL, 20);

        test_create_topic(NULL, topic, (int)partition + 1, 1);

        /* Produce messages */
        testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);

        /**
         * Create consumer.
         * Manual commits and earliest reset so the consumption start
         * point is deterministic.
         */
        test_conf_set(conf, "enable.auto.commit", "false");
        test_conf_set(conf, "enable.partition.eof", "true");
        test_conf_set(conf, "auto.offset.reset", "earliest");
        rk = test_create_consumer(topic, NULL, conf, NULL);

        parts = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(parts, topic, partition);

        TEST_SAY("Assigning partition\n");
        TEST_CALL_ERR__(rd_kafka_assign(rk, parts));

        rd_kafka_topic_partition_list_destroy(parts);


        TEST_SAY("Consuming messages 0..100\n");
        test_msgver_init(&mv, testid);
        r = test_consumer_poll("consume", rk, testid, 0, 0, 100, &mv);
        TEST_ASSERT(r == 100, "expected %d messages, got %d", 100, r);

        test_msgver_verify("consumed", &mv, TEST_MSGVER_ALL_PART, 0, 100);
        test_msgver_clear(&mv);

        /* Seek to offset 500, then pause before any message at the new
         * offset has been delivered. */
        parts = rd_kafka_topic_partition_list_new(1);
        TEST_SAY("Seeking to offset 500\n");
        rd_kafka_topic_partition_list_add(parts, topic, partition)->offset =
            500;
        /* NOTE: seek_partitions returns an rd_kafka_error_t*, hence
         * TEST_CALL_ERROR__ here vs TEST_CALL_ERR__ elsewhere. */
        TEST_CALL_ERROR__(rd_kafka_seek_partitions(rk, parts, -1));

        TEST_SAY("Pausing\n");
        TEST_CALL_ERR__(rd_kafka_pause_partitions(rk, parts));

        TEST_SAY("Waiting a short while for things to settle\n");
        rd_sleep(2);

        TEST_SAY("Resuming\n");
        TEST_CALL_ERR__(rd_kafka_resume_partitions(rk, parts));

        TEST_SAY("Consuming remaining messages from offset 500.. hopefully\n");
        /* NOTE(review): mv is reused here after test_msgver_clear()
         * without a new test_msgver_init() — confirm that clear leaves
         * the verifier in a usable state. */
        r = test_consumer_poll("consume", rk, testid, 1 /*exp eof*/,
                               500 /* base msgid */,
                               -1 /* remaining messages */, &mv);
        /* LATER assert so cleanup below still runs on failure. */
        TEST_ASSERT_LATER(r == 500, "expected %d messages, got %d", 500, r);

        test_msgver_verify("consumed", &mv, TEST_MSGVER_ALL_PART, 500, 500);
        test_msgver_clear(&mv);

        rd_kafka_topic_partition_list_destroy(parts);

        test_consumer_close(rk);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
+
+
/**
 * @brief Test 0026 entry point: runs all pause/resume sub-tests.
 *
 * @returns 0 on success (individual sub-test failures abort via
 *          TEST_FAIL inside the helpers).
 */
int main_0026_consume_pause(int argc, char **argv) {

        consume_pause();
        consume_pause_resume_after_reassign();
        consume_subscribe_assign_pause_resume();
        consume_seek_pause_resume();

        return 0;
}