diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0056-balanced_group_mt.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0056-balanced_group_mt.c | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0056-balanced_group_mt.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0056-balanced_group_mt.c new file mode 100644 index 00000000..e6205ddb --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0056-balanced_group_mt.c @@ -0,0 +1,311 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + +/** + * KafkaConsumer balanced group with multithreading tests + * + * Runs a consumer subscribing to a topic with multiple partitions and farms + * consuming of each partition to a separate thread. + */ + +#define MAX_THRD_CNT 4 + +static int assign_cnt = 0; +static int consumed_msg_cnt = 0; +static int consumers_running = 0; +static int exp_msg_cnt; + +static mtx_t lock; +static thrd_t tids[MAX_THRD_CNT]; + +typedef struct part_consume_info_s { + rd_kafka_queue_t *rkqu; + int partition; +} part_consume_info_t; + +static int is_consuming() { + int result; + mtx_lock(&lock); + result = consumers_running; + mtx_unlock(&lock); + return result; +} + +static int partition_consume(void *args) { + part_consume_info_t *info = (part_consume_info_t *)args; + rd_kafka_queue_t *rkqu = info->rkqu; + int partition = info->partition; + int64_t ts_start = test_clock(); + int max_time = (test_session_timeout_ms + 3000) * 1000; + int running = 1; + + free(args); /* Free the parameter struct dynamically allocated for us */ + + while (ts_start + max_time > test_clock() && running && + is_consuming()) { + rd_kafka_message_t *rkmsg; + + rkmsg = rd_kafka_consume_queue(rkqu, 500); + + if (!rkmsg) + continue; + else if (rkmsg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) + running = 0; + else if (rkmsg->err) { + mtx_lock(&lock); + TEST_FAIL( + "Message error " + "(at offset %" PRId64 + " after " + "%d/%d messages and %dms): %s", + rkmsg->offset, consumed_msg_cnt, exp_msg_cnt, + (int)(test_clock() - ts_start) / 1000, + rd_kafka_message_errstr(rkmsg)); + mtx_unlock(&lock); + } else { + if (rkmsg->partition != partition) { + mtx_lock(&lock); + TEST_FAIL( + "Message consumed has partition %d " + "but we expected partition %d.", + rkmsg->partition, partition); + mtx_unlock(&lock); + } + } + rd_kafka_message_destroy(rkmsg); + + mtx_lock(&lock); + if (running && ++consumed_msg_cnt >= exp_msg_cnt) { + TEST_SAY("All messages consumed\n"); + running = 0; + } + mtx_unlock(&lock); + } + + rd_kafka_queue_destroy(rkqu); + + return thrd_success; +} + +static thrd_t spawn_thread(rd_kafka_queue_t *rkqu, int partition) { + thrd_t thr; + part_consume_info_t *info = malloc(sizeof(part_consume_info_t)); + + info->rkqu = rkqu; + info->partition = partition; + + if (thrd_create(&thr, &partition_consume, info) != thrd_success) { + TEST_FAIL("Failed to create consumer thread."); + } + return thr; +} + +static int rebalanced = 0; + +static void rebalance_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *partitions, + void *opaque) { + int i; + char *memberid = rd_kafka_memberid(rk); + + TEST_SAY("%s: MemberId \"%s\": Consumer group rebalanced: %s\n", + rd_kafka_name(rk), memberid, rd_kafka_err2str(err)); + + if (memberid) + free(memberid); + + test_print_partition_list(partitions); + + switch (err) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + assign_cnt++; + + rd_kafka_assign(rk, partitions); + mtx_lock(&lock); + consumers_running = 1; + mtx_unlock(&lock); + + for (i = 0; i < partitions->cnt && i < MAX_THRD_CNT; ++i) { + rd_kafka_topic_partition_t part = partitions->elems[i]; + rd_kafka_queue_t *rkqu; + /* This queue is loosed in partition-consume. */ + rkqu = rd_kafka_queue_get_partition(rk, part.topic, + part.partition); + + rd_kafka_queue_forward(rkqu, NULL); + tids[part.partition] = + spawn_thread(rkqu, part.partition); + } + + rebalanced = 1; + + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + if (assign_cnt == 0) + TEST_FAIL("asymetric rebalance_cb"); + assign_cnt--; + rd_kafka_assign(rk, NULL); + mtx_lock(&lock); + consumers_running = 0; + mtx_unlock(&lock); + + break; + + default: + TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err)); + break; + } +} + +static void get_assignment(rd_kafka_t *rk_c) { + while (!rebalanced) { + rd_kafka_message_t *rkmsg; + rkmsg = rd_kafka_consumer_poll(rk_c, 500); + if (rkmsg) + rd_kafka_message_destroy(rkmsg); + } +} + +int main_0056_balanced_group_mt(int argc, char **argv) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_t *rk_p, *rk_c; + rd_kafka_topic_t *rkt_p; + int msg_cnt = test_quick ? 100 : 1000; + int msg_base = 0; + int partition_cnt = 2; + int partition; + uint64_t testid; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *default_topic_conf; + rd_kafka_topic_partition_list_t *sub, *topics; + rd_kafka_resp_err_t err; + test_timing_t t_assign, t_close, t_consume; + int i; + + exp_msg_cnt = msg_cnt * partition_cnt; + + testid = test_id_generate(); + + /* Produce messages */ + rk_p = test_create_producer(); + rkt_p = test_create_producer_topic(rk_p, topic, NULL); + + for (partition = 0; partition < partition_cnt; partition++) { + test_produce_msgs(rk_p, rkt_p, testid, partition, + msg_base + (partition * msg_cnt), msg_cnt, + NULL, 0); + } + + rd_kafka_topic_destroy(rkt_p); + rd_kafka_destroy(rk_p); + + if (mtx_init(&lock, mtx_plain) != thrd_success) + TEST_FAIL("Cannot create mutex."); + + test_conf_init(&conf, &default_topic_conf, + (test_session_timeout_ms * 3) / 1000); + + test_conf_set(conf, "enable.partition.eof", "true"); + + test_topic_conf_set(default_topic_conf, "auto.offset.reset", + "smallest"); + + /* Fill in topic subscription set */ + topics = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); + + /* Create consumers and start subscription */ + rk_c = test_create_consumer(topic /*group_id*/, rebalance_cb, conf, + default_topic_conf); + + test_consumer_subscribe(rk_c, topic); + + rd_kafka_topic_partition_list_destroy(topics); + + /* Wait for both consumers to get an assignment */ + TIMING_START(&t_assign, "WAIT.ASSIGN"); + get_assignment(rk_c); + TIMING_STOP(&t_assign); + + TIMING_START(&t_consume, "CONSUME.WAIT"); + for (i = 0; i < MAX_THRD_CNT; ++i) { + int res; + if (tids[i] != 0) + thrd_join(tids[i], &res); + } + TIMING_STOP(&t_consume); + + TEST_SAY("Closing remaining consumers\n"); + /* Query subscription */ + err = rd_kafka_subscription(rk_c, &sub); + TEST_ASSERT(!err, "%s: subscription () failed: %s", rd_kafka_name(rk_c), + rd_kafka_err2str(err)); + TEST_SAY("%s: subscription (%d):\n", rd_kafka_name(rk_c), sub->cnt); + for (i = 0; i < sub->cnt; ++i) + TEST_SAY(" %s\n", sub->elems[i].topic); + rd_kafka_topic_partition_list_destroy(sub); + + /* Run an explicit unsubscribe () (async) prior to close () + * to trigger race condition issues on termination. */ + TEST_SAY("Unsubscribing instance %s\n", rd_kafka_name(rk_c)); + err = rd_kafka_unsubscribe(rk_c); + TEST_ASSERT(!err, "%s: unsubscribe failed: %s", rd_kafka_name(rk_c), + rd_kafka_err2str(err)); + + TEST_SAY("Closing %s\n", rd_kafka_name(rk_c)); + TIMING_START(&t_close, "CONSUMER.CLOSE"); + err = rd_kafka_consumer_close(rk_c); + TIMING_STOP(&t_close); + TEST_ASSERT(!err, "consumer_close failed: %s", rd_kafka_err2str(err)); + + rd_kafka_destroy(rk_c); + rk_c = NULL; + + TEST_SAY("%d/%d messages consumed\n", consumed_msg_cnt, exp_msg_cnt); + TEST_ASSERT(consumed_msg_cnt >= exp_msg_cnt, + "Only %d/%d messages were consumed", consumed_msg_cnt, + exp_msg_cnt); + + if (consumed_msg_cnt > exp_msg_cnt) + TEST_SAY( + "At least %d/%d messages were consumed " + "multiple times\n", + consumed_msg_cnt - exp_msg_cnt, exp_msg_cnt); + + mtx_destroy(&lock); + + return 0; +} |