diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c | 460 |
1 files changed, 0 insertions, 460 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c deleted file mode 100644 index a2fc085c5..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c +++ /dev/null @@ -1,460 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2016, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#include "rdkafka_int.h" -#include "rdkafka_feature.h" - -#include <stdlib.h> - -static const char *rd_kafka_feature_names[] = {"MsgVer1", - "ApiVersion", - "BrokerBalancedConsumer", - "ThrottleTime", - "Sasl", - "SaslHandshake", - "BrokerGroupCoordinator", - "LZ4", - "OffsetTime", - "MsgVer2", - "IdempotentProducer", - "ZSTD", - "SaslAuthReq", - "UnitTest", - NULL}; - - -static const struct rd_kafka_feature_map { - /* RD_KAFKA_FEATURE_... */ - int feature; - - /* Depends on the following ApiVersions overlapping with - * what the broker supports: */ - struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM]; - -} rd_kafka_feature_map[] = { - /** - * @brief List of features and the ApiVersions they depend on. - * - * The dependency list consists of the ApiKey followed by this - * client's supported minimum and maximum API versions. - * As long as this list and its versions overlaps with the - * broker supported API versions the feature will be enabled. - */ - { - - /* @brief >=0.10.0: Message.MagicByte version 1: - * Relative offsets (KIP-31) and message timestamps (KIP-32). */ - .feature = RD_KAFKA_FEATURE_MSGVER1, - .depends = - { - {RD_KAFKAP_Produce, 2, 2}, - {RD_KAFKAP_Fetch, 2, 2}, - {-1}, - }, - }, - { - /* @brief >=0.11.0: Message.MagicByte version 2 */ - .feature = RD_KAFKA_FEATURE_MSGVER2, - .depends = - { - {RD_KAFKAP_Produce, 3, 3}, - {RD_KAFKAP_Fetch, 4, 4}, - {-1}, - }, - }, - { - /* @brief >=0.10.0: ApiVersionQuery support. - * @remark This is a bit of chicken-and-egg problem but needs to be - * set by feature_check() to avoid the feature being cleared - * even when broker supports it. */ - .feature = RD_KAFKA_FEATURE_APIVERSION, - .depends = - { - {RD_KAFKAP_ApiVersion, 0, 0}, - {-1}, - }, - }, - { - /* @brief >=0.8.2.0: Broker-based Group coordinator */ - .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD, - .depends = - { - {RD_KAFKAP_FindCoordinator, 0, 0}, - {-1}, - }, - }, - { - /* @brief >=0.9.0: Broker-based balanced consumer groups. */ - .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER, - .depends = - { - {RD_KAFKAP_FindCoordinator, 0, 0}, - {RD_KAFKAP_OffsetCommit, 1, 2}, - {RD_KAFKAP_OffsetFetch, 1, 1}, - {RD_KAFKAP_JoinGroup, 0, 0}, - {RD_KAFKAP_SyncGroup, 0, 0}, - {RD_KAFKAP_Heartbeat, 0, 0}, - {RD_KAFKAP_LeaveGroup, 0, 0}, - {-1}, - }, - }, - { - /* @brief >=0.9.0: ThrottleTime */ - .feature = RD_KAFKA_FEATURE_THROTTLETIME, - .depends = - { - {RD_KAFKAP_Produce, 1, 2}, - {RD_KAFKAP_Fetch, 1, 2}, - {-1}, - }, - - }, - { - /* @brief >=0.9.0: SASL (GSSAPI) authentication. - * Since SASL is not using the Kafka protocol - * we must use something else to map us to the - * proper broker version support: - * JoinGroup was released along with SASL in 0.9.0. */ - .feature = RD_KAFKA_FEATURE_SASL_GSSAPI, - .depends = - { - {RD_KAFKAP_JoinGroup, 0, 0}, - {-1}, - }, - }, - { - /* @brief >=0.10.0: SASL mechanism handshake (KIP-43) - * to automatically support other mechanisms - * than GSSAPI, such as PLAIN. */ - .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE, - .depends = - { - {RD_KAFKAP_SaslHandshake, 0, 0}, - {-1}, - }, - }, - { - /* @brief >=0.8.2: LZ4 compression. - * Since LZ4 initially did not rely on a specific API - * type or version (it does in >=0.10.0) - * we must use something else to map us to the - * proper broker version support: - * GrooupCoordinator was released in 0.8.2 */ - .feature = RD_KAFKA_FEATURE_LZ4, - .depends = - { - {RD_KAFKAP_FindCoordinator, 0, 0}, - {-1}, - }, - }, - {/* @brief >=0.10.1.0: Offset v1 (KIP-79) - * Time-based offset requests */ - .feature = RD_KAFKA_FEATURE_OFFSET_TIME, - .depends = - { - {RD_KAFKAP_ListOffsets, 1, 1}, - {-1}, - }}, - {/* @brief >=0.11.0.0: Idempotent Producer*/ - .feature = RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER, - .depends = - { - {RD_KAFKAP_InitProducerId, 0, 0}, - {-1}, - }}, - { - /* @brief >=2.1.0-IV2: Support ZStandard Compression Codec (KIP-110) */ - .feature = RD_KAFKA_FEATURE_ZSTD, - .depends = - { - {RD_KAFKAP_Produce, 7, 7}, - {RD_KAFKAP_Fetch, 10, 10}, - {-1}, - }, - }, - { - /* @brief >=1.0.0: SaslAuthenticateRequest */ - .feature = RD_KAFKA_FEATURE_SASL_AUTH_REQ, - .depends = - { - {RD_KAFKAP_SaslHandshake, 1, 1}, - {RD_KAFKAP_SaslAuthenticate, 0, 0}, - {-1}, - }, - }, - {.feature = 0}, /* sentinel */ -}; - - - -/** - * @brief In absence of KIP-35 support in earlier broker versions we provide - * hardcoded lists that corresponds to older broker versions. - */ - -/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */ -static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = { - {RD_KAFKAP_ApiVersion, 0, 0}}; - - -/* =~ 0.9.0 */ -static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = { - {RD_KAFKAP_Produce, 0, 1}, {RD_KAFKAP_Fetch, 0, 1}, - {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0}, - {RD_KAFKAP_OffsetCommit, 0, 2}, {RD_KAFKAP_OffsetFetch, 0, 1}, - {RD_KAFKAP_FindCoordinator, 0, 0}, {RD_KAFKAP_JoinGroup, 0, 0}, - {RD_KAFKAP_Heartbeat, 0, 0}, {RD_KAFKAP_LeaveGroup, 0, 0}, - {RD_KAFKAP_SyncGroup, 0, 0}, {RD_KAFKAP_DescribeGroups, 0, 0}, - {RD_KAFKAP_ListGroups, 0, 0}}; - -/* =~ 0.8.2 */ -static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = { - {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0}, - {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0}, - {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 1}, - {RD_KAFKAP_FindCoordinator, 0, 0}}; - -/* =~ 0.8.1 */ -static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = { - {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0}, - {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0}, - {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 0}}; - -/* =~ 0.8.0 */ -static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = { - {RD_KAFKAP_Produce, 0, 0}, - {RD_KAFKAP_Fetch, 0, 0}, - {RD_KAFKAP_ListOffsets, 0, 0}, - {RD_KAFKAP_Metadata, 0, 0}}; - - -/** - * @brief Returns the ApiVersion list for legacy broker versions that do not - * support the ApiVersionQuery request. E.g., brokers <0.10.0. - * - * @param broker_version Broker version to match (longest prefix matching). - * @param use_default If no match is found return the default APIs (but return - * 0). - * - * @returns 1 if \p broker_version was recognized: \p *apisp will point to - * the ApiVersion list and *api_cntp will be set to its element count. - * 0 if \p broker_version was not recognized: \p *apisp remains - * unchanged. - * - */ -int rd_kafka_get_legacy_ApiVersions(const char *broker_version, - struct rd_kafka_ApiVersion **apisp, - size_t *api_cntp, - const char *fallback) { - static const struct { - const char *pfx; - struct rd_kafka_ApiVersion *apis; - size_t api_cnt; - } vermap[] = { -#define _VERMAP(PFX, APIS) {PFX, APIS, RD_ARRAYSIZE(APIS)} - _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0), - _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2), - _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1), - _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0), - {"0.7.", NULL}, /* Unsupported */ - {"0.6.", NULL}, /* Unsupported */ - _VERMAP("", rd_kafka_ApiVersion_Queryable), - {NULL}}; - int i; - int fallback_i = -1; - int ret = 0; - - *apisp = NULL; - *api_cntp = 0; - - for (i = 0; vermap[i].pfx; i++) { - if (!strncmp(vermap[i].pfx, broker_version, - strlen(vermap[i].pfx))) { - if (!vermap[i].apis) - return 0; - *apisp = vermap[i].apis; - *api_cntp = vermap[i].api_cnt; - ret = 1; - break; - } else if (fallback && !strcmp(vermap[i].pfx, fallback)) - fallback_i = i; - } - - if (!*apisp && fallback) { - rd_kafka_assert(NULL, fallback_i != -1); - *apisp = vermap[fallback_i].apis; - *api_cntp = vermap[fallback_i].api_cnt; - } - - return ret; -} - - -/** - * @returns 1 if the provided broker version (probably) - * supports api.version.request. - */ -int rd_kafka_ApiVersion_is_queryable(const char *broker_version) { - struct rd_kafka_ApiVersion *apis; - size_t api_cnt; - - - if (!rd_kafka_get_legacy_ApiVersions(broker_version, &apis, &api_cnt, - 0)) - return 0; - - return apis == rd_kafka_ApiVersion_Queryable; -} - - - -/** - * @brief Check if match's versions overlaps with \p apis. - * - * @returns 1 if true, else 0. - * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp() - */ -static RD_INLINE int -rd_kafka_ApiVersion_check(const struct rd_kafka_ApiVersion *apis, - size_t api_cnt, - const struct rd_kafka_ApiVersion *match) { - const struct rd_kafka_ApiVersion *api; - - api = bsearch(match, apis, api_cnt, sizeof(*apis), - rd_kafka_ApiVersion_key_cmp); - if (unlikely(!api)) - return 0; - - return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer; -} - - -/** - * @brief Compare broker's supported API versions to our feature request map - * and enable/disable features accordingly. - * - * @param broker_apis Broker's supported APIs. If NULL the - * \p broker.version.fallback configuration property will specify a - * default legacy version to use. - * @param broker_api_cnt Number of elements in \p broker_apis - * - * @returns the supported features (bitmask) to enable. - */ -int rd_kafka_features_check(rd_kafka_broker_t *rkb, - struct rd_kafka_ApiVersion *broker_apis, - size_t broker_api_cnt) { - int features = 0; - int i; - - /* Scan through features. */ - for (i = 0; rd_kafka_feature_map[i].feature != 0; i++) { - const struct rd_kafka_ApiVersion *match; - int fails = 0; - - /* For each feature check that all its API dependencies - * can be fullfilled. */ - - for (match = &rd_kafka_feature_map[i].depends[0]; - match->ApiKey != -1; match++) { - int r; - - r = rd_kafka_ApiVersion_check(broker_apis, - broker_api_cnt, match); - - rd_rkb_dbg(rkb, FEATURE, "APIVERSION", - " Feature %s: %s (%hd..%hd) " - "%ssupported by broker", - rd_kafka_features2str( - rd_kafka_feature_map[i].feature), - rd_kafka_ApiKey2str(match->ApiKey), - match->MinVer, match->MaxVer, - r ? "" : "NOT "); - - fails += !r; - } - - rd_rkb_dbg( - rkb, FEATURE, "APIVERSION", "%s feature %s", - fails ? "Disabling" : "Enabling", - rd_kafka_features2str(rd_kafka_feature_map[i].feature)); - - - if (!fails) - features |= rd_kafka_feature_map[i].feature; - } - - return features; -} - - - -/** - * @brief Make an allocated and sorted copy of \p src. - */ -void rd_kafka_ApiVersions_copy(const struct rd_kafka_ApiVersion *src, - size_t src_cnt, - struct rd_kafka_ApiVersion **dstp, - size_t *dst_cntp) { - *dstp = rd_memdup(src, sizeof(*src) * src_cnt); - *dst_cntp = src_cnt; - qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp); -} - - - -/** - * @returns a human-readable feature flag string. - */ -const char *rd_kafka_features2str(int features) { - static RD_TLS char ret[4][256]; - size_t of = 0; - static RD_TLS int reti = 0; - int i; - - reti = (reti + 1) % 4; - - *ret[reti] = '\0'; - for (i = 0; rd_kafka_feature_names[i]; i++) { - int r; - if (!(features & (1 << i))) - continue; - - r = rd_snprintf(ret[reti] + of, sizeof(ret[reti]) - of, "%s%s", - of == 0 ? "" : ",", rd_kafka_feature_names[i]); - if ((size_t)r > sizeof(ret[reti]) - of) { - /* Out of space */ - memcpy(&ret[reti][sizeof(ret[reti]) - 3], "..", 3); - break; - } - - of += r; - } - - return ret[reti]; -} |