diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h new file mode 100644 index 00000000..b90e7dc9 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignor.h @@ -0,0 +1,212 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2015 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _RDKAFKA_ASSIGNOR_H_ +#define _RDKAFKA_ASSIGNOR_H_ + + + +/*! + * Enumerates the different rebalance protocol types. + * + * @sa rd_kafka_rebalance_protocol() + */ +typedef enum rd_kafka_rebalance_protocol_t { + RD_KAFKA_REBALANCE_PROTOCOL_NONE, /**< Rebalance protocol is + unknown */ + RD_KAFKA_REBALANCE_PROTOCOL_EAGER, /**< Eager rebalance + protocol */ + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE /**< Cooperative + rebalance protocol*/ +} rd_kafka_rebalance_protocol_t; + + + +typedef struct rd_kafka_group_member_s { + /** Subscribed topics (partition field is ignored). */ + rd_kafka_topic_partition_list_t *rkgm_subscription; + /** Partitions assigned to this member after running the assignor. + * E.g., the current assignment coming out of the rebalance. */ + rd_kafka_topic_partition_list_t *rkgm_assignment; + /** Partitions reported as currently owned by the member, read + * from consumer metadata. E.g., the current assignment going into + * the rebalance. */ + rd_kafka_topic_partition_list_t *rkgm_owned; + /** List of eligible topics in subscription. E.g., subscribed topics + * that exist. */ + rd_list_t rkgm_eligible; + /** Member id (e.g., client.id-some-uuid). */ + rd_kafkap_str_t *rkgm_member_id; + /** Group instance id. */ + rd_kafkap_str_t *rkgm_group_instance_id; + /** Member-specific opaque userdata. */ + rd_kafkap_bytes_t *rkgm_userdata; + /** Member metadata, e.g., the currently owned partitions. */ + rd_kafkap_bytes_t *rkgm_member_metadata; + /** Group generation id. */ + int rkgm_generation; +} rd_kafka_group_member_t; + + +int rd_kafka_group_member_cmp(const void *_a, const void *_b); + +int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, + const rd_kafka_group_member_t *rkgm, + const char *topic); + + +/** + * Structure to hold metadata for a single topic and all its + * subscribing members. + */ +typedef struct rd_kafka_assignor_topic_s { + const rd_kafka_metadata_topic_t *metadata; + rd_list_t members; /* rd_kafka_group_member_t * */ +} rd_kafka_assignor_topic_t; + + +int rd_kafka_assignor_topic_cmp(const void *_a, const void *_b); + + +typedef struct rd_kafka_assignor_s { + rd_kafkap_str_t *rkas_protocol_type; + rd_kafkap_str_t *rkas_protocol_name; + + int rkas_enabled; + + /** Order for strategies. */ + int rkas_index; + + rd_kafka_rebalance_protocol_t rkas_protocol; + + rd_kafka_resp_err_t (*rkas_assign_cb)( + rd_kafka_t *rk, + const struct rd_kafka_assignor_s *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque); + + rd_kafkap_bytes_t *(*rkas_get_metadata_cb)( + const struct rd_kafka_assignor_s *rkas, + void *assignor_state, + const rd_list_t *topics, + const rd_kafka_topic_partition_list_t *owned_partitions); + + void (*rkas_on_assignment_cb)( + const struct rd_kafka_assignor_s *rkas, + void **assignor_state, + const rd_kafka_topic_partition_list_t *assignment, + const rd_kafkap_bytes_t *assignment_userdata, + const rd_kafka_consumer_group_metadata_t *rkcgm); + + void (*rkas_destroy_state_cb)(void *assignor_state); + + int (*rkas_unittest)(void); + + void *rkas_opaque; +} rd_kafka_assignor_t; + + +rd_kafka_resp_err_t rd_kafka_assignor_add( + rd_kafka_t *rk, + const char *protocol_type, + const char *protocol_name, + rd_kafka_rebalance_protocol_t rebalance_protocol, + rd_kafka_resp_err_t (*assign_cb)( + rd_kafka_t *rk, + const struct rd_kafka_assignor_s *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque), + rd_kafkap_bytes_t *(*get_metadata_cb)( + const struct rd_kafka_assignor_s *rkas, + void *assignor_state, + const rd_list_t *topics, + const rd_kafka_topic_partition_list_t *owned_partitions), + void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas, + void **assignor_state, + const rd_kafka_topic_partition_list_t *assignment, + const rd_kafkap_bytes_t *userdata, + const rd_kafka_consumer_group_metadata_t *rkcgm), + void (*destroy_state_cb)(void *assignor_state), + int (*unittest_cb)(void), + void *opaque); + +rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( + const rd_list_t *topics, + const void *userdata, + size_t userdata_size, + const rd_kafka_topic_partition_list_t *owned_partitions); + +rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( + const rd_kafka_assignor_t *rkas, + void *assignor_state, + const rd_list_t *topics, + const rd_kafka_topic_partition_list_t *owned_partitions); + + +void rd_kafka_assignor_update_subscription( + const rd_kafka_assignor_t *rkas, + const rd_kafka_topic_partition_list_t *subscription); + + +rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + char *errstr, + size_t errstr_size); + +rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk, + const char *protocol); + +int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size); +void rd_kafka_assignors_term(rd_kafka_t *rk); + + + +void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm); + + +rd_kafka_resp_err_t rd_kafka_range_assignor_init(rd_kafka_t *rk); +rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_init(rd_kafka_t *rk); +rd_kafka_resp_err_t rd_kafka_sticky_assignor_init(rd_kafka_t *rk); + +#endif /* _RDKAFKA_ASSIGNOR_H_ */ |