summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c460
1 files changed, 460 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
new file mode 100644
index 000000000..a2fc085c5
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_feature.c
@@ -0,0 +1,460 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2016, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "rdkafka_int.h"
+#include "rdkafka_feature.h"
+
+#include <stdlib.h>
+
+static const char *rd_kafka_feature_names[] = {"MsgVer1",
+ "ApiVersion",
+ "BrokerBalancedConsumer",
+ "ThrottleTime",
+ "Sasl",
+ "SaslHandshake",
+ "BrokerGroupCoordinator",
+ "LZ4",
+ "OffsetTime",
+ "MsgVer2",
+ "IdempotentProducer",
+ "ZSTD",
+ "SaslAuthReq",
+ "UnitTest",
+ NULL};
+
+
+static const struct rd_kafka_feature_map {
+ /* RD_KAFKA_FEATURE_... */
+ int feature;
+
+ /* Depends on the following ApiVersions overlapping with
+ * what the broker supports: */
+ struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];
+
+} rd_kafka_feature_map[] = {
+ /**
+ * @brief List of features and the ApiVersions they depend on.
+ *
+ * The dependency list consists of the ApiKey followed by this
+ * client's supported minimum and maximum API versions.
+ * As long as this list and its versions overlaps with the
+ * broker supported API versions the feature will be enabled.
+ */
+ {
+
+ /* @brief >=0.10.0: Message.MagicByte version 1:
+ * Relative offsets (KIP-31) and message timestamps (KIP-32). */
+ .feature = RD_KAFKA_FEATURE_MSGVER1,
+ .depends =
+ {
+ {RD_KAFKAP_Produce, 2, 2},
+ {RD_KAFKAP_Fetch, 2, 2},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.11.0: Message.MagicByte version 2 */
+ .feature = RD_KAFKA_FEATURE_MSGVER2,
+ .depends =
+ {
+ {RD_KAFKAP_Produce, 3, 3},
+ {RD_KAFKAP_Fetch, 4, 4},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.10.0: ApiVersionQuery support.
+ * @remark This is a bit of chicken-and-egg problem but needs to be
+ * set by feature_check() to avoid the feature being cleared
+ * even when broker supports it. */
+ .feature = RD_KAFKA_FEATURE_APIVERSION,
+ .depends =
+ {
+ {RD_KAFKAP_ApiVersion, 0, 0},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.8.2.0: Broker-based Group coordinator */
+ .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
+ .depends =
+ {
+ {RD_KAFKAP_FindCoordinator, 0, 0},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.9.0: Broker-based balanced consumer groups. */
+ .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
+ .depends =
+ {
+ {RD_KAFKAP_FindCoordinator, 0, 0},
+ {RD_KAFKAP_OffsetCommit, 1, 2},
+ {RD_KAFKAP_OffsetFetch, 1, 1},
+ {RD_KAFKAP_JoinGroup, 0, 0},
+ {RD_KAFKAP_SyncGroup, 0, 0},
+ {RD_KAFKAP_Heartbeat, 0, 0},
+ {RD_KAFKAP_LeaveGroup, 0, 0},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.9.0: ThrottleTime */
+ .feature = RD_KAFKA_FEATURE_THROTTLETIME,
+ .depends =
+ {
+ {RD_KAFKAP_Produce, 1, 2},
+ {RD_KAFKAP_Fetch, 1, 2},
+ {-1},
+ },
+
+ },
+ {
+ /* @brief >=0.9.0: SASL (GSSAPI) authentication.
+ * Since SASL is not using the Kafka protocol
+ * we must use something else to map us to the
+ * proper broker version support:
+ * JoinGroup was released along with SASL in 0.9.0. */
+ .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
+ .depends =
+ {
+ {RD_KAFKAP_JoinGroup, 0, 0},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
+ * to automatically support other mechanisms
+ * than GSSAPI, such as PLAIN. */
+ .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
+ .depends =
+ {
+ {RD_KAFKAP_SaslHandshake, 0, 0},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=0.8.2: LZ4 compression.
+ * Since LZ4 initially did not rely on a specific API
+ * type or version (it does in >=0.10.0)
+ * we must use something else to map us to the
+ * proper broker version support:
+ * GrooupCoordinator was released in 0.8.2 */
+ .feature = RD_KAFKA_FEATURE_LZ4,
+ .depends =
+ {
+ {RD_KAFKAP_FindCoordinator, 0, 0},
+ {-1},
+ },
+ },
+ {/* @brief >=0.10.1.0: Offset v1 (KIP-79)
+ * Time-based offset requests */
+ .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
+ .depends =
+ {
+ {RD_KAFKAP_ListOffsets, 1, 1},
+ {-1},
+ }},
+ {/* @brief >=0.11.0.0: Idempotent Producer*/
+ .feature = RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER,
+ .depends =
+ {
+ {RD_KAFKAP_InitProducerId, 0, 0},
+ {-1},
+ }},
+ {
+ /* @brief >=2.1.0-IV2: Support ZStandard Compression Codec (KIP-110) */
+ .feature = RD_KAFKA_FEATURE_ZSTD,
+ .depends =
+ {
+ {RD_KAFKAP_Produce, 7, 7},
+ {RD_KAFKAP_Fetch, 10, 10},
+ {-1},
+ },
+ },
+ {
+ /* @brief >=1.0.0: SaslAuthenticateRequest */
+ .feature = RD_KAFKA_FEATURE_SASL_AUTH_REQ,
+ .depends =
+ {
+ {RD_KAFKAP_SaslHandshake, 1, 1},
+ {RD_KAFKAP_SaslAuthenticate, 0, 0},
+ {-1},
+ },
+ },
+ {.feature = 0}, /* sentinel */
+};
+
+
+
+/**
+ * @brief In absence of KIP-35 support in earlier broker versions we provide
+ * hardcoded lists that corresponds to older broker versions.
+ */
+
+/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
+static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
+ {RD_KAFKAP_ApiVersion, 0, 0}};
+
+
+/* =~ 0.9.0 */
+static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
+ {RD_KAFKAP_Produce, 0, 1}, {RD_KAFKAP_Fetch, 0, 1},
+ {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
+ {RD_KAFKAP_OffsetCommit, 0, 2}, {RD_KAFKAP_OffsetFetch, 0, 1},
+ {RD_KAFKAP_FindCoordinator, 0, 0}, {RD_KAFKAP_JoinGroup, 0, 0},
+ {RD_KAFKAP_Heartbeat, 0, 0}, {RD_KAFKAP_LeaveGroup, 0, 0},
+ {RD_KAFKAP_SyncGroup, 0, 0}, {RD_KAFKAP_DescribeGroups, 0, 0},
+ {RD_KAFKAP_ListGroups, 0, 0}};
+
+/* =~ 0.8.2 */
+static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
+ {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0},
+ {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
+ {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 1},
+ {RD_KAFKAP_FindCoordinator, 0, 0}};
+
+/* =~ 0.8.1 */
+static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
+ {RD_KAFKAP_Produce, 0, 0}, {RD_KAFKAP_Fetch, 0, 0},
+ {RD_KAFKAP_ListOffsets, 0, 0}, {RD_KAFKAP_Metadata, 0, 0},
+ {RD_KAFKAP_OffsetCommit, 0, 1}, {RD_KAFKAP_OffsetFetch, 0, 0}};
+
+/* =~ 0.8.0 */
+static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
+ {RD_KAFKAP_Produce, 0, 0},
+ {RD_KAFKAP_Fetch, 0, 0},
+ {RD_KAFKAP_ListOffsets, 0, 0},
+ {RD_KAFKAP_Metadata, 0, 0}};
+
+
+/**
+ * @brief Returns the ApiVersion list for legacy broker versions that do not
+ * support the ApiVersionQuery request. E.g., brokers <0.10.0.
+ *
+ * @param broker_version Broker version to match (longest prefix matching).
+ * @param use_default If no match is found return the default APIs (but return
+ * 0).
+ *
+ * @returns 1 if \p broker_version was recognized: \p *apisp will point to
+ * the ApiVersion list and *api_cntp will be set to its element count.
+ * 0 if \p broker_version was not recognized: \p *apisp remains
+ * unchanged.
+ *
+ */
+int rd_kafka_get_legacy_ApiVersions(const char *broker_version,
+ struct rd_kafka_ApiVersion **apisp,
+ size_t *api_cntp,
+ const char *fallback) {
+ static const struct {
+ const char *pfx;
+ struct rd_kafka_ApiVersion *apis;
+ size_t api_cnt;
+ } vermap[] = {
+#define _VERMAP(PFX, APIS) {PFX, APIS, RD_ARRAYSIZE(APIS)}
+ _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
+ _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
+ _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
+ _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
+ {"0.7.", NULL}, /* Unsupported */
+ {"0.6.", NULL}, /* Unsupported */
+ _VERMAP("", rd_kafka_ApiVersion_Queryable),
+ {NULL}};
+ int i;
+ int fallback_i = -1;
+ int ret = 0;
+
+ *apisp = NULL;
+ *api_cntp = 0;
+
+ for (i = 0; vermap[i].pfx; i++) {
+ if (!strncmp(vermap[i].pfx, broker_version,
+ strlen(vermap[i].pfx))) {
+ if (!vermap[i].apis)
+ return 0;
+ *apisp = vermap[i].apis;
+ *api_cntp = vermap[i].api_cnt;
+ ret = 1;
+ break;
+ } else if (fallback && !strcmp(vermap[i].pfx, fallback))
+ fallback_i = i;
+ }
+
+ if (!*apisp && fallback) {
+ rd_kafka_assert(NULL, fallback_i != -1);
+ *apisp = vermap[fallback_i].apis;
+ *api_cntp = vermap[fallback_i].api_cnt;
+ }
+
+ return ret;
+}
+
+
+/**
+ * @returns 1 if the provided broker version (probably)
+ * supports api.version.request.
+ */
+int rd_kafka_ApiVersion_is_queryable(const char *broker_version) {
+ struct rd_kafka_ApiVersion *apis;
+ size_t api_cnt;
+
+
+ if (!rd_kafka_get_legacy_ApiVersions(broker_version, &apis, &api_cnt,
+ 0))
+ return 0;
+
+ return apis == rd_kafka_ApiVersion_Queryable;
+}
+
+
+
+/**
+ * @brief Check if match's versions overlaps with \p apis.
+ *
+ * @returns 1 if true, else 0.
+ * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
+ */
+static RD_INLINE int
+rd_kafka_ApiVersion_check(const struct rd_kafka_ApiVersion *apis,
+ size_t api_cnt,
+ const struct rd_kafka_ApiVersion *match) {
+ const struct rd_kafka_ApiVersion *api;
+
+ api = bsearch(match, apis, api_cnt, sizeof(*apis),
+ rd_kafka_ApiVersion_key_cmp);
+ if (unlikely(!api))
+ return 0;
+
+ return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
+}
+
+
+/**
+ * @brief Compare broker's supported API versions to our feature request map
+ * and enable/disable features accordingly.
+ *
+ * @param broker_apis Broker's supported APIs. If NULL the
+ * \p broker.version.fallback configuration property will specify a
+ * default legacy version to use.
+ * @param broker_api_cnt Number of elements in \p broker_apis
+ *
+ * @returns the supported features (bitmask) to enable.
+ */
+int rd_kafka_features_check(rd_kafka_broker_t *rkb,
+ struct rd_kafka_ApiVersion *broker_apis,
+ size_t broker_api_cnt) {
+ int features = 0;
+ int i;
+
+ /* Scan through features. */
+ for (i = 0; rd_kafka_feature_map[i].feature != 0; i++) {
+ const struct rd_kafka_ApiVersion *match;
+ int fails = 0;
+
+ /* For each feature check that all its API dependencies
+ * can be fullfilled. */
+
+ for (match = &rd_kafka_feature_map[i].depends[0];
+ match->ApiKey != -1; match++) {
+ int r;
+
+ r = rd_kafka_ApiVersion_check(broker_apis,
+ broker_api_cnt, match);
+
+ rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
+ " Feature %s: %s (%hd..%hd) "
+ "%ssupported by broker",
+ rd_kafka_features2str(
+ rd_kafka_feature_map[i].feature),
+ rd_kafka_ApiKey2str(match->ApiKey),
+ match->MinVer, match->MaxVer,
+ r ? "" : "NOT ");
+
+ fails += !r;
+ }
+
+ rd_rkb_dbg(
+ rkb, FEATURE, "APIVERSION", "%s feature %s",
+ fails ? "Disabling" : "Enabling",
+ rd_kafka_features2str(rd_kafka_feature_map[i].feature));
+
+
+ if (!fails)
+ features |= rd_kafka_feature_map[i].feature;
+ }
+
+ return features;
+}
+
+
+
+/**
+ * @brief Make an allocated and sorted copy of \p src.
+ */
+void rd_kafka_ApiVersions_copy(const struct rd_kafka_ApiVersion *src,
+ size_t src_cnt,
+ struct rd_kafka_ApiVersion **dstp,
+ size_t *dst_cntp) {
+ *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
+ *dst_cntp = src_cnt;
+ qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
+}
+
+
+
+/**
+ * @returns a human-readable feature flag string.
+ */
+const char *rd_kafka_features2str(int features) {
+ static RD_TLS char ret[4][256];
+ size_t of = 0;
+ static RD_TLS int reti = 0;
+ int i;
+
+ reti = (reti + 1) % 4;
+
+ *ret[reti] = '\0';
+ for (i = 0; rd_kafka_feature_names[i]; i++) {
+ int r;
+ if (!(features & (1 << i)))
+ continue;
+
+ r = rd_snprintf(ret[reti] + of, sizeof(ret[reti]) - of, "%s%s",
+ of == 0 ? "" : ",", rd_kafka_feature_names[i]);
+ if ((size_t)r > sizeof(ret[reti]) - of) {
+ /* Out of space */
+ memcpy(&ret[reti][sizeof(ret[reti]) - 3], "..", 3);
+ break;
+ }
+
+ of += r;
+ }
+
+ return ret[reti];
+}