summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c')
-rw-r--r-- fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c | 332
1 files changed, 332 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c
new file mode 100644
index 000000000..6b22339d7
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0018-cgrp_term.c
@@ -0,0 +1,332 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdstring.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+/**
+ * KafkaConsumer balanced group testing: termination
+ *
+ * Runs two consumers subscribing to the same topics, waits for both to
+ * get an assignment and then closes one of them.
+ */
+
+
/* Number of consumers currently holding a partition assignment:
 * incremented on assign, decremented on revoke in rebalance_cb(). */
static int assign_cnt = 0;
/* Total number of messages successfully consumed across all consumers.
 * NOTE(review): not reset between do_test() invocations — verify intended. */
static int consumed_msg_cnt = 0;
+
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *partitions,
+ void *opaque) {
+ char *memberid = rd_kafka_memberid(rk);
+
+ TEST_SAY("%s: MemberId \"%s\": Consumer group rebalanced: %s\n",
+ rd_kafka_name(rk), memberid, rd_kafka_err2str(err));
+
+ if (memberid)
+ free(memberid);
+
+ test_print_partition_list(partitions);
+
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+ assign_cnt++;
+ rd_kafka_assign(rk, partitions);
+ break;
+
+ case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+ if (assign_cnt == 0)
+ TEST_FAIL("asymetric rebalance_cb\n");
+ assign_cnt--;
+ rd_kafka_assign(rk, NULL);
+ break;
+
+ default:
+ TEST_FAIL("rebalance failed: %s\n", rd_kafka_err2str(err));
+ break;
+ }
+}
+
+
+static void consume_all(rd_kafka_t **rk_c,
+ int rk_cnt,
+ int exp_msg_cnt,
+ int max_time /*ms*/) {
+ int64_t ts_start = test_clock();
+ int i;
+
+ max_time *= 1000;
+ while (ts_start + max_time > test_clock()) {
+ for (i = 0; i < rk_cnt; i++) {
+ rd_kafka_message_t *rkmsg;
+
+ if (!rk_c[i])
+ continue;
+
+ rkmsg = rd_kafka_consumer_poll(rk_c[i], 500);
+
+ if (!rkmsg)
+ continue;
+ else if (rkmsg->err)
+ TEST_SAY(
+ "Message error "
+ "(at offset %" PRId64
+ " after "
+ "%d/%d messages and %dms): %s\n",
+ rkmsg->offset, consumed_msg_cnt,
+ exp_msg_cnt,
+ (int)(test_clock() - ts_start) / 1000,
+ rd_kafka_message_errstr(rkmsg));
+ else
+ consumed_msg_cnt++;
+
+ rd_kafka_message_destroy(rkmsg);
+
+ if (consumed_msg_cnt >= exp_msg_cnt) {
+ static int once = 0;
+ if (!once++)
+ TEST_SAY("All messages consumed\n");
+ return;
+ }
+ }
+ }
+}
+
/* Arguments handed to the queue poller thread (poller_thread_main):
 * the consumer being closed and its consumer queue. */
