summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c460
1 files changed, 0 insertions, 460 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
deleted file mode 100644
index a2fc085c5..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#include "rdkafka_int.h"
-#include "rdkafka_feature.h"
-
-#include <stdlib.h>
-
-static const char *rd_kafka_feature_names[] = {"MsgVer1",
- "ApiVersion",
- "BrokerBalancedConsumer",
- "ThrottleTime",
- "Sasl",
- "SaslHandshake",
- "BrokerGroupCoordinator",
- "LZ4",
- "OffsetTime",
- "MsgVer2",
- "IdempotentProducer",
- "ZSTD",
- "SaslAuthReq",
- "UnitTest",
- NULL};
-
-
-static const struct rd_kafka_feature_map {
- /* RD_KAFKA_FEATURE_... */
- int feature;
-
- /* Depends on the following ApiVersions overlapping with
- * what the broker supports: */
- struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];
-
-} rd_kafka_feature_map[] = {
- /**
- * @brief List of features and the ApiVersions they depend on.
- *
- * The dependency list consists of the ApiKey followed by this
- * client's supported minimum and maximum API versions.
- * As long as this list and its versions overlaps with the
- * broker supported API versions the feature will be enabled.
- */
- {
-
- /* @brief >=0.10.0: Message.MagicByte version 1:
- * Relative offsets (KIP-31) and message timestamps (KIP-32). */
- .feature = RD_KAFKA_FEATURE_MSGVER1,
- .depends =
- {
- {RD_KAFKAP_Produce, 2, 2},
- {RD_KAFKAP_Fetch, 2, 2},
- {-1},
- },
- },
- {
- /* @brief >=0.11.0: Message.MagicByte version 2 */
- .feature = RD_KAFKA_FEATURE_MSGVER2,
- .depends =
- {
- {RD_KAFKAP_Produce, 3, 3},
- {RD_KAFKAP_Fetch, 4, 4},
- {-1},
- },
- },
- {
- /* @brief >=0.10.0: ApiVersionQuery support.
- * @remark This is a bit of chicken-and-egg problem but needs to be
- * set by feature_check() to avoid the feature being cleared
- * even when broker supports it. */
- .feature = RD_KAFKA_FEATURE_APIVERSION,
- .depends =
- {
- {RD_KAFKAP_ApiVersion, 0, 0},
- {-1},
- },
- },
- {
- /* @brief >=0.8.2.0: Broker-based Group coordinator */
- .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
- .depends =
- {
- {RD_KAFKAP_FindCoordinator, 0, 0},
- {-1},
- },
- },
- {
- /* @brief >=0.9.0: Broker-based balanced consumer groups. */
- .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
- .depends =
- {
- {RD_KAFKAP_FindCoordinator, 0, 0},
- {RD_KAFKAP_OffsetCommit, 1, 2},
- {RD_KAFKAP_OffsetFetch, 1, 1},
- {RD_KAFKAP_JoinGroup, 0, 0},
- {RD_KAFKAP_SyncGroup, 0, 0},
- {RD_KAFKAP_Heartbeat, 0, 0},
- {RD_KAFKAP_LeaveGroup, 0, 0},
- {-1},
- },
- },
- {
- /* @brief >=0.9.0: ThrottleTime */
- .feature = RD_KAFKA_FEATURE_THROTTLETIME,
- .depends =
- {
- {RD_KAFKAP_Produce, 1, 2},
- {RD_KAFKAP_Fetch, 1, 2},
- {-1},
- },
-
- },
- {
- /* @brief >=0.9.0: SASL (GSSAPI) authentication.
- * Since SASL is not using the Kafka protocol
- * we must use something else to map us to the
- * proper broker version support:
- * JoinGroup was released along with SASL in 0.9.0. */
- .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
- .depends =
- {
- {RD_KAFKAP_JoinGroup, 0, 0},
- {-1},
- },
- },
- {
- /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
- * to automatically support other mechanisms
- * than GSSAPI, such as PLAIN. */
- .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
- .depends =
- {
- {RD_KAFKAP_SaslHandshake, 0, 0},
- {-1},
- },
- },
- {
- /* @brief >=0.8.2: LZ4 compression.
- * Since LZ4 initially did not rely on a specific API
- * type or version (it does in >=0.10.0)
- * we must use something else to map us to the
- * proper broker version support:
- * GrooupCoordinator was released in 0.8.2 */
- .feature = RD_KAFKA_FEATURE_LZ4,
- .depends =
- {
- {RD_KAFKAP_FindCoordinator, 0, 0},
- {-1},
- },
- },
- {/* @brief >=0.10.1.0: Offset v1 (KIP-79)
- * Time-based offset requests */
- .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
- .depends =
- {
- {RD_KAFKAP_ListOffsets, 1, 1},
- {-1},
- }},
- {/* @brief >=0.11.0.0: Idempotent Producer*/
- .feature = RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER,
- .depends =
- {
- {RD_KAFKAP_InitProducerId, 0, 0},
- {-1},
- }},
- {
- /* @brief >=2.1.0-IV2: Support ZStandard Compression Codec (KIP-110) */
- .feature = RD_KAFKA_FEATURE_ZSTD,
- .depends =
- {
- {RD_KAFKAP_Produce, 7, 7},
- {RD_KAFKAP_Fetch, 10, 10},
- {-1},
- },
- },
- {
- /* @brief >=1.0.0: SaslAuthenticateRequest */
- .feature = RD_KAFKA_FEATURE_SASL_AUTH_REQ,
- .depends =
- {
- {RD_KAFKAP_SaslHandshake, 1, 1},
- {RD_KAFKAP_SaslAuthenticate, 0, 0},
- {-1},
- },
- },
- {.feature = 0}, /* sentinel */
-};
-
-
-
-/**
- * @brief In absence of KIP-35 support in earlier broker versions we provide
- * hardcoded lists that corresponds to older broker versions.
- */
-
-/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
- {RD_KAFKAP_ApiVersion, 0, 0}};
-
-
-/* =~ 0.9.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
- {RD_KAFKAP_Produce, 0, 1}, {RD_KAFKAP_Fetch, 0, 1},
- {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
- {RD_KAFKAP_OffsetCommit, 0, 2}, {RD_KAFKAP_OffsetFetch, 0, 1},
- {RD_KAFKAP_FindCoordinator, 0, 0}, {RD_KAFKAP_JoinGroup, 0, 0},
- {RD_KAFKAP_Heartbeat, 0, 0}, {RD_KAFKAP_LeaveGroup, 0, 0},
- {RD_KAFKAP_SyncGroup, 0, 0}, {RD_KAFKAP_DescribeGroups, 0, 0},
- {RD_KAFKAP_ListGroups, 0, 0}};
-
-/* =~ 0.8.2 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
- {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0},
- {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
- {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 1},
- {RD_KAFKAP_FindCoordinator, 0, 0}};
-
-/* =~ 0.8.1 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
- {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0},
- {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
- {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 0}};
-
-/* =~ 0.8.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
- {RD_KAFKAP_Produce, 0, 0},
- {RD_KAFKAP_Fetch, 0, 0},
- {RD_KAFKAP_ListOffsets, 0, 0},
- {RD_KAFKAP_Metadata, 0, 0}};
-
-
-/**
- * @brief Returns the ApiVersion list for legacy broker versions that do not
- * support the ApiVersionQuery request. E.g., brokers <0.10.0.
- *
- * @param broker_version Broker version to match (longest prefix matching).
- * @param use_default If no match is found return the default APIs (but return
- * 0).
- *
- * @returns 1 if \p broker_version was recognized: \p *apisp will point to
- * the ApiVersion list and *api_cntp will be set to its element count.
- * 0 if \p broker_version was not recognized: \p *apisp remains
- * unchanged.
- *
- */
-int rd_kafka_get_legacy_ApiVersions(const char *broker_version,
- struct rd_kafka_ApiVersion **apisp,
- size_t *api_cntp,
- const char *fallback) {
- static const struct {
- const char *pfx;
- struct rd_kafka_ApiVersion *apis;
- size_t api_cnt;
- } vermap[] = {
-#define _VERMAP(PFX, APIS) {PFX, APIS, RD_ARRAYSIZE(APIS)}
- _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
- _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
- _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
- _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
- {"0.7.", NULL}, /* Unsupported */
- {"0.6.", NULL}, /* Unsupported */
- _VERMAP("", rd_kafka_ApiVersion_Queryable),
- {NULL}};
- int i;
- int fallback_i = -1;
- int ret = 0;
-
- *apisp = NULL;
- *api_cntp = 0;
-
- for (i = 0; vermap[i].pfx; i++) {
- if (!strncmp(vermap[i].pfx, broker_version,
- strlen(vermap[i].pfx))) {
- if (!vermap[i].apis)
- return 0;
- *apisp = vermap[i].apis;
- *api_cntp = vermap[i].api_cnt;
- ret = 1;
- break;
- } else if (fallback && !strcmp(vermap[i].pfx, fallback))
- fallback_i = i;
- }
-
- if (!*apisp && fallback) {
- rd_kafka_assert(NULL, fallback_i != -1);
- *apisp = vermap[fallback_i].apis;
- *api_cntp = vermap[fallback_i].api_cnt;
- }
-
- return ret;
-}
-
-
-/**
- * @returns 1 if the provided broker version (probably)
- * supports api.version.request.
- */
-int rd_kafka_ApiVersion_is_queryable(const char *broker_version) {
- struct rd_kafka_ApiVersion *apis;
- size_t api_cnt;
-
-
- if (!rd_kafka_get_legacy_ApiVersions(broker_version, &apis, &api_cnt,
- 0))
- return 0;
-
- return apis == rd_kafka_ApiVersion_Queryable;
-}
-
-
-
-/**
- * @brief Check if match's versions overlaps with \p apis.
- *
- * @returns 1 if true, else 0.
- * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
- */
-static RD_INLINE int
-rd_kafka_ApiVersion_check(const struct rd_kafka_ApiVersion *apis,
- size_t api_cnt,
- const struct rd_kafka_ApiVersion *match) {
- const struct rd_kafka_ApiVersion *api;
-
- api = bsearch(match, apis, api_cnt, sizeof(*apis),
- rd_kafka_ApiVersion_key_cmp);
- if (unlikely(!api))
- return 0;
-
- return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
-}
-
-
-/**
- * @brief Compare broker's supported API versions to our feature request map
- * and enable/disable features accordingly.
- *
- * @param broker_apis Broker's supported APIs. If NULL the
- * \p broker.version.fallback configuration property will specify a
- * default legacy version to use.
- * @param broker_api_cnt Number of elements in \p broker_apis
- *
- * @returns the supported features (bitmask) to enable.
- */
-int rd_kafka_features_check(rd_kafka_broker_t *rkb,
- struct rd_kafka_ApiVersion *broker_apis,
- size_t broker_api_cnt) {
- int features = 0;
- int i;
-
- /* Scan through features. */
- for (i = 0; rd_kafka_feature_map[i].feature != 0; i++) {
- const struct rd_kafka_ApiVersion *match;
- int fails = 0;
-
- /* For each feature check that all its API dependencies
- * can be fullfilled. */
-
- for (match = &rd_kafka_feature_map[i].depends[0];
- match->ApiKey != -1; match++) {
- int r;
-
- r = rd_kafka_ApiVersion_check(broker_apis,
- broker_api_cnt, match);
-
- rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
- " Feature %s: %s (%hd..%hd) "
- "%ssupported by broker",
- rd_kafka_features2str(
- rd_kafka_feature_map[i].feature),
- rd_kafka_ApiKey2str(match->ApiKey),
- match->MinVer, match->MaxVer,
- r ? "" : "NOT ");
-
- fails += !r;
- }
-
- rd_rkb_dbg(
- rkb, FEATURE, "APIVERSION", "%s feature %s",
- fails ? "Disabling" : "Enabling",
- rd_kafka_features2str(rd_kafka_feature_map[i].feature));
-
-
- if (!fails)
- features |= rd_kafka_feature_map[i].feature;
- }
-
- return features;
-}
-
-
-
-/**
- * @brief Make an allocated and sorted copy of \p src.
- */
-void rd_kafka_ApiVersions_copy(const struct rd_kafka_ApiVersion *src,
- size_t src_cnt,
- struct rd_kafka_ApiVersion **dstp,
- size_t *dst_cntp) {
- *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
- *dst_cntp = src_cnt;
- qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
-}
-
-
-
-/**
- * @returns a human-readable feature flag string.
- */
-const char *rd_kafka_features2str(int features) {
- static RD_TLS char ret[4][256];
- size_t of = 0;
- static RD_TLS int reti = 0;
- int i;
-
- reti = (reti + 1) % 4;
-
- *ret[reti] = '\0';
- for (i = 0; rd_kafka_feature_names[i]; i++) {
- int r;
- if (!(features & (1 << i)))
- continue;
-
- r = rd_snprintf(ret[reti] + of, sizeof(ret[reti]) - of, "%s%s",
- of == 0 ? "" : ",", rd_kafka_feature_names[i]);
- if ((size_t)r > sizeof(ret[reti]) - of) {
- /* Out of space */
- memcpy(&ret[reti][sizeof(ret[reti]) - 3], "..", 3);
- break;
- }
-
- of += r;
- }
-
- return ret[reti];
-}