diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_range_assignor.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_range_assignor.c | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_range_assignor.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_range_assignor.c new file mode 100644 index 000000000..c83f1f1a4 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_range_assignor.c @@ -0,0 +1,138 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2015 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#include "rdkafka_int.h" +#include "rdkafka_assignor.h" + + + +/** + * Source: + * https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java + * + * The range assignor works on a per-topic basis. For each topic, we lay out the + * available partitions in numeric order and the consumers in lexicographic + * order. We then divide the number of partitions by the total number of + * consumers to determine the number of partitions to assign to each consumer. + * If it does not evenly divide, then the first few consumers will have one + * extra partition. + * + * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, + * and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, + * t1p0, t1p1, and t1p2. + * + * The assignment will be: + * C0: [t0p0, t0p1, t1p0, t1p1] + * C1: [t0p2, t1p2] + */ + +rd_kafka_resp_err_t +rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { + unsigned int ti; + int i; + + /* The range assignor works on a per-topic basis. */ + for (ti = 0; ti < eligible_topic_cnt; ti++) { + rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti]; + int numPartitionsPerConsumer; + int consumersWithExtraPartition; + + /* For each topic, we lay out the available partitions in + * numeric order and the consumers in lexicographic order. */ + rd_list_sort(&eligible_topic->members, + rd_kafka_group_member_cmp); + + /* We then divide the number of partitions by the total number + * of consumers to determine the number of partitions to assign + * to each consumer. */ + numPartitionsPerConsumer = + eligible_topic->metadata->partition_cnt / + rd_list_cnt(&eligible_topic->members); + + /* If it does not evenly divide, then the first few consumers + * will have one extra partition. */ + consumersWithExtraPartition = + eligible_topic->metadata->partition_cnt % + rd_list_cnt(&eligible_topic->members); + + rd_kafka_dbg(rk, CGRP, "ASSIGN", + "range: Topic %s with %d partition(s) and " + "%d subscribing member(s)", + eligible_topic->metadata->topic, + eligible_topic->metadata->partition_cnt, + rd_list_cnt(&eligible_topic->members)); + + for (i = 0; i < rd_list_cnt(&eligible_topic->members); i++) { + rd_kafka_group_member_t *rkgm = + rd_list_elem(&eligible_topic->members, i); + int start = numPartitionsPerConsumer * i + + RD_MIN(i, consumersWithExtraPartition); + int length = + numPartitionsPerConsumer + + (i + 1 > consumersWithExtraPartition ? 0 : 1); + + if (length == 0) + continue; + + rd_kafka_dbg(rk, CGRP, "ASSIGN", + "range: Member \"%s\": " + "assigned topic %s partitions %d..%d", + rkgm->rkgm_member_id->str, + eligible_topic->metadata->topic, start, + start + length - 1); + rd_kafka_topic_partition_list_add_range( + rkgm->rkgm_assignment, + eligible_topic->metadata->topic, start, + start + length - 1); + } + } + + return 0; +} + + + +/** + * @brief Initialzie and add range assignor. + */ +rd_kafka_resp_err_t rd_kafka_range_assignor_init(rd_kafka_t *rk) { + return rd_kafka_assignor_add( + rk, "consumer", "range", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, + rd_kafka_range_assignor_assign_cb, + rd_kafka_assignor_get_metadata_with_empty_userdata, NULL, NULL, + NULL, NULL); +} |