struct args {
        rd_kafka_t *c;
        rd_kafka_queue_t *queue;
};
+
+static int poller_thread_main(void *p) {
+ struct args *args = (struct args *)p;
+
+ while (!rd_kafka_consumer_closed(args->c)) {
+ rd_kafka_message_t *rkm;
+
+ /* Using a long timeout (1 minute) to verify that the
+ * queue is woken when close is done. */
+ rkm = rd_kafka_consume_queue(args->queue, 60 * 1000);
+ if (rkm)
+ rd_kafka_message_destroy(rkm);
+ }
+
+ return 0;
+}
+
+/**
+ * @brief Close consumer using async queue.
+ */
+static void consumer_close_queue(rd_kafka_t *c) {
+ /* Use the standard consumer queue rather than a temporary queue,
+ * the latter is covered by test 0116. */
+ rd_kafka_queue_t *queue = rd_kafka_queue_get_consumer(c);
+ struct args args = {c, queue};
+ thrd_t thrd;
+ int ret;
+
+ /* Spin up poller thread */
+ if (thrd_create(&thrd, poller_thread_main, (void *)&args) !=
+ thrd_success)
+ TEST_FAIL("Failed to create thread");
+
+ TEST_SAY("Closing consumer %s using queue\n", rd_kafka_name(c));
+ TEST_CALL_ERROR__(rd_kafka_consumer_close_queue(c, queue));
+
+ if (thrd_join(thrd, &ret) != thrd_success)
+ TEST_FAIL("thrd_join failed");
+
+ rd_kafka_queue_destroy(queue);
+}
+
+
/**
 * @brief Run the consumer-group termination test.
 *
 * Produces messages to two partitions, starts two consumers in the same
 * group, waits for both to receive an assignment, closes one consumer
 * (triggering a rebalance), lets the survivor consume, then unsubscribes
 * and closes the remaining consumer.
 *
 * @param with_queue If true, close consumers with the async
 *        rd_kafka_consumer_close_queue() path (serviced by a poller
 *        thread); otherwise use synchronous rd_kafka_consumer_close().
 *
 * NOTE(review): the globals assign_cnt/consumed_msg_cnt are not reset
 * here, so a second invocation inherits the counters left over from the
 * first — confirm this is intended.
 */
static void do_test(rd_bool_t with_queue) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
#define _CONS_CNT 2
        rd_kafka_t *rk_p, *rk_c[_CONS_CNT];
        rd_kafka_topic_t *rkt_p;
        int msg_cnt = test_quick ? 100 : 1000; /* per partition */
        int msg_base = 0;
        int partition_cnt = 2;
        int partition;
        uint64_t testid;
        rd_kafka_topic_conf_t *default_topic_conf;
        rd_kafka_topic_partition_list_t *topics;
        rd_kafka_resp_err_t err;
        test_timing_t t_assign, t_consume;
        char errstr[512];
        int i;

        SUB_TEST("with_queue=%s", RD_STR_ToF(with_queue));

        testid = test_id_generate();

        /* Produce messages */
        rk_p = test_create_producer();
        rkt_p = test_create_producer_topic(rk_p, topic, NULL);

        for (partition = 0; partition < partition_cnt; partition++) {
                test_produce_msgs(rk_p, rkt_p, testid, partition,
                                  msg_base + (partition * msg_cnt), msg_cnt,
                                  NULL, 0);
        }

        rd_kafka_topic_destroy(rkt_p);
        rd_kafka_destroy(rk_p);


        /* Test timeout scaled to cover several session timeouts. */
        test_conf_init(NULL, &default_topic_conf,
                       5 + ((test_session_timeout_ms * 3 * 2) / 1000));
        /* Read from the beginning so all produced messages are visible
         * to the fresh consumer group. */
        if (rd_kafka_topic_conf_set(default_topic_conf, "auto.offset.reset",
                                    "smallest", errstr,
                                    sizeof(errstr)) != RD_KAFKA_CONF_OK)
                TEST_FAIL("%s\n", errstr);

        /* Fill in topic subscription set */
        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, topic, -1);

        /* Create consumers and start subscription */
        for (i = 0; i < _CONS_CNT; i++) {
                /* The topic name doubles as the group id. */
                rk_c[i] = test_create_consumer(
                    topic /*group_id*/, rebalance_cb, NULL,
                    rd_kafka_topic_conf_dup(default_topic_conf));

                err = rd_kafka_poll_set_consumer(rk_c[i]);
                if (err)
                        TEST_FAIL("poll_set_consumer: %s\n",
                                  rd_kafka_err2str(err));

                err = rd_kafka_subscribe(rk_c[i], topics);
                if (err)
                        TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err));
        }

        rd_kafka_topic_conf_destroy(default_topic_conf);

        rd_kafka_topic_partition_list_destroy(topics);


        /* Wait for both consumers to get an assignment */
        TEST_SAY("Awaiting assignments for %d consumer(s)\n", _CONS_CNT);
        TIMING_START(&t_assign, "WAIT.ASSIGN");
        while (assign_cnt < _CONS_CNT)
                consume_all(rk_c, _CONS_CNT, msg_cnt,
                            test_session_timeout_ms + 3000);
        TIMING_STOP(&t_assign);

        /* Now close one of the consumers, this will cause a rebalance. */
        TEST_SAY("Closing down 1/%d consumer(s): %s\n", _CONS_CNT,
                 rd_kafka_name(rk_c[0]));
        if (with_queue)
                consumer_close_queue(rk_c[0]);
        else
                TEST_CALL_ERR__(rd_kafka_consumer_close(rk_c[0]));

        rd_kafka_destroy(rk_c[0]);
        rk_c[0] = NULL;

        /* Let remaining consumers run for a while to take over the now
         * lost partitions. */

        /* The close above revoked one assignment (assign_cnt--), so
         * exactly one assignment should remain. */
        if (assign_cnt != _CONS_CNT - 1)
                TEST_FAIL("assign_cnt %d, should be %d\n", assign_cnt,
                          _CONS_CNT - 1);

        TIMING_START(&t_consume, "CONSUME.WAIT");
        consume_all(rk_c, _CONS_CNT, msg_cnt, test_session_timeout_ms + 3000);
        TIMING_STOP(&t_consume);

        TEST_SAY("Closing remaining consumers\n");
        for (i = 0; i < _CONS_CNT; i++) {
                test_timing_t t_close;
                rd_kafka_topic_partition_list_t *sub;
                int j;

                if (!rk_c[i])
                        continue;

                /* Query subscription */
                err = rd_kafka_subscription(rk_c[i], &sub);
                if (err)
                        TEST_FAIL("%s: subscription() failed: %s\n",
                                  rd_kafka_name(rk_c[i]),
                                  rd_kafka_err2str(err));
                TEST_SAY("%s: subscription (%d):\n", rd_kafka_name(rk_c[i]),
                         sub->cnt);
                for (j = 0; j < sub->cnt; j++)
                        TEST_SAY(" %s\n", sub->elems[j].topic);
                rd_kafka_topic_partition_list_destroy(sub);

                /* Run an explicit unsubscribe() (async) prior to close()
                 * to trigger race condition issues on termination. */
                TEST_SAY("Unsubscribing instance %s\n", rd_kafka_name(rk_c[i]));
                err = rd_kafka_unsubscribe(rk_c[i]);
                if (err)
                        TEST_FAIL("%s: unsubscribe failed: %s\n",
                                  rd_kafka_name(rk_c[i]),
                                  rd_kafka_err2str(err));

                TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i]));
                TIMING_START(&t_close, "CONSUMER.CLOSE");
                if (with_queue)
                        consumer_close_queue(rk_c[i]);
                else
                        TEST_CALL_ERR__(rd_kafka_consumer_close(rk_c[i]));
                TIMING_STOP(&t_close);

                rd_kafka_destroy(rk_c[i]);
                rk_c[i] = NULL;
        }

        TEST_SAY("%d/%d messages consumed\n", consumed_msg_cnt, msg_cnt);
        /* Duplicates (rebalance-induced reconsumption) are reported but
         * tolerated; missing messages fail the test. */
        if (consumed_msg_cnt < msg_cnt)
                TEST_FAIL("Only %d/%d messages were consumed\n",
                          consumed_msg_cnt, msg_cnt);
        else if (consumed_msg_cnt > msg_cnt)
                TEST_SAY(
                    "At least %d/%d messages were consumed "
                    "multiple times\n",
                    consumed_msg_cnt - msg_cnt, msg_cnt);

        SUB_TEST_PASS();
}
+
+
+int main_0018_cgrp_term(int argc, char **argv) {
+ do_test(rd_false /* rd_kafka_consumer_close() */);
+ do_test(rd_true /* rd_kafka_consumer_close_queue() */);
+
+ return 0;
+}