Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.c')
 fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.c | 4362 ++++++++++++++++++++
 1 file changed, 4362 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.c
new file mode 100644
index 000000000..e481f4dd8
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.c
@@ -0,0 +1,4362 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2022 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rd.h"
+#include "rdfloat.h"
+
+#include <stdlib.h>
+#include <ctype.h>
+#include <stddef.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_feature.h"
+#include "rdkafka_interceptor.h"
+#include "rdkafka_idempotence.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_sasl_oauthbearer.h"
+#if WITH_PLUGINS
+#include "rdkafka_plugin.h"
+#endif
+#include "rdunittest.h"
+
+#ifndef _WIN32
+#include <netinet/tcp.h>
+#else
+
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <windows.h>
+#endif
+
+struct rd_kafka_property {
+ rd_kafka_conf_scope_t scope;
+ const char *name;
+ enum { _RK_C_STR,
+ _RK_C_INT,
+ _RK_C_DBL, /* Double */
+ _RK_C_S2I, /* String to Integer mapping.
+ * Supports limited canonical str->int mappings
+ * using s2i[] */
+ _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */
+ _RK_C_BOOL,
+ _RK_C_PTR, /* Only settable through special set functions */
+ _RK_C_PATLIST, /* Pattern list */
+ _RK_C_KSTR, /* Kafka string */
+ _RK_C_ALIAS, /* Alias: points to other property through .sdef */
+ _RK_C_INTERNAL, /* Internal, don't expose to application */
+ _RK_C_INVALID, /* Invalid property, used to catch known
+ * but unsupported Java properties. */
+ } type;
+ int offset;
+ const char *desc;
+ int vmin;
+ int vmax;
+ int vdef; /* Default value (int) */
+ const char *sdef; /* Default value (string) */
+ void *pdef; /* Default value (pointer) */
+ double ddef; /* Default value (double) */
+ double dmin;
+ double dmax;
+ struct {
+ int val;
+ const char *str;
+ const char *unsupported; /**< Reason for value not being
+ * supported in this build. */
+ } s2i[20]; /* _RK_C_S2I and _RK_C_S2F */
+
+ const char *unsupported; /**< Reason for property not being supported
+ * in this build.
+ * Will be included in the conf_set()
+ * error string. */
+
+ /* Value validator (STR) */
+ int (*validate)(const struct rd_kafka_property *prop,
+ const char *val,
+ int ival);
+
+ /* Configuration object constructors and destructor for use when
+ * the property value itself is not used, or needs extra care. */
+ void (*ctor)(int scope, void *pconf);
+ void (*dtor)(int scope, void *pconf);
+ void (*copy)(int scope,
+ void *pdst,
+ const void *psrc,
+ void *dstptr,
+ const void *srcptr,
+ size_t filter_cnt,
+ const char **filter);
+
+ rd_kafka_conf_res_t (*set)(int scope,
+ void *pconf,
+ const char *name,
+ const char *value,
+ void *dstptr,
+ rd_kafka_conf_set_mode_t set_mode,
+ char *errstr,
+ size_t errstr_size);
+};
+
+
+#define _RK(field) offsetof(rd_kafka_conf_t, field)
+#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field)
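+
+/*
+ * Illustrative note (editor's sketch, not upstream code): _RK()/_RKT()
+ * resolve to the byte offset of a field within rd_kafka_conf_t or
+ * rd_kafka_topic_conf_t. A property entry such as
+ *
+ *   {_RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
+ *    "Default timeout for network requests.", 10, 300 * 1000, 60 * 1000}
+ *
+ * thus stores offsetof(rd_kafka_conf_t, socket_timeout_ms) in .offset,
+ * which is used both to address the field ((char *)conf + prop->offset)
+ * and as the property's unique index in the modified-bit vector below.
+ */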
+
+#if WITH_SSL
+#define _UNSUPPORTED_SSL .unsupported = NULL
+#else
+#define _UNSUPPORTED_SSL .unsupported = "OpenSSL not available at build time"
+#endif
+
+#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && defined(WITH_SSL) && \
+ !defined(LIBRESSL_VERSION_NUMBER)
+#define _UNSUPPORTED_OPENSSL_1_0_2 .unsupported = NULL
+#else
+#define _UNSUPPORTED_OPENSSL_1_0_2 \
+ .unsupported = "OpenSSL >= 1.0.2 not available at build time"
+#endif
+
+#if OPENSSL_VERSION_NUMBER >= 0x10100000 && defined(WITH_SSL) && \
+ !defined(LIBRESSL_VERSION_NUMBER)
+#define _UNSUPPORTED_OPENSSL_1_1_0 .unsupported = NULL
+#else
+#define _UNSUPPORTED_OPENSSL_1_1_0 \
+ .unsupported = "OpenSSL >= 1.1.0 not available at build time"
+#endif
+
+#if WITH_SSL_ENGINE
+#define _UNSUPPORTED_SSL_ENGINE .unsupported = NULL
+#else
+#define _UNSUPPORTED_SSL_ENGINE \
+ .unsupported = "OpenSSL >= 1.1.x not available at build time"
+#endif
+
+#if OPENSSL_VERSION_NUMBER >= 0x30000000 && defined(WITH_SSL)
+#define _UNSUPPORTED_SSL_3 .unsupported = NULL
+#else
+#define _UNSUPPORTED_SSL_3 \
+ .unsupported = "OpenSSL >= 3.0.0 not available at build time"
+#endif
+
+
+#if WITH_ZLIB
+#define _UNSUPPORTED_ZLIB .unsupported = NULL
+#else
+#define _UNSUPPORTED_ZLIB .unsupported = "zlib not available at build time"
+#endif
+
+#if WITH_SNAPPY
+#define _UNSUPPORTED_SNAPPY .unsupported = NULL
+#else
+#define _UNSUPPORTED_SNAPPY .unsupported = "snappy not enabled at build time"
+#endif
+
+#if WITH_ZSTD
+#define _UNSUPPORTED_ZSTD .unsupported = NULL
+#else
+#define _UNSUPPORTED_ZSTD .unsupported = "libzstd not available at build time"
+#endif
+
+#if WITH_CURL
+#define _UNSUPPORTED_HTTP .unsupported = NULL
+#else
+#define _UNSUPPORTED_HTTP .unsupported = "libcurl not available at build time"
+#endif
+
+#if WITH_OAUTHBEARER_OIDC
+#define _UNSUPPORTED_OIDC .unsupported = NULL
+#else
+#define _UNSUPPORTED_OIDC \
+ .unsupported = \
+ "OAuth/OIDC depends on libcurl and OpenSSL which were not " \
+ "available at build time"
+#endif
+
+#ifdef _WIN32
+#define _UNSUPPORTED_WIN32_GSSAPI \
+ .unsupported = \
+ "Kerberos keytabs are not supported on Windows, " \
+ "instead the logged on " \
+ "user's credentials are used through native SSPI"
+#else
+#define _UNSUPPORTED_WIN32_GSSAPI .unsupported = NULL
+#endif
+
+#if defined(_WIN32) || defined(WITH_SASL_CYRUS)
+#define _UNSUPPORTED_GSSAPI .unsupported = NULL
+#else
+#define _UNSUPPORTED_GSSAPI \
+ .unsupported = "cyrus-sasl/libsasl2 not available at build time"
+#endif
+
+#define _UNSUPPORTED_OAUTHBEARER _UNSUPPORTED_SSL
+
+
+static rd_kafka_conf_res_t
+rd_kafka_anyconf_get0(const void *conf,
+ const struct rd_kafka_property *prop,
+ char *dest,
+ size_t *dest_size);
+
+
+
+/**
+ * @returns a unique index for property \p prop, using the byte position
+ * of the field.
+ */
+static RD_INLINE int rd_kafka_prop2idx(const struct rd_kafka_property *prop) {
+ return prop->offset;
+}
+
+
+
+/**
+ * @brief Set the property as modified.
+ *
+ * We do this by mapping the property's conf struct field byte offset
+ * to a bit in a bit vector.
+ * If the bit is set the property has been modified, otherwise it is
+ * at its default unmodified value.
+ *
+ * \p is_modified 1: set as modified, 0: clear modified
+ */
+static void rd_kafka_anyconf_set_modified(void *conf,
+ const struct rd_kafka_property *prop,
+ int is_modified) {
+ int idx = rd_kafka_prop2idx(prop);
+ int bkt = idx / 64;
+ uint64_t bit = (uint64_t)1 << (idx % 64);
+ struct rd_kafka_anyconf_hdr *confhdr = conf;
+
+ rd_assert(idx < RD_KAFKA_CONF_PROPS_IDX_MAX &&
+ *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
+
+ if (is_modified)
+ confhdr->modified[bkt] |= bit;
+ else
+ confhdr->modified[bkt] &= ~bit;
+}
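+
+/*
+ * Worked example (editor's note): for a property whose field offset
+ * (and thus idx) is 130: bkt = 130 / 64 = 2 and
+ * bit = (uint64_t)1 << (130 % 64) = 1 << 2, so this property's state
+ * is tracked in bit 2 of confhdr->modified[2].
+ */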
+
+/**
+ * @brief Clear is_modified for all properties.
+ * @warning Does NOT clear/reset the value.
+ */
+static void rd_kafka_anyconf_clear_all_is_modified(void *conf) {
+ struct rd_kafka_anyconf_hdr *confhdr = conf;
+
+ memset(confhdr, 0, sizeof(*confhdr));
+}
+
+
+/**
+ * @returns true if the property has been set/modified, else false.
+ */
+static rd_bool_t
+rd_kafka_anyconf_is_modified(const void *conf,
+ const struct rd_kafka_property *prop) {
+ int idx = rd_kafka_prop2idx(prop);
+ int bkt = idx / 64;
+ uint64_t bit = (uint64_t)1 << (idx % 64);
+ const struct rd_kafka_anyconf_hdr *confhdr = conf;
+
+ return !!(confhdr->modified[bkt] & bit);
+}
+
+/**
+ * @returns true if any property in \p conf has been set/modified.
+ */
+static rd_bool_t rd_kafka_anyconf_is_any_modified(const void *conf) {
+ const struct rd_kafka_anyconf_hdr *confhdr = conf;
+ int i;
+
+ for (i = 0; i < (int)RD_ARRAYSIZE(confhdr->modified); i++)
+ if (confhdr->modified[i])
+ return rd_true;
+
+ return rd_false;
+}
+
+
+
+/**
+ * @brief Validate \p broker.version.fallback property.
+ */
+static int
+rd_kafka_conf_validate_broker_version(const struct rd_kafka_property *prop,
+ const char *val,
+ int ival) {
+ struct rd_kafka_ApiVersion *apis;
+ size_t api_cnt;
+ return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL);
+}
+
+/**
+ * @brief Validate that string is a single item, without delimiters (comma, space).
+ */
+static RD_UNUSED int
+rd_kafka_conf_validate_single(const struct rd_kafka_property *prop,
+ const char *val,
+ int ival) {
+ return !strchr(val, ',') && !strchr(val, ' ');
+}
+
+/**
+ * @brief Validate builtin partitioner string
+ */
+static RD_UNUSED int
+rd_kafka_conf_validate_partitioner(const struct rd_kafka_property *prop,
+ const char *val,
+ int ival) {
+ return !strcmp(val, "random") || !strcmp(val, "consistent") ||
+ !strcmp(val, "consistent_random") || !strcmp(val, "murmur2") ||
+ !strcmp(val, "murmur2_random") || !strcmp(val, "fnv1a") ||
+ !strcmp(val, "fnv1a_random");
+}
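+
+/*
+ * Editor's sketch (assumed, simplified call pattern; not upstream
+ * code): a validator, when present, would be consulted by the property
+ * setter before a string value is committed, along the lines of:
+ *
+ *   if (prop->validate && !prop->validate(prop, value, ival))
+ *           return RD_KAFKA_CONF_INVALID;  // value rejected
+ */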
+
+
+/**
+ * librdkafka configuration property definitions.
+ */
+static const struct rd_kafka_property rd_kafka_properties[] = {
+ /* Global properties */
+ {_RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
+ "Indicates the builtin features for this build of librdkafka. "
+ "An application can either query this value or attempt to set it "
+ "with its list of required features to check for library support.",
+ 0, 0x7fffffff, 0xffff,
+ .s2i = {{0x1, "gzip", _UNSUPPORTED_ZLIB},
+ {0x2, "snappy", _UNSUPPORTED_SNAPPY},
+ {0x4, "ssl", _UNSUPPORTED_SSL},
+ {0x8, "sasl"},
+ {0x10, "regex"},
+ {0x20, "lz4"},
+ {0x40, "sasl_gssapi", _UNSUPPORTED_GSSAPI},
+ {0x80, "sasl_plain"},
+ {0x100, "sasl_scram", _UNSUPPORTED_SSL},
+ {0x200, "plugins"
+#if !WITH_PLUGINS
+ ,
+ .unsupported = "libdl/dlopen(3) not available at "
+ "build time"
+#endif
+ },
+ {0x400, "zstd", _UNSUPPORTED_ZSTD},
+ {0x800, "sasl_oauthbearer", _UNSUPPORTED_SSL},
+ {0x1000, "http", _UNSUPPORTED_HTTP},
+ {0x2000, "oidc", _UNSUPPORTED_OIDC},
+ {0, NULL}}},
+ {_RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
+ "Client identifier.", .sdef = "rdkafka"},
+ {_RK_GLOBAL | _RK_HIDDEN, "client.software.name", _RK_C_STR, _RK(sw_name),
+ "Client software name as reported to broker version >= v2.4.0. "
+ "Broker-side character restrictions apply, as of broker version "
+ "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
+ "will replace any other character with `-` and strip leading and "
+ "trailing non-alphanumeric characters before tranmission to "
+ "the broker. "
+ "This property should only be set by high-level language "
+ "librdkafka client bindings.",
+ .sdef = "librdkafka"},
+ {
+ _RK_GLOBAL | _RK_HIDDEN,
+ "client.software.version",
+ _RK_C_STR,
+ _RK(sw_version),
+ "Client software version as reported to broker version >= v2.4.0. "
+ "Broker-side character restrictions apply, as of broker version "
+ "v2.4.0 the allowed characters are `a-zA-Z0-9.-`. The local client "
+ "will replace any other character with `-` and strip leading and "
+ "trailing non-alphanumeric characters before tranmission to "
+ "the broker. "
+ "This property should only be set by high-level language "
+ "librdkafka client bindings."
+ "If changing this property it is highly recommended to append the "
+ "librdkafka version.",
+ },
+ {_RK_GLOBAL | _RK_HIGH, "metadata.broker.list", _RK_C_STR, _RK(brokerlist),
+ "Initial list of brokers as a CSV list of broker host or host:port. "
+ "The application may also use `rd_kafka_brokers_add()` to add "
+ "brokers during runtime."},
+ {_RK_GLOBAL | _RK_HIGH, "bootstrap.servers", _RK_C_ALIAS, 0,
+ "See metadata.broker.list", .sdef = "metadata.broker.list"},
+ {_RK_GLOBAL | _RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
+ "Maximum Kafka protocol request message size. "
+ "Due to differing framing overhead between protocol versions the "
+ "producer is unable to reliably enforce a strict max message limit "
+ "at produce time and may exceed the maximum size by one message in "
+ "protocol ProduceRequests, the broker will enforce the the topic's "
+ "`max.message.bytes` limit (see Apache Kafka documentation).",
+ 1000, 1000000000, 1000000},
+ {_RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT, _RK(msg_copy_max_size),
+ "Maximum size for message to be copied to buffer. "
+ "Messages larger than this will be passed by reference (zero-copy) "
+ "at the expense of larger iovecs.",
+ 0, 1000000000, 0xffff},
+ {_RK_GLOBAL | _RK_MED, "receive.message.max.bytes", _RK_C_INT,
+ _RK(recv_max_msg_size),
+ "Maximum Kafka protocol response message size. "
+ "This serves as a safety precaution to avoid memory exhaustion in "
+ "case of protocol hickups. "
+ "This value must be at least `fetch.max.bytes` + 512 to allow "
+ "for protocol overhead; the value is adjusted automatically "
+ "unless the configuration property is explicitly set.",
+ 1000, INT_MAX, 100000000},
+ {_RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
+ _RK(max_inflight),
+ "Maximum number of in-flight requests per broker connection. "
+ "This is a generic property applied to all broker communication, "
+ "however it is primarily relevant to produce requests. "
+ "In particular, note that other mechanisms limit the number "
+ "of outstanding consumer fetch request per broker to one.",
+ 1, 1000000, 1000000},
+ {_RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
+ .sdef = "max.in.flight.requests.per.connection"},
+ {_RK_GLOBAL | _RK_DEPRECATED | _RK_HIDDEN, "metadata.request.timeout.ms",
+ _RK_C_INT, _RK(metadata_request_timeout_ms), "Not used.", 10, 900 * 1000,
+ 10},
+ {_RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
+ _RK(metadata_refresh_interval_ms),
+ "Period of time in milliseconds at which topic and broker "
+ "metadata is refreshed in order to proactively discover any new "
+ "brokers, topics, partitions or partition leader changes. "
+ "Use -1 to disable the intervalled refresh (not recommended). "
+ "If there are no locally referenced topics "
+ "(no topic objects created, no messages produced, "
+ "no subscription or no assignment) then only the broker list will "
+ "be refreshed every interval but no more often than every 10s.",
+ -1, 3600 * 1000, 5 * 60 * 1000},
+ {_RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT, _RK(metadata_max_age_ms),
+ "Metadata cache max age. "
+ "Defaults to topic.metadata.refresh.interval.ms * 3",
+ 1, 24 * 3600 * 1000, 5 * 60 * 1000 * 3},
+ {_RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
+ _RK(metadata_refresh_fast_interval_ms),
+ "When a topic loses its leader a new metadata request will be "
+ "enqueued with this initial interval, exponentially increasing "
+ "until the topic metadata has been refreshed. "
+ "This is used to recover quickly from transitioning leader brokers.",
+ 1, 60 * 1000, 250},
+ {_RK_GLOBAL | _RK_DEPRECATED, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
+ _RK(metadata_refresh_fast_cnt), "No longer used.", 0, 1000, 10},
+ {_RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
+ _RK(metadata_refresh_sparse),
+ "Sparse metadata requests (consumes less network bandwidth)", 0, 1, 1},
+ {_RK_GLOBAL, "topic.metadata.propagation.max.ms", _RK_C_INT,
+ _RK(metadata_propagation_max_ms),
+ "Apache Kafka topic creation is asynchronous and it takes some "
+ "time for a new topic to propagate throughout the cluster to all "
+ "brokers. "
+ "If a client requests topic metadata after manual topic creation but "
+ "before the topic has been fully propagated to the broker the "
+ "client is requesting metadata from, the topic will seem to be "
+ "non-existent and the client will mark the topic as such, "
+ "failing queued produced messages with `ERR__UNKNOWN_TOPIC`. "
+ "This setting delays marking a topic as non-existent until the "
+ "configured propagation max time has passed. "
+ "The maximum propagation time is calculated from the time the "
+ "topic is first referenced in the client, e.g., on produce().",
+ 0, 60 * 60 * 1000, 30 * 1000},
+ {_RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST, _RK(topic_blacklist),
+ "Topic blacklist, a comma-separated list of regular expressions "
+ "for matching topic names that should be ignored in "
+ "broker metadata information as if the topics did not exist."},
+ {_RK_GLOBAL | _RK_MED, "debug", _RK_C_S2F, _RK(debug),
+ "A comma-separated list of debug contexts to enable. "
+ "Detailed Producer debugging: broker,topic,msg. "
+ "Consumer: consumer,cgrp,topic,fetch",
+ .s2i = {{RD_KAFKA_DBG_GENERIC, "generic"},
+ {RD_KAFKA_DBG_BROKER, "broker"},
+ {RD_KAFKA_DBG_TOPIC, "topic"},
+ {RD_KAFKA_DBG_METADATA, "metadata"},
+ {RD_KAFKA_DBG_FEATURE, "feature"},
+ {RD_KAFKA_DBG_QUEUE, "queue"},
+ {RD_KAFKA_DBG_MSG, "msg"},
+ {RD_KAFKA_DBG_PROTOCOL, "protocol"},
+ {RD_KAFKA_DBG_CGRP, "cgrp"},
+ {RD_KAFKA_DBG_SECURITY, "security"},
+ {RD_KAFKA_DBG_FETCH, "fetch"},
+ {RD_KAFKA_DBG_INTERCEPTOR, "interceptor"},
+ {RD_KAFKA_DBG_PLUGIN, "plugin"},
+ {RD_KAFKA_DBG_CONSUMER, "consumer"},
+ {RD_KAFKA_DBG_ADMIN, "admin"},
+ {RD_KAFKA_DBG_EOS, "eos"},
+ {RD_KAFKA_DBG_MOCK, "mock"},
+ {RD_KAFKA_DBG_ASSIGNOR, "assignor"},
+ {RD_KAFKA_DBG_CONF, "conf"},
+ {RD_KAFKA_DBG_ALL, "all"}}},
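+ /* Example (editor's note, application side): _RK_C_S2F properties
+  * accept a CSV of flag names which are OR:ed together, e.g.:
+  *
+  *   rd_kafka_conf_set(conf, "debug", "broker,topic,msg",
+  *                     errstr, sizeof(errstr));
+  */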
+ {_RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
+ "Default timeout for network requests. "
+ "Producer: ProduceRequests will use the lesser value of "
+ "`socket.timeout.ms` and remaining `message.timeout.ms` for the "
+ "first message in the batch. "
+ "Consumer: FetchRequests will use "
+ "`fetch.wait.max.ms` + `socket.timeout.ms`. "
+ "Admin: Admin requests will use `socket.timeout.ms` or explicitly "
+ "set `rd_kafka_AdminOptions_set_operation_timeout()` value.",
+ 10, 300 * 1000, 60 * 1000},
+ {_RK_GLOBAL | _RK_DEPRECATED, "socket.blocking.max.ms", _RK_C_INT,
+ _RK(socket_blocking_max_ms), "No longer used.", 1, 60 * 1000, 1000},
+ {_RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT, _RK(socket_sndbuf_size),
+ "Broker socket send buffer size. System default is used if 0.", 0,
+ 100000000, 0},
+ {_RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
+ _RK(socket_rcvbuf_size),
+ "Broker socket receive buffer size. System default is used if 0.", 0,
+ 100000000, 0},
+ {_RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL, _RK(socket_keepalive),
+ "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets", 0, 1, 0
+#ifndef SO_KEEPALIVE
+ ,
+ .unsupported = "SO_KEEPALIVE not available at build time"
+#endif
+ },
+ {_RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL, _RK(socket_nagle_disable),
+ "Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.", 0, 1, 0
+#ifndef TCP_NODELAY
+ ,
+ .unsupported = "TCP_NODELAY not available at build time"
+#endif
+ },
+ {_RK_GLOBAL, "socket.max.fails", _RK_C_INT, _RK(socket_max_fails),
+ "Disconnect from broker when this number of send failures "
+ "(e.g., timed out requests) is reached. Disable with 0. "
+ "WARNING: It is highly recommended to leave this setting at "
+ "its default value of 1 to avoid the client and broker to "
+ "become desynchronized in case of request timeouts. "
+ "NOTE: The connection is automatically re-established.",
+ 0, 1000000, 1},
+ {_RK_GLOBAL, "broker.address.ttl", _RK_C_INT, _RK(broker_addr_ttl),
+ "How long to cache the broker address resolving "
+ "results (milliseconds).",
+ 0, 86400 * 1000, 1 * 1000},
+ {_RK_GLOBAL, "broker.address.family", _RK_C_S2I, _RK(broker_addr_family),
+ "Allowed broker IP address families: any, v4, v6", .vdef = AF_UNSPEC,
+ .s2i =
+ {
+ {AF_UNSPEC, "any"},
+ {AF_INET, "v4"},
+ {AF_INET6, "v6"},
+ }},
+ {_RK_GLOBAL | _RK_MED, "socket.connection.setup.timeout.ms", _RK_C_INT,
+ _RK(socket_connection_setup_timeout_ms),
+ "Maximum time allowed for broker connection setup "
+ "(TCP connection setup as well SSL and SASL handshake). "
+ "If the connection to the broker is not fully functional after this "
+ "the connection will be closed and retried.",
+ 1000, INT_MAX, 30 * 1000 /* 30s */},
+ {_RK_GLOBAL | _RK_MED, "connections.max.idle.ms", _RK_C_INT,
+ _RK(connections_max_idle_ms),
+ "Close broker connections after the specified time of "
+ "inactivity. "
+ "Disable with 0. "
+ "If this property is left at its default value some heuristics are "
+ "performed to determine a suitable default value, this is currently "
+ "limited to identifying brokers on Azure "
+ "(see librdkafka issue #3109 for more info).",
+ 0, INT_MAX, 0},
+ {_RK_GLOBAL | _RK_MED | _RK_HIDDEN, "enable.sparse.connections", _RK_C_BOOL,
+ _RK(sparse_connections),
+ "When enabled the client will only connect to brokers "
+ "it needs to communicate with. When disabled the client "
+ "will maintain connections to all brokers in the cluster.",
+ 0, 1, 1},
+ {_RK_GLOBAL | _RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
+ _RK(reconnect_jitter_ms),
+ "No longer used. See `reconnect.backoff.ms` and "
+ "`reconnect.backoff.max.ms`.",
+ 0, 60 * 60 * 1000, 0},
+ {_RK_GLOBAL | _RK_MED, "reconnect.backoff.ms", _RK_C_INT,
+ _RK(reconnect_backoff_ms),
+ "The initial time to wait before reconnecting to a broker "
+ "after the connection has been closed. "
+ "The time is increased exponentially until "
+ "`reconnect.backoff.max.ms` is reached. "
+ "-25% to +50% jitter is applied to each reconnect backoff. "
+ "A value of 0 disables the backoff and reconnects immediately.",
+ 0, 60 * 60 * 1000, 100},
+ {_RK_GLOBAL | _RK_MED, "reconnect.backoff.max.ms", _RK_C_INT,
+ _RK(reconnect_backoff_max_ms),
+ "The maximum time to wait before reconnecting to a broker "
+ "after the connection has been closed.",
+ 0, 60 * 60 * 1000, 10 * 1000},
+ {_RK_GLOBAL | _RK_HIGH, "statistics.interval.ms", _RK_C_INT,
+ _RK(stats_interval_ms),
+ "librdkafka statistics emit interval. The application also needs to "
+ "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
+ "The granularity is 1000ms. A value of 0 disables statistics.",
+ 0, 86400 * 1000, 0},
+ {_RK_GLOBAL, "enabled_events", _RK_C_INT, _RK(enabled_events),
+ "See `rd_kafka_conf_set_events()`", 0, 0x7fffffff, 0},
+ {_RK_GLOBAL, "error_cb", _RK_C_PTR, _RK(error_cb),
+ "Error callback (set with rd_kafka_conf_set_error_cb())"},
+ {_RK_GLOBAL, "throttle_cb", _RK_C_PTR, _RK(throttle_cb),
+ "Throttle callback (set with rd_kafka_conf_set_throttle_cb())"},
+ {_RK_GLOBAL, "stats_cb", _RK_C_PTR, _RK(stats_cb),
+ "Statistics callback (set with rd_kafka_conf_set_stats_cb())"},
+ {_RK_GLOBAL, "log_cb", _RK_C_PTR, _RK(log_cb),
+ "Log callback (set with rd_kafka_conf_set_log_cb())",
+ .pdef = rd_kafka_log_print},
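+ /* Example (editor's sketch): _RK_C_PTR callback properties are set
+  * with dedicated setters rather than rd_kafka_conf_set(), e.g.:
+  *
+  *   static void my_log_cb(const rd_kafka_t *rk, int level,
+  *                         const char *fac, const char *buf) {
+  *           fprintf(stderr, "%d|%s|%s: %s\n",
+  *                   level, fac, rd_kafka_name(rk), buf);
+  *   }
+  *   ...
+  *   rd_kafka_conf_set_log_cb(conf, my_log_cb);
+  */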
+ {_RK_GLOBAL, "log_level", _RK_C_INT, _RK(log_level),
+ "Logging level (syslog(3) levels)", 0, 7, 6},
+ {_RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
+ "Disable spontaneous log_cb from internal librdkafka "
+ "threads, instead enqueue log messages on queue set with "
+ "`rd_kafka_set_log_queue()` and serve log callbacks or "
+ "events through the standard poll APIs. "
+ "**NOTE**: Log messages will linger in a temporary queue "
+ "until the log queue has been set.",
+ 0, 1, 0},
+ {_RK_GLOBAL, "log.thread.name", _RK_C_BOOL, _RK(log_thread_name),
+ "Print internal thread name in log messages "
+ "(useful for debugging librdkafka internals)",
+ 0, 1, 1},
+ {_RK_GLOBAL, "enable.random.seed", _RK_C_BOOL, _RK(enable_random_seed),
+ "If enabled librdkafka will initialize the PRNG "
+ "with srand(current_time.milliseconds) on the first invocation of "
+ "rd_kafka_new() (required only if rand_r() is not available on your "
+ "platform). "
+ "If disabled the application must call srand() prior to calling "
+ "rd_kafka_new().",
+ 0, 1, 1},
+ {_RK_GLOBAL, "log.connection.close", _RK_C_BOOL, _RK(log_connection_close),
+ "Log broker disconnects. "
+ "It might be useful to turn this off when interacting with "
+ "0.9 brokers with an aggressive `connections.max.idle.ms` value.",
+ 0, 1, 1},
+ {_RK_GLOBAL, "background_event_cb", _RK_C_PTR, _RK(background_event_cb),
+ "Background queue event callback "
+ "(set with rd_kafka_conf_set_background_event_cb())"},
+ {_RK_GLOBAL, "socket_cb", _RK_C_PTR, _RK(socket_cb),
+ "Socket creation callback to provide race-free CLOEXEC",
+ .pdef =
+#ifdef __linux__
+ rd_kafka_socket_cb_linux
+#else
+ rd_kafka_socket_cb_generic
+#endif
+ },
+ {
+ _RK_GLOBAL,
+ "connect_cb",
+ _RK_C_PTR,
+ _RK(connect_cb),
+ "Socket connect callback",
+ },
+ {
+ _RK_GLOBAL,
+ "closesocket_cb",
+ _RK_C_PTR,
+ _RK(closesocket_cb),
+ "Socket close callback",
+ },
+ {_RK_GLOBAL, "open_cb", _RK_C_PTR, _RK(open_cb),
+ "File open callback to provide race-free CLOEXEC",
+ .pdef =
+#ifdef __linux__
+ rd_kafka_open_cb_linux
+#else
+ rd_kafka_open_cb_generic
+#endif
+ },
+ {_RK_GLOBAL, "resolve_cb", _RK_C_PTR, _RK(resolve_cb),
+ "Address resolution callback (set with rd_kafka_conf_set_resolve_cb())."},
+ {_RK_GLOBAL, "opaque", _RK_C_PTR, _RK(opaque),
+ "Application opaque (set with rd_kafka_conf_set_opaque())"},
+ {_RK_GLOBAL, "default_topic_conf", _RK_C_PTR, _RK(topic_conf),
+ "Default topic configuration for automatically subscribed topics"},
+ {_RK_GLOBAL, "internal.termination.signal", _RK_C_INT, _RK(term_sig),
+ "Signal that librdkafka will use to quickly terminate on "
+ "rd_kafka_destroy(). If this signal is not set then there will be a "
+ "delay before rd_kafka_wait_destroyed() returns true "
+ "as internal threads are timing out their system calls. "
+ "If this signal is set however the delay will be minimal. "
+ "The application should mask this signal as an internal "
+ "signal handler is installed.",
+ 0, 128, 0},
+ {_RK_GLOBAL | _RK_HIGH, "api.version.request", _RK_C_BOOL,
+ _RK(api_version_request),
+ "Request broker's supported API versions to adjust functionality to "
+ "available protocol features. If set to false, or the "
+ "ApiVersionRequest fails, the fallback version "
+ "`broker.version.fallback` will be used. "
+ "**NOTE**: Depends on broker version >=0.10.0. If the request is not "
+ "supported by (an older) broker the `broker.version.fallback` fallback is "
+ "used.",
+ 0, 1, 1},
+ {_RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
+ _RK(api_version_request_timeout_ms),
+ "Timeout for broker API version requests.", 1, 5 * 60 * 1000, 10 * 1000},
+ {_RK_GLOBAL | _RK_MED, "api.version.fallback.ms", _RK_C_INT,
+ _RK(api_version_fallback_ms),
+ "Dictates how long the `broker.version.fallback` fallback is used "
+ "in the case the ApiVersionRequest fails. "
+ "**NOTE**: The ApiVersionRequest is only issued when a new connection "
+ "to the broker is made (such as after an upgrade).",
+ 0, 86400 * 7 * 1000, 0},
+
+ {_RK_GLOBAL | _RK_MED, "broker.version.fallback", _RK_C_STR,
+ _RK(broker_version_fallback),
+ "Older broker versions (before 0.10.0) provide no way for a client to "
+ "query "
+ "for supported protocol features "
+ "(ApiVersionRequest, see `api.version.request`) making it impossible "
+ "for the client to know what features it may use. "
+ "As a workaround a user may set this property to the expected broker "
+ "version and the client will automatically adjust its feature set "
+ "accordingly if the ApiVersionRequest fails (or is disabled). "
+ "The fallback broker version will be used for `api.version.fallback.ms`. "
+ "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. "
+ "Any other value >= 0.10, such as 0.10.2.1, "
+ "enables ApiVersionRequests.",
+ .sdef = "0.10.0", .validate = rd_kafka_conf_validate_broker_version},
+ {_RK_GLOBAL, "allow.auto.create.topics", _RK_C_BOOL,
+ _RK(allow_auto_create_topics),
+ "Allow automatic topic creation on the broker when subscribing to "
+ "or assigning non-existent topics. "
+ "The broker must also be configured with "
+ "`auto.create.topics.enable=true` for this configuration to "
+ "take effect. "
+ "Note: the default value (true) for the producer is "
+ "different from the default value (false) for the consumer. "
+ "Further, the consumer default value is different from the Java "
+ "consumer (true), and this property is not supported by the Java "
+ "producer. Requires broker version >= 0.11.0.0, for older broker "
+ "versions only the broker configuration applies.",
+ 0, 1, 0},
+
+ /* Security related global properties */
+ {_RK_GLOBAL | _RK_HIGH, "security.protocol", _RK_C_S2I,
+ _RK(security_protocol), "Protocol used to communicate with brokers.",
+ .vdef = RD_KAFKA_PROTO_PLAINTEXT,
+ .s2i = {{RD_KAFKA_PROTO_PLAINTEXT, "plaintext"},
+ {RD_KAFKA_PROTO_SSL, "ssl", _UNSUPPORTED_SSL},
+ {RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext"},
+ {RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl", _UNSUPPORTED_SSL},
+ {0, NULL}}},
+
+ {_RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR, _RK(ssl.cipher_suites),
+ "A cipher suite is a named combination of authentication, "
+ "encryption, MAC and key exchange algorithm used to negotiate the "
+ "security settings for a network connection using TLS or SSL network "
+ "protocol. See manual page for `ciphers(1)` and "
+ "`SSL_CTX_set_cipher_list(3).",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.curves.list", _RK_C_STR, _RK(ssl.curves_list),
+ "The supported-curves extension in the TLS ClientHello message specifies "
+ "the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client "
+ "is willing to have the server use. See manual page for "
+ "`SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.",
+ _UNSUPPORTED_OPENSSL_1_0_2},
+ {_RK_GLOBAL, "ssl.sigalgs.list", _RK_C_STR, _RK(ssl.sigalgs_list),
+ "The client uses the TLS ClientHello signature_algorithms extension "
+ "to indicate to the server which signature/hash algorithm pairs "
+ "may be used in digital signatures. See manual page for "
+ "`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required.",
+ _UNSUPPORTED_OPENSSL_1_0_2},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.location", _RK_C_STR,
+ _RK(ssl.key_location),
+ "Path to client's private key (PEM) used for authentication.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.password", _RK_C_STR,
+ _RK(ssl.key_password),
+ "Private key passphrase (for use with `ssl.key.location` "
+ "and `set_ssl_cert()`)",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl.key.pem", _RK_C_STR, _RK(ssl.key_pem),
+ "Client's private key string (PEM format) used for authentication.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl_key", _RK_C_INTERNAL, _RK(ssl.key),
+ "Client's private key as set by rd_kafka_conf_set_ssl_cert()",
+ .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.certificate.location", _RK_C_STR, _RK(ssl.cert_location),
+ "Path to client's public key (PEM) used for authentication.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.certificate.pem", _RK_C_STR, _RK(ssl.cert_pem),
+ "Client's public key string (PEM format) used for authentication.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl_certificate", _RK_C_INTERNAL, _RK(ssl.key),
+ "Client's public key as set by rd_kafka_conf_set_ssl_cert()",
+ .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
+ _UNSUPPORTED_SSL},
+
+ {_RK_GLOBAL, "ssl.ca.location", _RK_C_STR, _RK(ssl.ca_location),
+ "File or directory path to CA certificate(s) for verifying "
+ "the broker's key. "
+ "Defaults: "
+ "On Windows the system's CA certificates are automatically looked "
+ "up in the Windows Root certificate store. "
+ "On Mac OSX this configuration defaults to `probe`. "
+ "It is recommended to install openssl using Homebrew, "
+ "to provide CA certificates. "
+ "On Linux install the distribution's ca-certificates package. "
+ "If OpenSSL is statically linked or `ssl.ca.location` is set to "
+ "`probe` a list of standard paths will be probed and the first one "
+ "found will be used as the default CA certificate location path. "
+ "If OpenSSL is dynamically linked the OpenSSL library's default "
+ "path will be used (see `OPENSSLDIR` in `openssl version -a`).",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl.ca.pem", _RK_C_STR, _RK(ssl.ca_pem),
+ "CA certificate string (PEM format) for verifying the broker's key.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL, _RK(ssl.ca),
+ "CA certificate as set by rd_kafka_conf_set_ssl_cert()",
+ .dtor = rd_kafka_conf_cert_dtor, .copy = rd_kafka_conf_cert_copy,
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.ca.certificate.stores", _RK_C_STR,
+ _RK(ssl.ca_cert_stores),
+ "Comma-separated list of Windows Certificate stores to load "
+ "CA certificates from. Certificates will be loaded in the same "
+ "order as stores are specified. If no certificates can be loaded "
+ "from any of the specified stores an error is logged and the "
+ "OpenSSL library's default CA location is used instead. "
+ "Store names are typically one or more of: MY, Root, Trust, CA.",
+ .sdef = "Root",
+#if !defined(_WIN32)
+ .unsupported = "configuration only valid on Windows"
+#endif
+ },
+
+ {_RK_GLOBAL, "ssl.crl.location", _RK_C_STR, _RK(ssl.crl_location),
+ "Path to CRL for verifying broker's certificate validity.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.keystore.location", _RK_C_STR, _RK(ssl.keystore_location),
+ "Path to client's keystore (PKCS#12) used for authentication.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL | _RK_SENSITIVE, "ssl.keystore.password", _RK_C_STR,
+ _RK(ssl.keystore_password), "Client's keystore (PKCS#12) password.",
+ _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.providers", _RK_C_STR, _RK(ssl.providers),
+ "Comma-separated list of OpenSSL 3.0.x implementation providers. "
+ "E.g., \"default,legacy\".",
+ _UNSUPPORTED_SSL_3},
+ {_RK_GLOBAL | _RK_DEPRECATED, "ssl.engine.location", _RK_C_STR,
+ _RK(ssl.engine_location),
+ "Path to OpenSSL engine library. OpenSSL >= 1.1.x required. "
+ "DEPRECATED: OpenSSL engine support is deprecated and should be "
+ "replaced by OpenSSL 3 providers.",
+ _UNSUPPORTED_SSL_ENGINE},
+ {_RK_GLOBAL, "ssl.engine.id", _RK_C_STR, _RK(ssl.engine_id),
+ "OpenSSL engine id is the name used for loading engine.",
+ .sdef = "dynamic", _UNSUPPORTED_SSL_ENGINE},
+ {_RK_GLOBAL, "ssl_engine_callback_data", _RK_C_PTR,
+ _RK(ssl.engine_callback_data),
+ "OpenSSL engine callback data (set "
+ "with rd_kafka_conf_set_engine_callback_data()).",
+ _UNSUPPORTED_SSL_ENGINE},
+ {_RK_GLOBAL, "enable.ssl.certificate.verification", _RK_C_BOOL,
+ _RK(ssl.enable_verify),
+ "Enable OpenSSL's builtin broker (server) certificate verification. "
+ "This verification can be extended by the application by "
+ "implementing a certificate_verify_cb.",
+ 0, 1, 1, _UNSUPPORTED_SSL},
+ {_RK_GLOBAL, "ssl.endpoint.identification.algorithm", _RK_C_S2I,
+ _RK(ssl.endpoint_identification),
+ "Endpoint identification algorithm to validate broker "
+ "hostname using broker certificate. "
+ "https - Server (broker) hostname verification as "
+ "specified in RFC2818. "
+ "none - No endpoint verification. "
+ "OpenSSL >= 1.0.2 required.",
+ .vdef = RD_KAFKA_SSL_ENDPOINT_ID_HTTPS,
+ .s2i = {{RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none"},
+ {RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https"}},
+ _UNSUPPORTED_OPENSSL_1_0_2},
+ {_RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR,
+ _RK(ssl.cert_verify_cb),
+ "Callback to verify the broker certificate chain.", _UNSUPPORTED_SSL},
+
+ /* Point user in the right direction if they try to apply
+ * Java client SSL / JAAS properties. */
+ {_RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID, _RK(dummy),
+ "Java TrustStores are not supported, use `ssl.ca.location` "
+ "and a certificate file instead. "
+ "See "
+ "https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka "
+ "for more information."},
+ {_RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID, _RK(dummy),
+ "Java JAAS configuration is not supported, see "
+ "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka "
+ "for more information."},
+
+ {_RK_GLOBAL | _RK_HIGH, "sasl.mechanisms", _RK_C_STR, _RK(sasl.mechanisms),
+ "SASL mechanism to use for authentication. "
+ "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. "
+ "**NOTE**: Despite the name only one mechanism must be configured.",
+ .sdef = "GSSAPI", .validate = rd_kafka_conf_validate_single},
+ {_RK_GLOBAL | _RK_HIGH, "sasl.mechanism", _RK_C_ALIAS,
+ .sdef = "sasl.mechanisms"},
+ {_RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
+ _RK(sasl.service_name),
+ "Kerberos principal name that Kafka runs as, "
+ "not including /hostname@REALM",
+ .sdef = "kafka"},
+ {_RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR, _RK(sasl.principal),
+ "This client's Kerberos principal name. "
+ "(Not supported on Windows, will use the logon user's principal).",
+ .sdef = "kafkaclient"},
+ {_RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR, _RK(sasl.kinit_cmd),
+ "Shell command to refresh or acquire the client's Kerberos ticket. "
+ "This command is executed on client creation and every "
+ "sasl.kerberos.min.time.before.relogin (0=disable). "
+ "%{config.prop.name} is replaced by corresponding config "
+ "object value.",
+ .sdef =
+ /* First attempt to refresh, else acquire. */
+ "kinit -R -t \"%{sasl.kerberos.keytab}\" "
+ "-k %{sasl.kerberos.principal} || "
+ "kinit -t \"%{sasl.kerberos.keytab}\" -k %{sasl.kerberos.principal}",
+ _UNSUPPORTED_WIN32_GSSAPI},
+ {_RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR, _RK(sasl.keytab),
+ "Path to Kerberos keytab file. "
+ "This configuration property is only used as a variable in "
+ "`sasl.kerberos.kinit.cmd` as "
+ "` ... -t \"%{sasl.kerberos.keytab}\"`.",
+ _UNSUPPORTED_WIN32_GSSAPI},
+ {_RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
+ _RK(sasl.relogin_min_time),
+ "Minimum time in milliseconds between key refresh attempts. "
+ "Disable automatic key refresh by setting this property to 0.",
+ 0, 86400 * 1000, 60 * 1000, _UNSUPPORTED_WIN32_GSSAPI},
+ {_RK_GLOBAL | _RK_HIGH | _RK_SENSITIVE, "sasl.username", _RK_C_STR,
+ _RK(sasl.username),
+ "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms"},
+ {_RK_GLOBAL | _RK_HIGH | _RK_SENSITIVE, "sasl.password", _RK_C_STR,
+ _RK(sasl.password),
+ "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism"},
+ {_RK_GLOBAL | _RK_SENSITIVE, "sasl.oauthbearer.config", _RK_C_STR,
+ _RK(sasl.oauthbearer_config),
+ "SASL/OAUTHBEARER configuration. The format is "
+ "implementation-dependent and must be parsed accordingly. The "
+ "default unsecured token implementation (see "
+ "https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes "
+ "space-separated name=value pairs with valid names including "
+ "principalClaimName, principal, scopeClaimName, scope, and "
+ "lifeSeconds. The default value for principalClaimName is \"sub\", "
+ "the default value for scopeClaimName is \"scope\", and the default "
+ "value for lifeSeconds is 3600. The scope value is CSV format with "
+ "the default value being no/empty scope. For example: "
+ "`principalClaimName=azp principal=admin scopeClaimName=roles "
+ "scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions "
+ "can be communicated to the broker via "
+ "`extension_NAME=value`. For example: "
+ "`principal=admin extension_traceId=123`",
+ _UNSUPPORTED_OAUTHBEARER},
+ {_RK_GLOBAL, "enable.sasl.oauthbearer.unsecure.jwt", _RK_C_BOOL,
+ _RK(sasl.enable_oauthbearer_unsecure_jwt),
+ "Enable the builtin unsecure JWT OAUTHBEARER token handler "
+ "if no oauthbearer_refresh_cb has been set. "
+ "This builtin handler should only be used for development "
+ "or testing, and not in production.",
+ 0, 1, 0, _UNSUPPORTED_OAUTHBEARER},
+ {_RK_GLOBAL, "oauthbearer_token_refresh_cb", _RK_C_PTR,
+ _RK(sasl.oauthbearer.token_refresh_cb),
+ "SASL/OAUTHBEARER token refresh callback (set with "
+ "rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by "
+ "rd_kafka_poll(), et.al. "
+ "This callback will be triggered when it is time to refresh "
+ "the client's OAUTHBEARER token. "
+ "Also see `rd_kafka_conf_enable_sasl_queue()`.",
+ _UNSUPPORTED_OAUTHBEARER},
+ {
+ _RK_GLOBAL | _RK_HIDDEN,
+ "enable_sasl_queue",
+ _RK_C_BOOL,
+ _RK(sasl.enable_callback_queue),
+ "Enable the SASL callback queue "
+ "(set with rd_kafka_conf_enable_sasl_queue()).",
+ 0,
+ 1,
+ 0,
+ },
+ {_RK_GLOBAL, "sasl.oauthbearer.method", _RK_C_S2I,
+ _RK(sasl.oauthbearer.method),
+ "Set to \"default\" or \"oidc\" to control which login method "
+ "to be used. If set to \"oidc\", the following properties must also be "
+ "be specified: "
+ "`sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`, "
+ "and `sasl.oauthbearer.token.endpoint.url`.",
+ .vdef = RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
+ .s2i = {{RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT, "default"},
+ {RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC, "oidc"}},
+ _UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.client.id", _RK_C_STR,
+ _RK(sasl.oauthbearer.client_id),
+ "Public identifier for the application. "
+ "Must be unique across all clients that the "
+ "authorization server handles. "
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ _UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.client.secret", _RK_C_STR,
+ _RK(sasl.oauthbearer.client_secret),
+ "Client secret only known to the application and the "
+ "authorization server. This should be a sufficiently random string "
+ "that is not guessable. "
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ _UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.scope", _RK_C_STR,
+ _RK(sasl.oauthbearer.scope),
+ "Client use this to specify the scope of the access request to the "
+ "broker. "
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ _UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.extensions", _RK_C_STR,
+ _RK(sasl.oauthbearer.extensions_str),
+ "Allow additional information to be provided to the broker. "
+ "Comma-separated list of key=value pairs. "
+ "E.g., \"supportFeatureX=true,organizationId=sales-emea\"."
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ _UNSUPPORTED_OIDC},
+ {_RK_GLOBAL, "sasl.oauthbearer.token.endpoint.url", _RK_C_STR,
+ _RK(sasl.oauthbearer.token_endpoint_url),
+ "OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. "
+ "Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
+ _UNSUPPORTED_OIDC},
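+ /* Example (editor's sketch, values are placeholders): a minimal OIDC
+  * login configuration using the properties above:
+  *
+  *   rd_kafka_conf_set(conf, "sasl.oauthbearer.method", "oidc",
+  *                     errstr, sizeof(errstr));
+  *   rd_kafka_conf_set(conf, "sasl.oauthbearer.client.id", "my-client",
+  *                     errstr, sizeof(errstr));
+  *   rd_kafka_conf_set(conf, "sasl.oauthbearer.client.secret", "s3cr3t",
+  *                     errstr, sizeof(errstr));
+  *   rd_kafka_conf_set(conf, "sasl.oauthbearer.token.endpoint.url",
+  *                     "https://idp.example.com/token",
+  *                     errstr, sizeof(errstr));
+  */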
+
+ /* Plugins */
+ {_RK_GLOBAL, "plugin.library.paths", _RK_C_STR, _RK(plugin_paths),
+ "List of plugin libraries to load (; separated). "
+ "The library search path is platform dependent (see dlopen(3) for "
+ "Unix and LoadLibrary() for Windows). If no filename extension is "
+ "specified the platform-specific extension (such as .dll or .so) "
+ "will be appended automatically.",
+#if WITH_PLUGINS
+ .set = rd_kafka_plugins_conf_set
+#else
+ .unsupported = "libdl/dlopen(3) not available at build time"
+#endif
+ },
+
+ /* Interceptors are added through specific API and not exposed
+ * as configuration properties.
+ * The interceptor property must be defined after plugin.library.paths
+ * so that the plugin libraries are properly loaded before
+ * interceptors are configured when duplicating configuration objects.*/
+ {_RK_GLOBAL, "interceptors", _RK_C_INTERNAL, _RK(interceptors),
+ "Interceptors added through rd_kafka_conf_interceptor_add_..() "
+ "and any configuration handled by interceptors.",
+ .ctor = rd_kafka_conf_interceptor_ctor,
+ .dtor = rd_kafka_conf_interceptor_dtor,
+ .copy = rd_kafka_conf_interceptor_copy},
+
+ /* Test mocks. */
+ {_RK_GLOBAL | _RK_HIDDEN, "test.mock.num.brokers", _RK_C_INT,
+ _RK(mock.broker_cnt),
+ "Number of mock brokers to create. "
+ "This will automatically overwrite `bootstrap.servers` with the "
+ "mock broker list.",
+ 0, 10000, 0},
+ {_RK_GLOBAL | _RK_HIDDEN, "test.mock.broker.rtt", _RK_C_INT,
+ _RK(mock.broker_rtt), "Simulated mock broker latency in milliseconds.", 0,
+ 60 * 60 * 1000 /*1h*/, 0},
+
+ /* Unit test interfaces.
+ * These are not part of the public API and may change at any time.
+ * Only to be used by the librdkafka tests. */
+ {_RK_GLOBAL | _RK_HIDDEN, "ut_handle_ProduceResponse", _RK_C_PTR,
+ _RK(ut.handle_ProduceResponse),
+ "ProduceResponse handler: "
+ "rd_kafka_resp_err_t (*cb) (rd_kafka_t *rk, "
+ "int32_t brokerid, uint64_t msgid, rd_kafka_resp_err_t err)"},
+
+ /* Global consumer group properties */
+ {_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.id", _RK_C_STR, _RK(group_id_str),
+ "Client group id string. All clients sharing the same group.id "
+ "belong to the same group."},
+ {_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.instance.id", _RK_C_STR,
+ _RK(group_instance_id),
+ "Enable static group membership. "
+ "Static group members are able to leave and rejoin a group "
+ "within the configured `session.timeout.ms` without prompting a "
+ "group rebalance. This should be used in combination with a larger "
+ "`session.timeout.ms` to avoid group rebalances caused by transient "
+ "unavailability (e.g. process restarts). "
+ "Requires broker version >= 2.3.0."},
+ {_RK_GLOBAL | _RK_CGRP | _RK_MED, "partition.assignment.strategy",
+ _RK_C_STR, _RK(partition_assignment_strategy),
+ "The name of one or more partition assignment strategies. The "
+ "elected group leader will use a strategy supported by all "
+ "members of the group to assign partitions to group members. If "
+ "there is more than one eligible strategy, preference is "
+ "determined by the order of this list (strategies earlier in the "
+ "list have higher priority). "
+ "Cooperative and non-cooperative (eager) strategies must not be "
+ "mixed. "
+ "Available strategies: range, roundrobin, cooperative-sticky.",
+ .sdef = "range,roundrobin"},
+ {_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "session.timeout.ms", _RK_C_INT,
+ _RK(group_session_timeout_ms),
+ "Client group session and failure detection timeout. "
+ "The consumer sends periodic heartbeats (heartbeat.interval.ms) "
+ "to indicate its liveness to the broker. If no hearts are "
+ "received by the broker for a group member within the "
+ "session timeout, the broker will remove the consumer from "
+ "the group and trigger a rebalance. "
+ "The allowed range is configured with the **broker** configuration "
+ "properties `group.min.session.timeout.ms` and "
+ "`group.max.session.timeout.ms`. "
+ "Also see `max.poll.interval.ms`.",
+ 1, 3600 * 1000, 45 * 1000},
+ {_RK_GLOBAL | _RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
+ _RK(group_heartbeat_intvl_ms),
+ "Group session keepalive heartbeat interval.", 1, 3600 * 1000, 3 * 1000},
+ {_RK_GLOBAL | _RK_CGRP, "group.protocol.type", _RK_C_KSTR,
+ _RK(group_protocol_type),
+ "Group protocol type. NOTE: Currently, the only supported group "
+ "protocol type is `consumer`.",
+ .sdef = "consumer"},
+ {_RK_GLOBAL | _RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
+ _RK(coord_query_intvl_ms),
+ "How often to query for the current client group coordinator. "
+ "If the currently assigned coordinator is down the configured "
+ "query interval will be divided by ten to more quickly recover "
+ "in case of coordinator reassignment.",
+ 1, 3600 * 1000, 10 * 60 * 1000},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "max.poll.interval.ms", _RK_C_INT,
+ _RK(max_poll_interval_ms),
+ "Maximum allowed time between calls to consume messages "
+ "(e.g., rd_kafka_consumer_poll()) for high-level consumers. "
+ "If this interval is exceeded the consumer is considered failed "
+ "and the group will rebalance in order to reassign the "
+ "partitions to another consumer group member. "
+ "Warning: Offset commits may be not possible at this point. "
+ "Note: It is recommended to set `enable.auto.offset.store=false` "
+ "for long-time processing applications and then explicitly store "
+ "offsets (using offsets_store()) *after* message processing, to "
+ "make sure offsets are not auto-committed prior to processing "
+ "has finished. "
+ "The interval is checked two times per second. "
+ "See KIP-62 for more information.",
+ 1, 86400 * 1000, 300000},
+
+ /* Global consumer properties */
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "enable.auto.commit", _RK_C_BOOL,
+ _RK(enable_auto_commit),
+ "Automatically and periodically commit offsets in the background. "
+ "Note: setting this to false does not prevent the consumer from "
+ "fetching previously committed start offsets. To circumvent this "
+ "behaviour set specific start offsets per partition in the call "
+ "to assign().",
+ 0, 1, 1},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "auto.commit.interval.ms", _RK_C_INT,
+ _RK(auto_commit_interval_ms),
+ "The frequency in milliseconds that the consumer offsets "
+ "are committed (written) to offset storage. (0 = disable). "
+ "This setting is used by the high-level consumer.",
+ 0, 86400 * 1000, 5 * 1000},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "enable.auto.offset.store",
+ _RK_C_BOOL, _RK(enable_auto_offset_store),
+ "Automatically store offset of last message provided to "
+ "application. "
+ "The offset store is an in-memory store of the next offset to "
+ "(auto-)commit for each partition.",
+ 0, 1, 1},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "queued.min.messages", _RK_C_INT,
+ _RK(queued_min_msgs),
+ "Minimum number of messages per topic+partition "
+ "librdkafka tries to maintain in the local consumer queue.",
+ 1, 10000000, 100000},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "queued.max.messages.kbytes",
+ _RK_C_INT, _RK(queued_max_msg_kbytes),
+ "Maximum number of kilobytes of queued pre-fetched messages "
+ "in the local consumer queue. "
+ "If using the high-level consumer this setting applies to the "
+ "single consumer queue, regardless of the number of partitions. "
+ "When using the legacy simple consumer or when separate "
+ "partition queues are used this setting applies per partition. "
+ "This value may be overshot by fetch.message.max.bytes. "
+ "This property has higher priority than queued.min.messages.",
+ 1, INT_MAX / 1024, 0x10000 /*64MB*/},
+ {_RK_GLOBAL | _RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
+ _RK(fetch_wait_max_ms),
+ "Maximum time the broker may wait to fill the Fetch response "
+ "with fetch.min.bytes of messages.",
+ 0, 300 * 1000, 500},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.message.max.bytes", _RK_C_INT,
+ _RK(fetch_msg_max_bytes),
+ "Initial maximum number of bytes per topic+partition to request when "
+ "fetching messages from the broker. "
+ "If the client encounters a message larger than this value "
+ "it will gradually try to increase it until the "
+ "entire message can be fetched.",
+ 1, 1000000000, 1024 * 1024},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "max.partition.fetch.bytes",
+ _RK_C_ALIAS, .sdef = "fetch.message.max.bytes"},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.max.bytes", _RK_C_INT,
+ _RK(fetch_max_bytes),
+ "Maximum amount of data the broker shall return for a Fetch request. "
+ "Messages are fetched in batches by the consumer and if the first "
+ "message batch in the first non-empty partition of the Fetch request "
+ "is larger than this value, then the message batch will still be "
+ "returned to ensure the consumer can make progress. "
+ "The maximum message batch size accepted by the broker is defined "
+ "via `message.max.bytes` (broker config) or "
+ "`max.message.bytes` (broker topic config). "
+ "`fetch.max.bytes` is automatically adjusted upwards to be "
+ "at least `message.max.bytes` (consumer config).",
+ 0, INT_MAX - 512, 50 * 1024 * 1024 /* 50MB */},
+ {_RK_GLOBAL | _RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
+ _RK(fetch_min_bytes),
+ "Minimum number of bytes the broker responds with. "
+ "If fetch.wait.max.ms expires the accumulated data will "
+ "be sent to the client regardless of this setting.",
+ 1, 100000000, 1},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.error.backoff.ms", _RK_C_INT,
+ _RK(fetch_error_backoff_ms),
+ "How long to postpone the next fetch request for a "
+ "topic+partition in case of a fetch error.",
+ 0, 300 * 1000, 500},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.method",
+ _RK_C_S2I, _RK(offset_store_method),
+ "Offset commit store method: "
+ "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
+ "'broker' - broker commit store "
+ "(requires Apache Kafka 0.8.2 or later on the broker).",
+ .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
+ .s2i = {{RD_KAFKA_OFFSET_METHOD_NONE, "none"},
+ {RD_KAFKA_OFFSET_METHOD_FILE, "file"},
+ {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_HIGH, "isolation.level", _RK_C_S2I,
+ _RK(isolation_level),
+ "Controls how to read messages written transactionally: "
+ "`read_committed` - only return transactional messages which have "
+ "been committed. `read_uncommitted` - return all messages, even "
+ "transactional messages which have been aborted.",
+ .vdef = RD_KAFKA_READ_COMMITTED,
+ .s2i = {{RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted"},
+ {RD_KAFKA_READ_COMMITTED, "read_committed"}}},
+ {_RK_GLOBAL | _RK_CONSUMER, "consume_cb", _RK_C_PTR, _RK(consume_cb),
+ "Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
+ {_RK_GLOBAL | _RK_CONSUMER, "rebalance_cb", _RK_C_PTR, _RK(rebalance_cb),
+ "Called after consumer group has been rebalanced "
+ "(set with rd_kafka_conf_set_rebalance_cb())"},
+ {_RK_GLOBAL | _RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
+ _RK(offset_commit_cb),
+ "Offset commit result propagation callback. "
+ "(set with rd_kafka_conf_set_offset_commit_cb())"},
+ {_RK_GLOBAL | _RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
+ _RK(enable_partition_eof),
+ "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
+ "consumer reaches the end of a partition.",
+ 0, 1, 0},
+ {_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "check.crcs", _RK_C_BOOL,
+ _RK(check_crcs),
+ "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
+ "on-disk corruption to the messages occurred. This check comes "
+ "at slightly increased CPU usage.",
+ 0, 1, 0},
+ {_RK_GLOBAL, "client.rack", _RK_C_KSTR, _RK(client_rack),
+ "A rack identifier for this client. This can be any string value "
+ "which indicates where this client is physically located. It "
+ "corresponds with the broker config `broker.rack`.",
+ .sdef = ""},
+
+ /* Global producer properties */
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "transactional.id", _RK_C_STR,
+ _RK(eos.transactional_id),
+ "Enables the transactional producer. "
+ "The transactional.id is used to identify the same transactional "
+ "producer instance across process restarts. "
+ "It allows the producer to guarantee that transactions corresponding "
+ "to earlier instances of the same producer have been finalized "
+ "prior to starting any new transactions, and that any "
+ "zombie instances are fenced off. "
+ "If no transactional.id is provided, then the producer is limited "
+ "to idempotent delivery (if enable.idempotence is set). "
+ "Requires broker version >= 0.11.0."},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "transaction.timeout.ms", _RK_C_INT,
+ _RK(eos.transaction_timeout_ms),
+ "The maximum amount of time in milliseconds that the transaction "
+ "coordinator will wait for a transaction status update from the "
+ "producer before proactively aborting the ongoing transaction. "
+ "If this value is larger than the `transaction.max.timeout.ms` "
+ "setting in the broker, the init_transactions() call will fail with "
+ "ERR_INVALID_TRANSACTION_TIMEOUT. "
+ "The transaction timeout automatically adjusts "
+ "`message.timeout.ms` and `socket.timeout.ms`, unless explicitly "
+ "configured in which case they must not exceed the "
+ "transaction timeout (`socket.timeout.ms` must be at least 100ms "
+ "lower than `transaction.timeout.ms`). "
+ "This is also the default timeout value if no timeout (-1) is "
+ "supplied to the transactional API methods.",
+ 1000, INT_MAX, 60000},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "enable.idempotence", _RK_C_BOOL,
+ _RK(eos.idempotence),
+ "When set to `true`, the producer will ensure that messages are "
+ "successfully produced exactly once and in the original produce "
+ "order. "
+ "The following configuration properties are adjusted automatically "
+ "(if not modified by the user) when idempotence is enabled: "
+ "`max.in.flight.requests.per.connection=" RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
+ "` (must be less than or "
+ "equal to " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "), `retries=INT32_MAX` "
+ "(must be greater than 0), `acks=all`, `queuing.strategy=fifo`. "
+ "Producer instantation will fail if user-supplied configuration "
+ "is incompatible.",
+ 0, 1, 0},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_EXPERIMENTAL, "enable.gapless.guarantee",
+ _RK_C_BOOL, _RK(eos.gapless),
+ "When set to `true`, any error that could result in a gap "
+ "in the produced message series when a batch of messages fails, "
+ "will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop "
+ "the producer. "
+ "Messages failing due to `message.timeout.ms` are not covered "
+ "by this guarantee. "
+ "Requires `enable.idempotence=true`.",
+ 0, 1, 0},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.messages",
+ _RK_C_INT, _RK(queue_buffering_max_msgs),
+ "Maximum number of messages allowed on the producer queue. "
+ "This queue is shared by all topics and partitions. A value of 0 disables "
+ "this limit.",
+ 0, INT_MAX, 100000},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.kbytes",
+ _RK_C_INT, _RK(queue_buffering_max_kbytes),
+ "Maximum total message size sum allowed on the producer queue. "
+ "This queue is shared by all topics and partitions. "
+ "This property has higher priority than queue.buffering.max.messages.",
+ 1, INT_MAX, 0x100000 /*1GB*/},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "queue.buffering.max.ms", _RK_C_DBL,
+ _RK(buffering_max_ms_dbl),
+ "Delay in milliseconds to wait for messages in the producer queue "
+ "to accumulate before constructing message batches (MessageSets) to "
+ "transmit to brokers. "
+ "A higher value allows larger and more effective "
+ "(less overhead, improved compression) batches of messages to "
+ "accumulate at the expense of increased message delivery latency.",
+ .dmin = 0, .dmax = 900.0 * 1000.0, .ddef = 5.0},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "linger.ms", _RK_C_ALIAS,
+ .sdef = "queue.buffering.max.ms"},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "message.send.max.retries",
+ _RK_C_INT, _RK(max_retries),
+ "How many times to retry sending a failing Message. "
+ "**Note:** retrying may cause reordering unless "
+ "`enable.idempotence` is set to true.",
+ 0, INT32_MAX, INT32_MAX},
+ {_RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
+ .sdef = "message.send.max.retries"},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "retry.backoff.ms", _RK_C_INT,
+ _RK(retry_backoff_ms),
+ "The backoff time in milliseconds before retrying a protocol request.", 1,
+ 300 * 1000, 100},
+
+ {_RK_GLOBAL | _RK_PRODUCER, "queue.buffering.backpressure.threshold",
+ _RK_C_INT, _RK(queue_backpressure_thres),
+ "The threshold of outstanding not yet transmitted broker requests "
+ "needed to backpressure the producer's message accumulator. "
+ "If the number of not yet transmitted requests equals or exceeds "
+ "this number, produce request creation that would have otherwise "
+ "been triggered (for example, in accordance with linger.ms) will be "
+ "delayed. A lower number yields larger and more effective batches. "
+ "A higher value can improve latency when using compression on slow "
+ "machines.",
+ 1, 1000000, 1},
+
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.codec", _RK_C_S2I,
+ _RK(compression_codec),
+ "compression codec to use for compressing message sets. "
+ "This is the default value for all topics, may be overridden by "
+ "the topic configuration property `compression.codec`. ",
+ .vdef = RD_KAFKA_COMPRESSION_NONE,
+ .s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"},
+ {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
+ {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
+ {RD_KAFKA_COMPRESSION_LZ4, "lz4"},
+ {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
+ {0}}},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "compression.type", _RK_C_ALIAS,
+ .sdef = "compression.codec"},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.num.messages", _RK_C_INT,
+ _RK(batch_num_messages),
+ "Maximum number of messages batched in one MessageSet. "
+ "The total MessageSet size is also limited by batch.size and "
+ "message.max.bytes.",
+ 1, 1000000, 10000},
+ {_RK_GLOBAL | _RK_PRODUCER | _RK_MED, "batch.size", _RK_C_INT,
+ _RK(batch_size),
+ "Maximum size (in bytes) of all messages batched in one MessageSet, "
+ "including protocol framing overhead. "
+ "This limit is applied after the first message has been added "
+ "to the batch, regardless of the first message's size, this is to "
+ "ensure that messages that exceed batch.size are produced. "
+ "The total MessageSet size is also limited by batch.num.messages and "
+ "message.max.bytes.",
+ 1, INT_MAX, 1000000},
+ {_RK_GLOBAL | _RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
+ _RK(dr_err_only), "Only provide delivery reports for failed messages.", 0,
+ 1, 0},
+ {_RK_GLOBAL | _RK_PRODUCER, "dr_cb", _RK_C_PTR, _RK(dr_cb),
+ "Delivery report callback (set with rd_kafka_conf_set_dr_cb())"},
+ {_RK_GLOBAL | _RK_PRODUCER, "dr_msg_cb", _RK_C_PTR, _RK(dr_msg_cb),
+ "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())"},
+ {_RK_GLOBAL | _RK_PRODUCER, "sticky.partitioning.linger.ms", _RK_C_INT,
+ _RK(sticky_partition_linger_ms),
+ "Delay in milliseconds to wait to assign new sticky partitions for "
+ "each topic. "
+ "By default, set to double the time of linger.ms. To disable sticky "
+ "behavior, set to 0. "
+ "This behavior affects messages with the key NULL in all cases, and "
+ "messages with key lengths of zero when the consistent_random "
+ "partitioner is in use. "
+ "These messages would otherwise be assigned randomly. "
+ "A higher value allows for more effective batching of these "
+ "messages.",
+ 0, 900000, 10},
+
+
+ /*
+ * Topic properties
+ */
+
+ /* Topic producer properties */
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "request.required.acks", _RK_C_INT,
+ _RKT(required_acks),
+ "This field indicates the number of acknowledgements the leader "
+ "broker must receive from ISR brokers before responding to the "
+ "request: "
+ "*0*=Broker does not send any response/ack to client, "
+ "*-1* or *all*=Broker will block until message is committed by all "
+ "in sync replicas (ISRs). If there are less than "
+ "`min.insync.replicas` (broker configuration) in the ISR set the "
+ "produce request will fail.",
+ -1, 1000, -1,
+ .s2i =
+ {
+ {-1, "all"},
+ }},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "acks", _RK_C_ALIAS,
+ .sdef = "request.required.acks"},
+
+ {_RK_TOPIC | _RK_PRODUCER | _RK_MED, "request.timeout.ms", _RK_C_INT,
+ _RKT(request_timeout_ms),
+ "The ack timeout of the producer request in milliseconds. "
+ "This value is only enforced by the broker and relies "
+ "on `request.required.acks` being != 0.",
+ 1, 900 * 1000, 30 * 1000},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "message.timeout.ms", _RK_C_INT,
+ _RKT(message_timeout_ms),
+ "Local message timeout. "
+ "This value is only enforced locally and limits the time a "
+ "produced message waits for successful delivery. "
+ "A time of 0 is infinite. "
+ "This is the maximum time librdkafka may use to deliver a message "
+ "(including retries). Delivery error occurs when either the retry "
+ "count or the message timeout are exceeded. "
+ "The message timeout is automatically adjusted to "
+ "`transaction.timeout.ms` if `transactional.id` is configured.",
+ 0, INT32_MAX, 300 * 1000},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "delivery.timeout.ms", _RK_C_ALIAS,
+ .sdef = "message.timeout.ms"},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED | _RK_EXPERIMENTAL,
+ "queuing.strategy", _RK_C_S2I, _RKT(queuing_strategy),
+ "Producer queuing strategy. FIFO preserves produce ordering, "
+ "while LIFO prioritizes new messages.",
+ .vdef = 0,
+ .s2i = {{RD_KAFKA_QUEUE_FIFO, "fifo"}, {RD_KAFKA_QUEUE_LIFO, "lifo"}}},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED, "produce.offset.report",
+ _RK_C_BOOL, _RKT(produce_offset_report), "No longer used.", 0, 1, 0},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "partitioner", _RK_C_STR,
+ _RKT(partitioner_str),
+ "Partitioner: "
+ "`random` - random distribution, "
+ "`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to "
+ "single partition), "
+ "`consistent_random` - CRC32 hash of key (Empty and NULL keys are "
+ "randomly partitioned), "
+ "`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are "
+ "mapped to single partition), "
+ "`murmur2_random` - Java Producer compatible Murmur2 hash of key "
+ "(NULL keys are randomly partitioned. This is functionally equivalent "
+ "to the default partitioner in the Java Producer.), "
+ "`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), "
+ "`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly "
+ "partitioned).",
+ .sdef = "consistent_random",
+ .validate = rd_kafka_conf_validate_partitioner},
+ {_RK_TOPIC | _RK_PRODUCER, "partitioner_cb", _RK_C_PTR, _RKT(partitioner),
+ "Custom partitioner callback "
+ "(set with rd_kafka_topic_conf_set_partitioner_cb())"},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_DEPRECATED | _RK_EXPERIMENTAL,
+ "msg_order_cmp", _RK_C_PTR, _RKT(msg_order_cmp),
+ "Message queue ordering comparator "
+ "(set with rd_kafka_topic_conf_set_msg_order_cmp()). "
+ "Also see `queuing.strategy`."},
+ {_RK_TOPIC, "opaque", _RK_C_PTR, _RKT(opaque),
+ "Application opaque (set with rd_kafka_topic_conf_set_opaque())"},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.codec", _RK_C_S2I,
+ _RKT(compression_codec),
+ "Compression codec to use for compressing message sets. "
+ "inherit = inherit global compression.codec configuration.",
+ .vdef = RD_KAFKA_COMPRESSION_INHERIT,
+ .s2i = {{RD_KAFKA_COMPRESSION_NONE, "none"},
+ {RD_KAFKA_COMPRESSION_GZIP, "gzip", _UNSUPPORTED_ZLIB},
+ {RD_KAFKA_COMPRESSION_SNAPPY, "snappy", _UNSUPPORTED_SNAPPY},
+ {RD_KAFKA_COMPRESSION_LZ4, "lz4"},
+ {RD_KAFKA_COMPRESSION_ZSTD, "zstd", _UNSUPPORTED_ZSTD},
+ {RD_KAFKA_COMPRESSION_INHERIT, "inherit"},
+ {0}}},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_HIGH, "compression.type", _RK_C_ALIAS,
+ .sdef = "compression.codec"},
+ {_RK_TOPIC | _RK_PRODUCER | _RK_MED, "compression.level", _RK_C_INT,
+ _RKT(compression_level),
+ "Compression level parameter for algorithm selected by configuration "
+ "property `compression.codec`. Higher values will result in better "
+ "compression at the cost of more CPU usage. Usable range is "
+ "algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; "
+ "-1 = codec-dependent default compression level.",
+ RD_KAFKA_COMPLEVEL_MIN, RD_KAFKA_COMPLEVEL_MAX,
+ RD_KAFKA_COMPLEVEL_DEFAULT},
+
+
+ /* Topic consumer properties */
+ {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "auto.commit.enable",
+ _RK_C_BOOL, _RKT(auto_commit),
+ "[**LEGACY PROPERTY:** This property is used by the simple legacy "
+ "consumer only. When using the high-level KafkaConsumer, the global "
+ "`enable.auto.commit` property must be used instead]. "
+ "If true, periodically commit offset of the last message handed "
+ "to the application. This committed offset will be used when the "
+ "process restarts to pick up where it left off. "
+ "If false, the application will have to call "
+ "`rd_kafka_offset_store()` to store an offset (optional). "
+ "Offsets will be written to broker or local file according to "
+ "offset.store.method.",
+ 0, 1, 1},
+ {_RK_TOPIC | _RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
+ .sdef = "auto.commit.enable"},
+ {_RK_TOPIC | _RK_CONSUMER | _RK_HIGH, "auto.commit.interval.ms", _RK_C_INT,
+ _RKT(auto_commit_interval_ms),
+ "[**LEGACY PROPERTY:** This setting is used by the simple legacy "
+ "consumer only. When using the high-level KafkaConsumer, the "
+ "global `auto.commit.interval.ms` property must be used instead]. "
+ "The frequency in milliseconds that the consumer offsets "
+ "are committed (written) to offset storage.",
+ 10, 86400 * 1000, 60 * 1000},
+ {_RK_TOPIC | _RK_CONSUMER | _RK_HIGH, "auto.offset.reset", _RK_C_S2I,
+ _RKT(auto_offset_reset),
+ "Action to take when there is no initial offset in offset store "
+ "or the desired offset is out of range: "
+ "'smallest','earliest' - automatically reset the offset to the smallest "
+ "offset, "
+ "'largest','latest' - automatically reset the offset to the largest "
+ "offset, "
+ "'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is "
+ "retrieved by consuming messages and checking 'message->err'.",
+ .vdef = RD_KAFKA_OFFSET_END,
+ .s2i =
+ {
+ {RD_KAFKA_OFFSET_BEGINNING, "smallest"},
+ {RD_KAFKA_OFFSET_BEGINNING, "earliest"},
+ {RD_KAFKA_OFFSET_BEGINNING, "beginning"},
+ {RD_KAFKA_OFFSET_END, "largest"},
+ {RD_KAFKA_OFFSET_END, "latest"},
+ {RD_KAFKA_OFFSET_END, "end"},
+ {RD_KAFKA_OFFSET_INVALID, "error"},
+ }},
+ {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.path", _RK_C_STR,
+ _RKT(offset_store_path),
+ "Path to local file for storing offsets. If the path is a directory "
+ "a filename will be automatically generated in that directory based "
+ "on the topic and partition. "
+ "File-based offset storage will be removed in a future version.",
+ .sdef = "."},
+
+ {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.sync.interval.ms",
+ _RK_C_INT, _RKT(offset_store_sync_interval_ms),
+ "fsync() interval for the offset file, in milliseconds. "
+ "Use -1 to disable syncing, and 0 for immediate sync after "
+ "each write. "
+ "File-based offset storage will be removed in a future version.",
+ -1, 86400 * 1000, -1},
+
+ {_RK_TOPIC | _RK_CONSUMER | _RK_DEPRECATED, "offset.store.method",
+ _RK_C_S2I, _RKT(offset_store_method),
+ "Offset commit store method: "
+ "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
+ "'broker' - broker commit store "
+ "(requires \"group.id\" to be configured and "
+ "Apache Kafka 0.8.2 or later on the broker.).",
+ .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
+ .s2i = {{RD_KAFKA_OFFSET_METHOD_FILE, "file"},
+ {RD_KAFKA_OFFSET_METHOD_BROKER, "broker"}}},
+
+ {_RK_TOPIC | _RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
+ _RKT(consume_callback_max_msgs),
+ "Maximum number of messages to dispatch in "
+ "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
+ 0, 1000000, 0},
+
+ {0, /* End */}};
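+
+/*
+ * Example (an illustrative sketch, not part of the library): the property
+ * table above backs the public string-based configuration interface,
+ * including aliases ("linger.ms"), S2I enums ("compression.codec") and
+ * plain integers:
+ *
+ *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
+ *   char errstr[512];
+ *
+ *   if (rd_kafka_conf_set(conf, "linger.ms", "50", errstr,
+ *                         sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ *       rd_kafka_conf_set(conf, "compression.codec", "lz4", errstr,
+ *                         sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ *           fprintf(stderr, "%s\n", errstr);
+ */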
+
+/**
+ * @returns the property object for \p name in \p scope, or NULL if not found.
+ * @remark does not work with interceptor configs.
+ */
+const struct rd_kafka_property *rd_kafka_conf_prop_find(int scope,
+ const char *name) {
+ const struct rd_kafka_property *prop;
+
+restart:
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+
+ if (!(prop->scope & scope))
+ continue;
+
+ if (strcmp(prop->name, name))
+ continue;
+
+ if (prop->type == _RK_C_ALIAS) {
+ /* Caller supplied an alias, restart
+ * search for real name. */
+ name = prop->sdef;
+ goto restart;
+ }
+
+ return prop;
+ }
+
+ return NULL;
+}
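+
+/*
+ * Illustration (per the alias restart above): looking up an alias yields
+ * the same property object as looking up the aliased name directly:
+ *
+ *   const struct rd_kafka_property *p1 =
+ *       rd_kafka_conf_prop_find(_RK_GLOBAL, "linger.ms");
+ *   const struct rd_kafka_property *p2 =
+ *       rd_kafka_conf_prop_find(_RK_GLOBAL, "queue.buffering.max.ms");
+ *   rd_assert(p1 == p2);
+ */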
+
+/**
+ * @returns rd_true if property has been set/modified, else rd_false.
+ *
+ * @warning Asserts if the property does not exist.
+ */
+rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,
+ const char *name) {
+ const struct rd_kafka_property *prop;
+
+ if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name)))
+ RD_BUG("Configuration property \"%s\" does not exist", name);
+
+ return rd_kafka_anyconf_is_modified(conf, prop);
+}
+
+
+/**
+ * @returns rd_true if property has been set/modified, else rd_false.
+ *
+ * @warning Asserts if the property does not exist.
+ */
+static rd_bool_t
+rd_kafka_topic_conf_is_modified(const rd_kafka_topic_conf_t *conf,
+ const char *name) {
+ const struct rd_kafka_property *prop;
+
+ if (!(prop = rd_kafka_conf_prop_find(_RK_TOPIC, name)))
+ RD_BUG("Topic configuration property \"%s\" does not exist",
+ name);
+
+ return rd_kafka_anyconf_is_modified(conf, prop);
+}
+
+
+
+static rd_kafka_conf_res_t
+rd_kafka_anyconf_set_prop0(int scope,
+ void *conf,
+ const struct rd_kafka_property *prop,
+ const char *istr,
+ int ival,
+ rd_kafka_conf_set_mode_t set_mode,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_conf_res_t res;
+
+#define _RK_PTR(TYPE, BASE, OFFSET) (TYPE)(void *)(((char *)(BASE)) + (OFFSET))
+
+ /* Try interceptors first (only for GLOBAL config) */
+ if (scope & _RK_GLOBAL) {
+ if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL)
+ res = RD_KAFKA_CONF_UNKNOWN;
+ else
+ res = rd_kafka_interceptors_on_conf_set(
+ conf, prop->name, istr, errstr, errstr_size);
+ if (res != RD_KAFKA_CONF_UNKNOWN)
+ return res;
+ }
+
+
+ if (prop->set) {
+ /* Custom setter */
+
+ res = prop->set(scope, conf, prop->name, istr,
+ _RK_PTR(void *, conf, prop->offset), set_mode,
+ errstr, errstr_size);
+
+ if (res != RD_KAFKA_CONF_OK)
+ return res;
+
+ /* FALLTHRU so that property value is set. */
+ }
+
+ switch (prop->type) {
+ case _RK_C_STR: {
+ char **str = _RK_PTR(char **, conf, prop->offset);
+ if (*str)
+ rd_free(*str);
+ if (istr)
+ *str = rd_strdup(istr);
+ else
+ *str = prop->sdef ? rd_strdup(prop->sdef) : NULL;
+ break;
+ }
+ case _RK_C_KSTR: {
+ rd_kafkap_str_t **kstr =
+ _RK_PTR(rd_kafkap_str_t **, conf, prop->offset);
+ if (*kstr)
+ rd_kafkap_str_destroy(*kstr);
+ if (istr)
+ *kstr = rd_kafkap_str_new(istr, -1);
+ else
+ *kstr = prop->sdef ? rd_kafkap_str_new(prop->sdef, -1)
+ : NULL;
+ break;
+ }
+ case _RK_C_PTR:
+ *_RK_PTR(const void **, conf, prop->offset) = istr;
+ break;
+ case _RK_C_BOOL:
+ case _RK_C_INT:
+ case _RK_C_S2I:
+ case _RK_C_S2F: {
+ int *val = _RK_PTR(int *, conf, prop->offset);
+
+ if (prop->type == _RK_C_S2F) {
+ switch (set_mode) {
+ case _RK_CONF_PROP_SET_REPLACE:
+ *val = ival;
+ break;
+ case _RK_CONF_PROP_SET_ADD:
+ *val |= ival;
+ break;
+ case _RK_CONF_PROP_SET_DEL:
+ *val &= ~ival;
+ break;
+ }
+ } else {
+ /* Single assignment */
+ *val = ival;
+ }
+ break;
+ }
+ case _RK_C_DBL: {
+ double *val = _RK_PTR(double *, conf, prop->offset);
+ if (istr) {
+ char *endptr;
+ double new_val = strtod(istr, &endptr);
+ /* This is verified in set_prop() */
+ rd_assert(endptr != istr);
+ *val = new_val;
+ } else
+ *val = prop->ddef;
+ break;
+ }
+
+ case _RK_C_PATLIST: {
+ /* Split comma-separated list into individual regex expressions
+ * that are verified and then append to the provided list. */
+ rd_kafka_pattern_list_t **plist;
+
+ plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
+
+ if (*plist)
+ rd_kafka_pattern_list_destroy(*plist);
+
+ if (istr) {
+ if (!(*plist = rd_kafka_pattern_list_new(
+ istr, errstr, (int)errstr_size)))
+ return RD_KAFKA_CONF_INVALID;
+ } else
+ *plist = NULL;
+
+ break;
+ }
+
+ case _RK_C_INTERNAL:
+ /* Probably handled by setter */
+ break;
+
+ default:
+ rd_kafka_assert(NULL, !*"unknown conf type");
+ }
+
+
+ rd_kafka_anyconf_set_modified(conf, prop, 1 /*modified*/);
+ return RD_KAFKA_CONF_OK;
+}
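+
+/*
+ * Sketch of the S2F set modes handled above (the flag bits are
+ * illustrative): with existing flags A=0x1|B=0x2 stored in *val and
+ * ival=B,
+ *
+ *   _RK_CONF_PROP_SET_REPLACE:  *val = 0x2        (B only)
+ *   _RK_CONF_PROP_SET_ADD:      *val |= 0x2       (A|B unchanged)
+ *   _RK_CONF_PROP_SET_DEL:      *val &= ~0x2      (A only)
+ *
+ * These modes are what the "+flag"/"-flag" CSV syntax in
+ * rd_kafka_anyconf_set_prop() below maps onto.
+ */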
+
+
+/**
+ * @brief Find s2i (string-to-int mapping) entry and return its array index,
+ * or -1 on miss.
+ */
+static int rd_kafka_conf_s2i_find(const struct rd_kafka_property *prop,
+ const char *value) {
+ int j;
+
+ for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
+ if (prop->s2i[j].str && !rd_strcasecmp(prop->s2i[j].str, value))
+ return j;
+ }
+
+ return -1;
+}
+
+
+/**
+ * @brief Set configuration property.
+ *
+ * @param allow_specific Allow setting properties that are normally only
+ *                       settable through dedicated
+ *                       rd_kafka_*conf_set_..() functions, such as
+ *                       rd_kafka_conf_set_log_cb().
+ *                       Should not be allowed from the conf_set() string
+ *                       interface.
+ */
+static rd_kafka_conf_res_t
+rd_kafka_anyconf_set_prop(int scope,
+ void *conf,
+ const struct rd_kafka_property *prop,
+ const char *value,
+ int allow_specific,
+ char *errstr,
+ size_t errstr_size) {
+ int ival;
+
+ if (prop->unsupported) {
+ rd_snprintf(errstr, errstr_size,
+ "Configuration property \"%s\" not supported "
+ "in this build: %s",
+ prop->name, prop->unsupported);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ switch (prop->type) {
+ case _RK_C_STR:
+ /* Left-trim strings (and string-like values) */
+ if (value)
+ while (isspace((int)*value))
+ value++;
+
+ /* FALLTHRU */
+ case _RK_C_KSTR:
+ if (prop->s2i[0].str) {
+ int match;
+
+ if (!value || (match = rd_kafka_conf_s2i_find(
+ prop, value)) == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value for "
+ "configuration property \"%s\": "
+ "%s",
+ prop->name, value);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ /* Replace value string with canonical form */
+ value = prop->s2i[match].str;
+ }
+ /* FALLTHRU */
+ case _RK_C_PATLIST:
+ if (prop->validate &&
+ (!value || !prop->validate(prop, value, -1))) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value for "
+ "configuration property \"%s\": %s",
+ prop->name, value);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
+ _RK_CONF_PROP_SET_REPLACE,
+ errstr, errstr_size);
+
+ case _RK_C_PTR:
+ /* Allow hidden internal unit test properties to
+ * be set from generic conf_set() interface. */
+ if (!allow_specific && !(prop->scope & _RK_HIDDEN)) {
+ rd_snprintf(errstr, errstr_size,
+ "Property \"%s\" must be set through "
+ "dedicated .._set_..() function",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+ return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
+ _RK_CONF_PROP_SET_REPLACE,
+ errstr, errstr_size);
+
+ case _RK_C_BOOL:
+ if (!value) {
+ rd_snprintf(errstr, errstr_size,
+ "Bool configuration property \"%s\" cannot "
+ "be set to empty value",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+
+ if (!rd_strcasecmp(value, "true") ||
+ !rd_strcasecmp(value, "t") || !strcmp(value, "1"))
+ ival = 1;
+ else if (!rd_strcasecmp(value, "false") ||
+ !rd_strcasecmp(value, "f") || !strcmp(value, "0"))
+ ival = 0;
+ else {
+ rd_snprintf(errstr, errstr_size,
+ "Expected bool value for \"%s\": "
+ "true or false",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
+ _RK_CONF_PROP_SET_REPLACE, errstr,
+ errstr_size);
+ return RD_KAFKA_CONF_OK;
+
+ case _RK_C_INT: {
+ const char *end;
+
+ if (!value) {
+ rd_snprintf(errstr, errstr_size,
+ "Integer configuration "
+ "property \"%s\" cannot be set "
+ "to empty value",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ ival = (int)strtol(value, (char **)&end, 0);
+ if (end == value) {
+ /* Non numeric, check s2i for string mapping */
+ int match = rd_kafka_conf_s2i_find(prop, value);
+
+ if (match == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value for "
+ "configuration property \"%s\"",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ if (prop->s2i[match].unsupported) {
+ rd_snprintf(errstr, errstr_size,
+ "Unsupported value \"%s\" for "
+ "configuration property \"%s\": %s",
+ value, prop->name,
+ prop->s2i[match].unsupported);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ ival = prop->s2i[match].val;
+ }
+
+ if (ival < prop->vmin || ival > prop->vmax) {
+ rd_snprintf(errstr, errstr_size,
+ "Configuration property \"%s\" value "
+ "%i is outside allowed range %i..%i\n",
+ prop->name, ival, prop->vmin, prop->vmax);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
+ _RK_CONF_PROP_SET_REPLACE, errstr,
+ errstr_size);
+ return RD_KAFKA_CONF_OK;
+ }
+
+ case _RK_C_DBL: {
+ const char *end;
+ double dval;
+
+ if (!value) {
+ rd_snprintf(errstr, errstr_size,
+ "Float configuration "
+ "property \"%s\" cannot be set "
+ "to empty value",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ dval = strtod(value, (char **)&end);
+ if (end == value) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value for "
+ "configuration property \"%s\"",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ if (dval < prop->dmin || dval > prop->dmax) {
+ rd_snprintf(errstr, errstr_size,
+ "Configuration property \"%s\" value "
+ "%g is outside allowed range %g..%g\n",
+ prop->name, dval, prop->dmin, prop->dmax);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
+ _RK_CONF_PROP_SET_REPLACE, errstr,
+ errstr_size);
+ return RD_KAFKA_CONF_OK;
+ }
+
+ case _RK_C_S2I:
+ case _RK_C_S2F: {
+ int j;
+ const char *next;
+
+ if (!value) {
+ rd_snprintf(errstr, errstr_size,
+ "Configuration "
+ "property \"%s\" cannot be set "
+ "to empty value",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ next = value;
+ while (next && *next) {
+ const char *s, *t;
+ rd_kafka_conf_set_mode_t set_mode =
+ _RK_CONF_PROP_SET_ADD; /* S2F */
+
+ s = next;
+
+ if (prop->type == _RK_C_S2F && (t = strchr(s, ','))) {
+ /* CSV flag field */
+ next = t + 1;
+ } else {
+ /* Single string */
+ t = s + strlen(s);
+ next = NULL;
+ }
+
+
+ /* Left trim */
+ while (s < t && isspace((int)*s))
+ s++;
+
+ /* Right trim */
+ while (t > s && isspace((int)*t))
+ t--;
+
+ /* S2F: +/- prefix */
+ if (prop->type == _RK_C_S2F) {
+ if (*s == '+') {
+ set_mode = _RK_CONF_PROP_SET_ADD;
+ s++;
+ } else if (*s == '-') {
+ set_mode = _RK_CONF_PROP_SET_DEL;
+ s++;
+ }
+ }
+
+ /* Empty string? */
+ if (s == t)
+ continue;
+
+ /* Match string to s2i table entry */
+ for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
+ int new_val;
+
+ if (!prop->s2i[j].str)
+ continue;
+
+ if (strlen(prop->s2i[j].str) ==
+ (size_t)(t - s) &&
+ !rd_strncasecmp(prop->s2i[j].str, s,
+ (int)(t - s)))
+ new_val = prop->s2i[j].val;
+ else
+ continue;
+
+ if (prop->s2i[j].unsupported) {
+ rd_snprintf(
+ errstr, errstr_size,
+ "Unsupported value \"%.*s\" "
+ "for configuration property "
+ "\"%s\": %s",
+ (int)(t - s), s, prop->name,
+ prop->s2i[j].unsupported);
+ return RD_KAFKA_CONF_INVALID;
+ }
+
+ rd_kafka_anyconf_set_prop0(
+ scope, conf, prop, value, new_val, set_mode,
+ errstr, errstr_size);
+
+ if (prop->type == _RK_C_S2F) {
+ /* Flags: OR it in: do next */
+ break;
+ } else {
+ /* Single assignment */
+ return RD_KAFKA_CONF_OK;
+ }
+ }
+
+ /* S2F: Good match: continue with next */
+ if (j < (int)RD_ARRAYSIZE(prop->s2i))
+ continue;
+
+ /* No match */
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value \"%.*s\" for "
+ "configuration property \"%s\"",
+ (int)(t - s), s, prop->name);
+ return RD_KAFKA_CONF_INVALID;
+ }
+ return RD_KAFKA_CONF_OK;
+ }
+
+ case _RK_C_INTERNAL:
+ rd_snprintf(errstr, errstr_size,
+ "Internal property \"%s\" not settable",
+ prop->name);
+ return RD_KAFKA_CONF_INVALID;
+
+ case _RK_C_INVALID:
+ rd_snprintf(errstr, errstr_size, "%s", prop->desc);
+ return RD_KAFKA_CONF_INVALID;
+
+ default:
+ rd_kafka_assert(NULL, !*"unknown conf type");
+ }
+
+ /* not reachable */
+ return RD_KAFKA_CONF_INVALID;
+}
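+
+/*
+ * Example (illustrative; assumes `conf` and `errstr` as in earlier
+ * examples): the S2F branch above parses comma-separated flag lists
+ * where each token is OR:ed into the current value unless prefixed
+ * with '-' (clear), e.g. for the `debug` property:
+ *
+ *   rd_kafka_conf_set(conf, "debug", "broker,topic",
+ *                     errstr, sizeof(errstr));
+ *   rd_kafka_conf_set(conf, "debug", "-broker",
+ *                     errstr, sizeof(errstr));
+ *
+ * leaves only the `topic` debug context enabled.
+ */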
+
+
+
+static void rd_kafka_defaultconf_set(int scope, void *conf) {
+ const struct rd_kafka_property *prop;
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ if (!(prop->scope & scope))
+ continue;
+
+ if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
+ continue;
+
+ if (prop->ctor)
+ prop->ctor(scope, conf);
+
+ if (prop->sdef || prop->vdef || prop->pdef ||
+ !rd_dbl_zero(prop->ddef))
+ rd_kafka_anyconf_set_prop0(
+ scope, conf, prop,
+ prop->sdef ? prop->sdef : prop->pdef, prop->vdef,
+ _RK_CONF_PROP_SET_REPLACE, NULL, 0);
+ }
+}
+
+rd_kafka_conf_t *rd_kafka_conf_new(void) {
+ rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
+ rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*conf) &&
+ *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
+ rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
+ rd_kafka_anyconf_clear_all_is_modified(conf);
+ return conf;
+}
+
+rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
+ rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
+ rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
+ *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
+ rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
+ rd_kafka_anyconf_clear_all_is_modified(tconf);
+ return tconf;
+}
+
+
+static int rd_kafka_anyconf_set(int scope,
+ void *conf,
+ const char *name,
+ const char *value,
+ char *errstr,
+ size_t errstr_size) {
+ char estmp[1];
+ const struct rd_kafka_property *prop;
+ rd_kafka_conf_res_t res;
+
+ if (!errstr) {
+ errstr = estmp;
+ errstr_size = 0;
+ }
+
+ if (value && !*value)
+ value = NULL;
+
+ /* Try interceptors first (only for GLOBAL config for now) */
+ if (scope & _RK_GLOBAL) {
+ res = rd_kafka_interceptors_on_conf_set(
+ (rd_kafka_conf_t *)conf, name, value, errstr, errstr_size);
+ /* Handled (successfully or not) by interceptor. */
+ if (res != RD_KAFKA_CONF_UNKNOWN)
+ return res;
+ }
+
+ /* Then global config */
+
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+
+ if (!(prop->scope & scope))
+ continue;
+
+ if (strcmp(prop->name, name))
+ continue;
+
+ if (prop->type == _RK_C_ALIAS)
+ return rd_kafka_anyconf_set(scope, conf, prop->sdef,
+ value, errstr, errstr_size);
+
+ return rd_kafka_anyconf_set_prop(scope, conf, prop, value,
+ 0 /*don't allow specifics*/,
+ errstr, errstr_size);
+ }
+
+ rd_snprintf(errstr, errstr_size,
+ "No such configuration property: \"%s\"", name);
+
+ return RD_KAFKA_CONF_UNKNOWN;
+}
+
+
+/**
+ * @brief Set a rd_kafka_*_conf_set_...() specific property, such as
+ * rd_kafka_conf_set_error_cb().
+ *
+ * @warning Will not call interceptor's on_conf_set.
+ * @warning Asserts if \p name is not known or value is incorrect.
+ *
+ * Implemented as a macro to have rd_assert() print the original function.
+ */
+
+#define rd_kafka_anyconf_set_internal(SCOPE, CONF, NAME, VALUE) \
+ do { \
+ const struct rd_kafka_property *_prop; \
+ rd_kafka_conf_res_t _res; \
+ _prop = rd_kafka_conf_prop_find(SCOPE, NAME); \
+ rd_assert(_prop && * "invalid property name"); \
+ _res = rd_kafka_anyconf_set_prop( \
+ SCOPE, CONF, _prop, (const void *)VALUE, \
+ 1 /*allow-specifics*/, NULL, 0); \
+ rd_assert(_res == RD_KAFKA_CONF_OK); \
+ } while (0)
+
+
+rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
+ const char *name,
+ const char *value,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_conf_res_t res;
+
+ res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value, errstr,
+ errstr_size);
+ if (res != RD_KAFKA_CONF_UNKNOWN)
+ return res;
+
+ /* Fallthru:
+ * If the global property was unknown, try setting it on the
+ * default topic config. */
+ if (!conf->topic_conf) {
+ /* Create topic config, might be over-written by application
+ * later. */
+ rd_kafka_conf_set_default_topic_conf(conf,
+ rd_kafka_topic_conf_new());
+ }
+
+ return rd_kafka_topic_conf_set(conf->topic_conf, name, value, errstr,
+ errstr_size);
+}
+
+
+rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
+ const char *name,
+ const char *value,
+ char *errstr,
+ size_t errstr_size) {
+ if (!strncmp(name, "topic.", strlen("topic.")))
+ name += strlen("topic.");
+
+ return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value, errstr,
+ errstr_size);
+}
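+
+/*
+ * Example (illustrative; `conf`, `tconf` and `errstr` are assumed
+ * caller-owned objects): unknown global properties fall through to the
+ * default topic configuration in rd_kafka_conf_set(), and the "topic."
+ * prefix is optional in rd_kafka_topic_conf_set():
+ *
+ *   // topic-level property, accepted on the global object:
+ *   rd_kafka_conf_set(conf, "message.timeout.ms", "60000",
+ *                     errstr, sizeof(errstr));
+ *   // "topic." prefix is stripped; equivalent to setting "acks":
+ *   rd_kafka_topic_conf_set(tconf, "topic.acks", "all",
+ *                           errstr, sizeof(errstr));
+ */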
+
+
+/**
+ * @brief Overwrites the contents of \p str up until but not including
+ * the nul-term.
+ */
+void rd_kafka_desensitize_str(char *str) {
+ size_t len;
+ static const char redacted[] = "(REDACTED)";
+
+#ifdef _WIN32
+ len = strlen(str);
+ SecureZeroMemory(str, len);
+#else
+ volatile char *volatile s;
+
+ for (s = str; *s; s++)
+ *s = '\0';
+
+ len = (size_t)(s - str);
+#endif
+
+ if (len > sizeof(redacted))
+ memcpy(str, redacted, sizeof(redacted));
+}
+
+
+
+/**
+ * @brief Overwrite the value of \p prop, if sensitive.
+ */
+static RD_INLINE void
+rd_kafka_anyconf_prop_desensitize(int scope,
+ void *conf,
+ const struct rd_kafka_property *prop) {
+ if (likely(!(prop->scope & _RK_SENSITIVE)))
+ return;
+
+ switch (prop->type) {
+ case _RK_C_STR: {
+ char **str = _RK_PTR(char **, conf, prop->offset);
+ if (*str)
+ rd_kafka_desensitize_str(*str);
+ break;
+ }
+
+ case _RK_C_INTERNAL:
+ /* This is typically a pointer to something, the
+ * _RK_SENSITIVE flag is set to get it redacted in
+ * ..dump_dbg(), but we don't have to desensitize
+ * anything here. */
+ break;
+
+ default:
+ rd_assert(!*"BUG: Don't know how to desensitize prop type");
+ break;
+ }
+}
+
+
+/**
+ * @brief Desensitize all sensitive properties in \p conf
+ */
+static void rd_kafka_anyconf_desensitize(int scope, void *conf) {
+ const struct rd_kafka_property *prop;
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ if (!(prop->scope & scope))
+ continue;
+
+ rd_kafka_anyconf_prop_desensitize(scope, conf, prop);
+ }
+}
+
+/**
+ * @brief Overwrite the values of sensitive properties
+ */
+void rd_kafka_conf_desensitize(rd_kafka_conf_t *conf) {
+ if (conf->topic_conf)
+ rd_kafka_anyconf_desensitize(_RK_TOPIC, conf->topic_conf);
+ rd_kafka_anyconf_desensitize(_RK_GLOBAL, conf);
+}
+
+/**
+ * @brief Overwrite the values of sensitive properties
+ */
+void rd_kafka_topic_conf_desensitize(rd_kafka_topic_conf_t *tconf) {
+ rd_kafka_anyconf_desensitize(_RK_TOPIC, tconf);
+}
+
+
+static void rd_kafka_anyconf_clear(int scope,
+ void *conf,
+ const struct rd_kafka_property *prop) {
+
+ rd_kafka_anyconf_prop_desensitize(scope, conf, prop);
+
+ switch (prop->type) {
+ case _RK_C_STR: {
+ char **str = _RK_PTR(char **, conf, prop->offset);
+
+ if (*str) {
+ if (prop->set) {
+ prop->set(scope, conf, prop->name, NULL, *str,
+ _RK_CONF_PROP_SET_DEL, NULL, 0);
+ /* FALLTHRU */
+ }
+ rd_free(*str);
+ *str = NULL;
+ }
+ } break;
+
+ case _RK_C_KSTR: {
+ rd_kafkap_str_t **kstr =
+ _RK_PTR(rd_kafkap_str_t **, conf, prop->offset);
+ if (*kstr) {
+ rd_kafkap_str_destroy(*kstr);
+ *kstr = NULL;
+ }
+ } break;
+
+ case _RK_C_PATLIST: {
+ rd_kafka_pattern_list_t **plist;
+ plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
+ if (*plist) {
+ rd_kafka_pattern_list_destroy(*plist);
+ *plist = NULL;
+ }
+ } break;
+
+ case _RK_C_PTR:
+ if (_RK_PTR(void *, conf, prop->offset) != NULL) {
+ if (!strcmp(prop->name, "default_topic_conf")) {
+ rd_kafka_topic_conf_t **tconf;
+
+ tconf = _RK_PTR(rd_kafka_topic_conf_t **, conf,
+ prop->offset);
+ if (*tconf) {
+ rd_kafka_topic_conf_destroy(*tconf);
+ *tconf = NULL;
+ }
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ if (prop->dtor)
+ prop->dtor(scope, conf);
+}
+
+void rd_kafka_anyconf_destroy(int scope, void *conf) {
+ const struct rd_kafka_property *prop;
+
+ /* Call on_conf_destroy() interceptors */
+ if (scope == _RK_GLOBAL)
+ rd_kafka_interceptors_on_conf_destroy(conf);
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ if (!(prop->scope & scope))
+ continue;
+
+ rd_kafka_anyconf_clear(scope, conf, prop);
+ }
+}
+
+
+void rd_kafka_conf_destroy(rd_kafka_conf_t *conf) {
+ rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
+ // FIXME: partition_assignors
+ rd_free(conf);
+}
+
+void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf) {
+ rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
+ rd_free(topic_conf);
+}
+
+
+
+static void rd_kafka_anyconf_copy(int scope,
+ void *dst,
+ const void *src,
+ size_t filter_cnt,
+ const char **filter) {
+ const struct rd_kafka_property *prop;
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ const char *val = NULL;
+ int ival = 0;
+ char *valstr;
+ size_t valsz;
+ size_t fi;
+ size_t nlen;
+
+ if (!(prop->scope & scope))
+ continue;
+
+ if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
+ continue;
+
+ /* Skip properties that have not been set,
+ * unless it is an internal one which requires
+ * extra logic, such as the interceptors. */
+ if (!rd_kafka_anyconf_is_modified(src, prop) &&
+ prop->type != _RK_C_INTERNAL)
+ continue;
+
+ /* Apply filter, if any. */
+ nlen = strlen(prop->name);
+ for (fi = 0; fi < filter_cnt; fi++) {
+ size_t flen = strlen(filter[fi]);
+ if (nlen >= flen &&
+ !strncmp(filter[fi], prop->name, flen))
+ break;
+ }
+ if (fi < filter_cnt)
+ continue; /* Filter matched */
+
+ switch (prop->type) {
+ case _RK_C_STR:
+ case _RK_C_PTR:
+ val = *_RK_PTR(const char **, src, prop->offset);
+
+ if (!strcmp(prop->name, "default_topic_conf") && val)
+ val = (void *)rd_kafka_topic_conf_dup(
+ (const rd_kafka_topic_conf_t *)(void *)val);
+ break;
+ case _RK_C_KSTR: {
+ rd_kafkap_str_t **kstr =
+ _RK_PTR(rd_kafkap_str_t **, src, prop->offset);
+ if (*kstr)
+ val = (*kstr)->str;
+ break;
+ }
+
+ case _RK_C_BOOL:
+ case _RK_C_INT:
+ case _RK_C_S2I:
+ case _RK_C_S2F:
+ ival = *_RK_PTR(const int *, src, prop->offset);
+
+ /* Get string representation of configuration value. */
+ valsz = 0;
+ rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
+ valstr = rd_alloca(valsz);
+ rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
+ val = valstr;
+ break;
+ case _RK_C_DBL:
+ /* Get string representation of configuration value. */
+ valsz = 0;
+ rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
+ valstr = rd_alloca(valsz);
+ rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
+ val = valstr;
+ break;
+ case _RK_C_PATLIST: {
+ const rd_kafka_pattern_list_t **plist;
+ plist = _RK_PTR(const rd_kafka_pattern_list_t **, src,
+ prop->offset);
+ if (*plist)
+ val = (*plist)->rkpl_orig;
+ break;
+ }
+ case _RK_C_INTERNAL:
+ /* Handled by ->copy() below. */
+ break;
+ default:
+ continue;
+ }
+
+ if (prop->copy)
+ prop->copy(scope, dst, src,
+ _RK_PTR(void *, dst, prop->offset),
+ _RK_PTR(const void *, src, prop->offset),
+ filter_cnt, filter);
+
+ rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
+ _RK_CONF_PROP_SET_REPLACE, NULL, 0);
+ }
+}
+
+
+rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf) {
+ rd_kafka_conf_t *new = rd_kafka_conf_new();
+
+ rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL);
+
+ rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL);
+
+ return new;
+}
+
+rd_kafka_conf_t *rd_kafka_conf_dup_filter(const rd_kafka_conf_t *conf,
+ size_t filter_cnt,
+ const char **filter) {
+ rd_kafka_conf_t *new = rd_kafka_conf_new();
+
+ rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter);
+
+ rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter);
+
+ return new;
+}
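+
+/*
+ * Example (illustrative): the filter entries are name prefixes, so a
+ * duplicate without any SASL/SSL credentials could be made with:
+ *
+ *   const char *filter[] = {"sasl.", "ssl."};
+ *   rd_kafka_conf_t *clean = rd_kafka_conf_dup_filter(conf, 2, filter);
+ */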
+
+
+rd_kafka_topic_conf_t *
+rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf) {
+ rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new();
+
+ rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL);
+
+ return new;
+}
+
+rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup(rd_kafka_t *rk) {
+ if (rk->rk_conf.topic_conf)
+ return rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
+ else
+ return rd_kafka_topic_conf_new();
+}
+
+void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events) {
+ char tmp[32];
+ rd_snprintf(tmp, sizeof(tmp), "%d", events);
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enabled_events", tmp);
+}
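+
+/*
+ * Example (illustrative): enable delivery report and error events for
+ * the event-based API; the bitmask is stored in the `enabled_events`
+ * property via the internal setter above:
+ *
+ *   rd_kafka_conf_set_events(conf,
+ *                            RD_KAFKA_EVENT_DR | RD_KAFKA_EVENT_ERROR);
+ */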
+
+void rd_kafka_conf_set_background_event_cb(
+ rd_kafka_conf_t *conf,
+ void (*event_cb)(rd_kafka_t *rk, rd_kafka_event_t *rkev, void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "background_event_cb",
+ event_cb);
+}
+
+
+void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
+ void (*dr_cb)(rd_kafka_t *rk,
+ void *payload,
+ size_t len,
+ rd_kafka_resp_err_t err,
+ void *opaque,
+ void *msg_opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_cb", dr_cb);
+}
+
+
+void rd_kafka_conf_set_dr_msg_cb(
+ rd_kafka_conf_t *conf,
+ void (*dr_msg_cb)(rd_kafka_t *rk,
+ const rd_kafka_message_t *rkmessage,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_msg_cb", dr_msg_cb);
+}
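+
+/*
+ * Example (illustrative) delivery report callback matching the signature
+ * registered above:
+ *
+ *   static void my_dr_msg_cb(rd_kafka_t *rk,
+ *                            const rd_kafka_message_t *rkmessage,
+ *                            void *opaque) {
+ *           if (rkmessage->err)
+ *                   fprintf(stderr, "Delivery failed: %s\n",
+ *                           rd_kafka_err2str(rkmessage->err));
+ *   }
+ *
+ *   rd_kafka_conf_set_dr_msg_cb(conf, my_dr_msg_cb);
+ */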
+
+
+void rd_kafka_conf_set_consume_cb(
+ rd_kafka_conf_t *conf,
+ void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "consume_cb",
+ consume_cb);
+}
+
+void rd_kafka_conf_set_rebalance_cb(
+ rd_kafka_conf_t *conf,
+ void (*rebalance_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *partitions,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "rebalance_cb",
+ rebalance_cb);
+}
+
+void rd_kafka_conf_set_offset_commit_cb(
+ rd_kafka_conf_t *conf,
+ void (*offset_commit_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "offset_commit_cb",
+ offset_commit_cb);
+}
+
+
+
+void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
+ void (*error_cb)(rd_kafka_t *rk,
+ int err,
+ const char *reason,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "error_cb", error_cb);
+}
+
+
+void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf,
+ void (*throttle_cb)(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int throttle_time_ms,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "throttle_cb",
+ throttle_cb);
+}
+
+
+void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
+ void (*log_cb)(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf)) {
+#if !WITH_SYSLOG
+ if (log_cb == rd_kafka_log_syslog)
+ rd_assert(!*"syslog support not enabled in this build");
+#endif
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "log_cb", log_cb);
+}
+
+
+void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
+ int (*stats_cb)(rd_kafka_t *rk,
+ char *json,
+ size_t json_len,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "stats_cb", stats_cb);
+}
+
+void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
+ rd_kafka_conf_t *conf,
+ void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
+ const char *oauthbearer_config,
+ void *opaque)) {
+#if WITH_SASL_OAUTHBEARER
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
+ "oauthbearer_token_refresh_cb",
+ oauthbearer_token_refresh_cb);
+#endif
+}
+
+void rd_kafka_conf_enable_sasl_queue(rd_kafka_conf_t *conf, int enable) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enable_sasl_queue",
+ (enable ? "true" : "false"));
+}
+
+void rd_kafka_conf_set_socket_cb(
+ rd_kafka_conf_t *conf,
+ int (*socket_cb)(int domain, int type, int protocol, void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "socket_cb", socket_cb);
+}
+
+void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t *conf,
+ int (*connect_cb)(int sockfd,
+ const struct sockaddr *addr,
+ int addrlen,
+ const char *id,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "connect_cb",
+ connect_cb);
+}
+
+void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t *conf,
+ int (*closesocket_cb)(int sockfd,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "closesocket_cb",
+ closesocket_cb);
+}
+
+
+
+#ifndef _WIN32
+void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf,
+ int (*open_cb)(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "open_cb", open_cb);
+}
+#endif
+
+void rd_kafka_conf_set_resolve_cb(
+ rd_kafka_conf_t *conf,
+ int (*resolve_cb)(const char *node,
+ const char *service,
+ const struct addrinfo *hints,
+ struct addrinfo **res,
+ void *opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "resolve_cb",
+ resolve_cb);
+}
+
+rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(
+ rd_kafka_conf_t *conf,
+ int (*ssl_cert_verify_cb)(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int *x509_set_error,
+ int depth,
+ const char *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size,
+ void *opaque)) {
+#if !WITH_SSL
+ return RD_KAFKA_CONF_INVALID;
+#else
+ rd_kafka_anyconf_set_internal(
+ _RK_GLOBAL, conf, "ssl.certificate.verify_cb", ssl_cert_verify_cb);
+ return RD_KAFKA_CONF_OK;
+#endif
+}
+
+
+void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque) {
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "opaque", opaque);
+}
+
+
+void rd_kafka_conf_set_engine_callback_data(rd_kafka_conf_t *conf,
+ void *callback_data) {
+ rd_kafka_anyconf_set_internal(
+ _RK_GLOBAL, conf, "ssl_engine_callback_data", callback_data);
+}
+
+
+void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf,
+ rd_kafka_topic_conf_t *tconf) {
+ if (conf->topic_conf) {
+ if (rd_kafka_anyconf_is_any_modified(conf->topic_conf))
+ conf->warn.default_topic_conf_overwritten = rd_true;
+ rd_kafka_topic_conf_destroy(conf->topic_conf);
+ }
+
+ rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "default_topic_conf",
+ tconf);
+}
+
+rd_kafka_topic_conf_t *
+rd_kafka_conf_get_default_topic_conf(rd_kafka_conf_t *conf) {
+ return conf->topic_conf;
+}
+
+
+void rd_kafka_topic_conf_set_partitioner_cb(
+ rd_kafka_topic_conf_t *topic_conf,
+ int32_t (*partitioner)(const rd_kafka_topic_t *rkt,
+ const void *keydata,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque)) {
+ rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "partitioner_cb",
+ partitioner);
+}
+
+void rd_kafka_topic_conf_set_msg_order_cmp(
+ rd_kafka_topic_conf_t *topic_conf,
+ int (*msg_order_cmp)(const rd_kafka_message_t *a,
+ const rd_kafka_message_t *b)) {
+ rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "msg_order_cmp",
+ msg_order_cmp);
+}
+
+void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *topic_conf,
+ void *opaque) {
+ rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "opaque", opaque);
+}
+
+
+
+/**
+ * @brief Convert flags \p ival to a CSV string using S2F property \p prop.
+ *
+ * This function has two modes: size query and write.
+ * To query for needed size call with dest==NULL,
+ * to write to buffer of size dest_size call with dest!=NULL.
+ *
+ * An \p ival of -1 means all.
+ *
+ * @param include_unsupported Include flag values that are unsupported
+ * due to missing dependencies at build time.
+ *
+ * @returns the number of bytes written to \p dest (if not NULL), else the
+ * total number of bytes needed.
+ *
+ */
+static size_t rd_kafka_conf_flags2str(char *dest,
+ size_t dest_size,
+ const char *delim,
+ const struct rd_kafka_property *prop,
+ int ival,
+ rd_bool_t include_unsupported) {
+ size_t of = 0;
+ int j;
+
+ if (dest && dest_size > 0)
+ *dest = '\0';
+
+ /* Phase 1: scan for set flags, accumulate needed size.
+ * Phase 2: write to dest */
+ for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i) && prop->s2i[j].str; j++) {
+ if (prop->type == _RK_C_S2F && ival != -1 &&
+ (ival & prop->s2i[j].val) != prop->s2i[j].val)
+ continue;
+ else if (prop->type == _RK_C_S2I && ival != -1 &&
+ prop->s2i[j].val != ival)
+ continue;
+ else if (prop->s2i[j].unsupported && !include_unsupported)
+ continue;
+
+ if (!dest)
+ of += strlen(prop->s2i[j].str) + (of > 0 ? 1 : 0);
+ else {
+ size_t r;
+ r = rd_snprintf(dest + of, dest_size - of, "%s%s",
+ of > 0 ? delim : "", prop->s2i[j].str);
+ if (r > dest_size - of) {
+ r = dest_size - of;
+ break;
+ }
+ of += r;
+ }
+ }
+
+ return of + 1 /*nul*/;
+}
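+
+/*
+ * Sketch of the size-query/write calling convention above (assuming a
+ * valid \p prop and \p ival in scope):
+ *
+ *   size_t sz = rd_kafka_conf_flags2str(NULL, 0, ",", prop, ival,
+ *                                       rd_false);
+ *   char *buf = rd_alloca(sz);
+ *   rd_kafka_conf_flags2str(buf, sz, ",", prop, ival, rd_false);
+ */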
+
+
+/**
+ * Return "original"(re-created) configuration value string
+ */
+static rd_kafka_conf_res_t
+rd_kafka_anyconf_get0(const void *conf,
+ const struct rd_kafka_property *prop,
+ char *dest,
+ size_t *dest_size) {
+ char tmp[22];
+ const char *val = NULL;
+ size_t val_len = 0;
+ int j;
+
+ switch (prop->type) {
+ case _RK_C_STR:
+ val = *_RK_PTR(const char **, conf, prop->offset);
+ break;
+
+ case _RK_C_KSTR: {
+ const rd_kafkap_str_t **kstr =
+ _RK_PTR(const rd_kafkap_str_t **, conf, prop->offset);
+ if (*kstr)
+ val = (*kstr)->str;
+ break;
+ }
+
+ case _RK_C_PTR:
+ val = *_RK_PTR(const void **, conf, prop->offset);
+ if (val) {
+ rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
+ val = tmp;
+ }
+ break;
+
+ case _RK_C_BOOL:
+ val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
+ break;
+
+ case _RK_C_INT:
+ rd_snprintf(tmp, sizeof(tmp), "%i",
+ *_RK_PTR(int *, conf, prop->offset));
+ val = tmp;
+ break;
+
+ case _RK_C_DBL:
+ rd_snprintf(tmp, sizeof(tmp), "%g",
+ *_RK_PTR(double *, conf, prop->offset));
+ val = tmp;
+ break;
+
+ case _RK_C_S2I:
+ for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
+ if (prop->s2i[j].val ==
+ *_RK_PTR(int *, conf, prop->offset)) {
+ val = prop->s2i[j].str;
+ break;
+ }
+ }
+ break;
+
+ case _RK_C_S2F: {
+ const int ival = *_RK_PTR(const int *, conf, prop->offset);
+
+ val_len = rd_kafka_conf_flags2str(dest, dest ? *dest_size : 0,
+ ",", prop, ival,
+ rd_false /*only supported*/);
+ if (dest) {
+ val_len = 0;
+ val = dest;
+ dest = NULL;
+ }
+ break;
+ }
+
+ case _RK_C_PATLIST: {
+ const rd_kafka_pattern_list_t **plist;
+ plist = _RK_PTR(const rd_kafka_pattern_list_t **, conf,
+ prop->offset);
+ if (*plist)
+ val = (*plist)->rkpl_orig;
+ break;
+ }
+
+ default:
+ break;
+ }
+
+ if (val_len) {
+ *dest_size = val_len + 1;
+ return RD_KAFKA_CONF_OK;
+ }
+
+ if (!val)
+ return RD_KAFKA_CONF_INVALID;
+
+ val_len = strlen(val);
+
+ if (dest) {
+ size_t use_len = RD_MIN(val_len, (*dest_size) - 1);
+ memcpy(dest, val, use_len);
+ dest[use_len] = '\0';
+ }
+
+ /* Return needed size */
+ *dest_size = val_len + 1;
+
+ return RD_KAFKA_CONF_OK;
+}
+
+
+static rd_kafka_conf_res_t rd_kafka_anyconf_get(int scope,
+ const void *conf,
+ const char *name,
+ char *dest,
+ size_t *dest_size) {
+ const struct rd_kafka_property *prop;
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+
+ if (!(prop->scope & scope) || strcmp(prop->name, name))
+ continue;
+
+ if (prop->type == _RK_C_ALIAS)
+ return rd_kafka_anyconf_get(scope, conf, prop->sdef,
+ dest, dest_size);
+
+ if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) ==
+ RD_KAFKA_CONF_OK)
+ return RD_KAFKA_CONF_OK;
+ }
+
+ return RD_KAFKA_CONF_UNKNOWN;
+}
+
+rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf,
+ const char *name,
+ char *dest,
+ size_t *dest_size) {
+ return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
+}
+
+rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf,
+ const char *name,
+ char *dest,
+ size_t *dest_size) {
+ rd_kafka_conf_res_t res;
+ res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);
+ if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf)
+ return res;
+
+ /* Fallthru:
+ * If the global property was unknown, try getting it from the
+ * default topic config, if any. */
+ return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size);
+}
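+
+/*
+ * Example (illustrative): the public getters use the same two-pass
+ * convention as rd_kafka_anyconf_get0(), first querying the required
+ * size with dest==NULL:
+ *
+ *   size_t sz;
+ *   if (rd_kafka_conf_get(conf, "client.id", NULL, &sz) ==
+ *       RD_KAFKA_CONF_OK) {
+ *           char *val = rd_malloc(sz);
+ *           rd_kafka_conf_get(conf, "client.id", val, &sz);
+ *           ...
+ *           rd_free(val);
+ *   }
+ */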
+
+
+static const char **rd_kafka_anyconf_dump(int scope,
+ const void *conf,
+ size_t *cntp,
+ rd_bool_t only_modified,
+ rd_bool_t redact_sensitive) {
+ const struct rd_kafka_property *prop;
+ char **arr;
+ int cnt = 0;
+
+ arr = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties) * 2);
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ char *val = NULL;
+ size_t val_size;
+
+ if (!(prop->scope & scope))
+ continue;
+
+ if (only_modified && !rd_kafka_anyconf_is_modified(conf, prop))
+ continue;
+
+ /* Skip aliases, show original property instead.
+ * Skip invalids. */
+ if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
+ continue;
+
+ if (redact_sensitive && (prop->scope & _RK_SENSITIVE)) {
+ val = rd_strdup("[redacted]");
+ } else {
+ /* Query value size */
+ if (rd_kafka_anyconf_get0(conf, prop, NULL,
+ &val_size) !=
+ RD_KAFKA_CONF_OK)
+ continue;
+
+ /* Get value */
+ val = rd_malloc(val_size);
+ rd_kafka_anyconf_get0(conf, prop, val, &val_size);
+ }
+
+ arr[cnt++] = rd_strdup(prop->name);
+ arr[cnt++] = val;
+ }
+
+ *cntp = cnt;
+
+ return (const char **)arr;
+}
+
+
+const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp) {
+ return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp, rd_false /*all*/,
+ rd_false /*don't redact*/);
+}
+
+const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
+ size_t *cntp) {
+ return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp, rd_false /*all*/,
+ rd_false /*don't redact*/);
+}
+
+void rd_kafka_conf_dump_free(const char **arr, size_t cnt) {
+ char **_arr = (char **)arr;
+ unsigned int i;
+
+ for (i = 0; i < cnt; i++)
+ if (_arr[i])
+ rd_free(_arr[i]);
+
+ rd_free(_arr);
+}
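+
+/*
+ * Example (illustrative): the dump is a flat array of name,value pairs:
+ *
+ *   size_t cnt, i;
+ *   const char **arr = rd_kafka_conf_dump(conf, &cnt);
+ *   for (i = 0; i < cnt; i += 2)
+ *           printf("%s = %s\n", arr[i], arr[i + 1]);
+ *   rd_kafka_conf_dump_free(arr, cnt);
+ */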
+
+
+
+/**
+ * @brief Dump configured properties to debug log.
+ */
+void rd_kafka_anyconf_dump_dbg(rd_kafka_t *rk,
+ int scope,
+ const void *conf,
+ const char *description) {
+ const char **arr;
+ size_t cnt;
+ size_t i;
+
+ arr =
+ rd_kafka_anyconf_dump(scope, conf, &cnt, rd_true /*modified only*/,
+ rd_true /*redact sensitive*/);
+ if (cnt > 0)
+ rd_kafka_dbg(rk, CONF, "CONF", "%s:", description);
+ for (i = 0; i < cnt; i += 2)
+ rd_kafka_dbg(rk, CONF, "CONF", " %s = %s", arr[i], arr[i + 1]);
+
+ rd_kafka_conf_dump_free(arr, cnt);
+}
+
+void rd_kafka_conf_properties_show(FILE *fp) {
+ const struct rd_kafka_property *prop0;
+ int last = 0;
+ int j;
+ char tmp[512];
+ const char *dash80 =
+ "----------------------------------------"
+ "----------------------------------------";
+
+ for (prop0 = rd_kafka_properties; prop0->name; prop0++) {
+ const char *typeinfo = "";
+ const char *importance;
+ const struct rd_kafka_property *prop = prop0;
+
+ /* Skip hidden properties. */
+ if (prop->scope & _RK_HIDDEN)
+ continue;
+
+ /* Skip invalid properties. */
+ if (prop->type == _RK_C_INVALID)
+ continue;
+
+ if (!(prop->scope & last)) {
+ fprintf(fp, "%s## %s configuration properties\n\n",
+ last ? "\n\n" : "",
+ prop->scope == _RK_GLOBAL ? "Global" : "Topic");
+
+ fprintf(fp,
+ "%-40s | %3s | %-15s | %13s | %-10s | %-25s\n"
+ "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s-| -%.*s\n",
+ "Property", "C/P", "Range", "Default",
+ "Importance", "Description", 40, dash80, 3,
+ dash80, 15, dash80, 13, dash80, 10, dash80, 25,
+ dash80);
+
+ last = prop->scope & (_RK_GLOBAL | _RK_TOPIC);
+ }
+
+ fprintf(fp, "%-40s | ", prop->name);
+
+ /* For aliases, use the aliased property from here on
+ * so that the alias property shows up with proper
+ * ranges, defaults, etc. */
+ if (prop->type == _RK_C_ALIAS) {
+ prop = rd_kafka_conf_prop_find(prop->scope, prop->sdef);
+ rd_assert(prop && *"BUG: "
+ "alias points to unknown config property");
+ }
+
+ fprintf(fp, "%3s | ",
+ (!(prop->scope & _RK_PRODUCER) ==
+ !(prop->scope & _RK_CONSUMER)
+ ? " * "
+ : ((prop->scope & _RK_PRODUCER) ? " P " : " C ")));
+
+ switch (prop->type) {
+ case _RK_C_STR:
+ case _RK_C_KSTR:
+ typeinfo = "string";
+ case _RK_C_PATLIST:
+ if (prop->type == _RK_C_PATLIST)
+ typeinfo = "pattern list";
+ if (prop->s2i[0].str) {
+ rd_kafka_conf_flags2str(
+ tmp, sizeof(tmp), ", ", prop, -1,
+ rd_true /*include unsupported*/);
+ fprintf(fp, "%-15s | %13s", tmp,
+ prop->sdef ? prop->sdef : "");
+ } else {
+ fprintf(fp, "%-15s | %13s", "",
+ prop->sdef ? prop->sdef : "");
+ }
+ break;
+ case _RK_C_BOOL:
+ typeinfo = "boolean";
+ fprintf(fp, "%-15s | %13s", "true, false",
+ prop->vdef ? "true" : "false");
+ break;
+ case _RK_C_INT:
+ typeinfo = "integer";
+ rd_snprintf(tmp, sizeof(tmp), "%d .. %d", prop->vmin,
+ prop->vmax);
+ fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
+ break;
+ case _RK_C_DBL:
+ typeinfo = "float"; /* more user-friendly than double */
+ rd_snprintf(tmp, sizeof(tmp), "%g .. %g", prop->dmin,
+ prop->dmax);
+ fprintf(fp, "%-15s | %13g", tmp, prop->ddef);
+ break;
+ case _RK_C_S2I:
+ typeinfo = "enum value";
+ rd_kafka_conf_flags2str(
+ tmp, sizeof(tmp), ", ", prop, -1,
+ rd_true /*include unsupported*/);
+ fprintf(fp, "%-15s | ", tmp);
+
+ for (j = 0; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
+ if (prop->s2i[j].val == prop->vdef) {
+ fprintf(fp, "%13s", prop->s2i[j].str);
+ break;
+ }
+ }
+ if (j == RD_ARRAYSIZE(prop->s2i))
+ fprintf(fp, "%13s", " ");
+ break;
+
+ case _RK_C_S2F:
+ typeinfo = "CSV flags";
+ /* Don't duplicate builtin.features value in
+ * both Range and Default */
+ if (!strcmp(prop->name, "builtin.features"))
+ *tmp = '\0';
+ else
+ rd_kafka_conf_flags2str(
+ tmp, sizeof(tmp), ", ", prop, -1,
+ rd_true /*include unsupported*/);
+ fprintf(fp, "%-15s | ", tmp);
+ rd_kafka_conf_flags2str(
+ tmp, sizeof(tmp), ", ", prop, prop->vdef,
+ rd_true /*include unsupported*/);
+ fprintf(fp, "%13s", tmp);
+
+ break;
+ case _RK_C_PTR:
+ case _RK_C_INTERNAL:
+ typeinfo = "see dedicated API";
+ /* FALLTHRU */
+ default:
+ fprintf(fp, "%-15s | %-13s", "", " ");
+ break;
+ }
+
+ if (prop->scope & _RK_HIGH)
+ importance = "high";
+ else if (prop->scope & _RK_MED)
+ importance = "medium";
+ else
+ importance = "low";
+
+ fprintf(fp, " | %-10s | ", importance);
+
+ if (prop->scope & _RK_EXPERIMENTAL)
+ fprintf(fp,
+ "**EXPERIMENTAL**: "
+ "subject to change or removal. ");
+
+ if (prop->scope & _RK_DEPRECATED)
+ fprintf(fp, "**DEPRECATED** ");
+
+ /* If the original property is an alias, prefix the
+ * description saying so. */
+ if (prop0->type == _RK_C_ALIAS)
+ fprintf(fp, "Alias for `%s`: ", prop0->sdef);
+
+ fprintf(fp, "%s <br>*Type: %s*\n", prop->desc, typeinfo);
+ }
+ fprintf(fp, "\n");
+ fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
+}
+
+
+
+/**
+ * @name Configuration value methods
+ *
+ * @remark This generic interface will eventually replace the config
+ *         property interface used above.
+ * @{
+ */
+
+
+/**
+ * @brief Set up an INT confval.
+ *
+ * @param name Property name, must be a const static string (will not be copied)
+ */
+void rd_kafka_confval_init_int(rd_kafka_confval_t *confval,
+ const char *name,
+ int vmin,
+ int vmax,
+ int vdef) {
+ confval->name = name;
+ confval->is_enabled = 1;
+ confval->valuetype = RD_KAFKA_CONFVAL_INT;
+ confval->u.INT.vmin = vmin;
+ confval->u.INT.vmax = vmax;
+ confval->u.INT.vdef = vdef;
+ confval->u.INT.v = vdef;
+}
+
+/**
+ * @brief Set up a PTR confval.
+ *
+ * @param name Property name, must be a const static string (will not be copied)
+ */
+void rd_kafka_confval_init_ptr(rd_kafka_confval_t *confval, const char *name) {
+ confval->name = name;
+ confval->is_enabled = 1;
+ confval->valuetype = RD_KAFKA_CONFVAL_PTR;
+ confval->u.PTR = NULL;
+}
+
+/**
+ * @brief Set up but disable a confval; any attempt to set it will fail.
+ *
+ * @param name Property name, must be a const static string (will not be copied)
+ */
+void rd_kafka_confval_disable(rd_kafka_confval_t *confval, const char *name) {
+ confval->name = name;
+ confval->is_enabled = 0;
+}
+
+/**
+ * @brief Set confval's value to \p valuep, verifying the passed
+ * \p valuetype matches (or can be cast to) \p confval's type.
+ *
+ * @param dispname is the display name for the configuration value and is
+ * included in error strings.
+ * @param valuep is a pointer to the value, or NULL to revert to default.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the new value was set,
+ *          RD_KAFKA_RESP_ERR__INVALID_TYPE if a string value could not be
+ *          parsed as the expected type, or
+ *          RD_KAFKA_RESP_ERR__INVALID_ARG if the value was of incorrect type,
+ *          out of range, or otherwise not a valid value.
+ */
+rd_kafka_resp_err_t rd_kafka_confval_set_type(rd_kafka_confval_t *confval,
+ rd_kafka_confval_type_t valuetype,
+ const void *valuep,
+ char *errstr,
+ size_t errstr_size) {
+
+ if (!confval->is_enabled) {
+ rd_snprintf(errstr, errstr_size,
+ "\"%s\" is not supported for this operation",
+ confval->name);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ switch (confval->valuetype) {
+ case RD_KAFKA_CONFVAL_INT: {
+ int v;
+ const char *end;
+
+ if (!valuep) {
+ /* Revert to default */
+ confval->u.INT.v = confval->u.INT.vdef;
+ confval->is_set = 0;
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ switch (valuetype) {
+ case RD_KAFKA_CONFVAL_INT:
+ v = *(const int *)valuep;
+ break;
+ case RD_KAFKA_CONFVAL_STR:
+ v = (int)strtol((const char *)valuep, (char **)&end, 0);
+ if (end == (const char *)valuep) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value type for \"%s\": "
+ "expecting integer",
+ confval->name);
+ return RD_KAFKA_RESP_ERR__INVALID_TYPE;
+ }
+ break;
+ default:
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value type for \"%s\": "
+ "expecting integer",
+ confval->name);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+
+ if ((confval->u.INT.vmin || confval->u.INT.vmax) &&
+ (v < confval->u.INT.vmin || v > confval->u.INT.vmax)) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value type for \"%s\": "
+ "expecting integer in range %d..%d",
+ confval->name, confval->u.INT.vmin,
+ confval->u.INT.vmax);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ confval->u.INT.v = v;
+ confval->is_set = 1;
+ } break;
+
+ case RD_KAFKA_CONFVAL_STR: {
+ size_t vlen;
+ const char *v = (const char *)valuep;
+
+                if (!valuep) {
+                        /* Revert to default */
+                        if (confval->u.STR.v)
+                                rd_free(confval->u.STR.v);
+                        if (confval->u.STR.vdef)
+                                confval->u.STR.v =
+                                    rd_strdup(confval->u.STR.vdef);
+                        else
+                                confval->u.STR.v = NULL;
+                        confval->is_set = 0;
+                        return RD_KAFKA_RESP_ERR_NO_ERROR;
+                }
+
+ if (valuetype != RD_KAFKA_CONFVAL_STR) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value type for \"%s\": "
+ "expecting string",
+ confval->name);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ vlen = strlen(v);
+ if ((confval->u.STR.minlen || confval->u.STR.maxlen) &&
+ (vlen < confval->u.STR.minlen ||
+ vlen > confval->u.STR.maxlen)) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value for \"%s\": "
+ "expecting string with length "
+ "%" PRIusz "..%" PRIusz,
+ confval->name, confval->u.STR.minlen,
+ confval->u.STR.maxlen);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ if (confval->u.STR.v)
+ rd_free(confval->u.STR.v);
+
+                confval->u.STR.v = rd_strdup(v);
+                confval->is_set  = 1;
+        } break;
+
+ case RD_KAFKA_CONFVAL_PTR:
+ confval->u.PTR = (void *)valuep;
+ break;
+
+ default:
+ RD_NOTREACHED();
+ return RD_KAFKA_RESP_ERR__NOENT;
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+int rd_kafka_confval_get_int(const rd_kafka_confval_t *confval) {
+ rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_INT);
+ return confval->u.INT.v;
+}
+
+
+const char *rd_kafka_confval_get_str(const rd_kafka_confval_t *confval) {
+ rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_STR);
+ return confval->u.STR.v;
+}
+
+void *rd_kafka_confval_get_ptr(const rd_kafka_confval_t *confval) {
+ rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_PTR);
+ return confval->u.PTR;
+}
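+
+/*
+ * Usage sketch (illustrative only; "timeout_ms" is a hypothetical confval
+ * name): a confval is initialized with its type and bounds, set through the
+ * type-checked setter, and read back with the matching getter.
+ *
+ * @code
+ *   rd_kafka_confval_t cv;
+ *   char errstr[256];
+ *
+ *   rd_kafka_confval_init_int(&cv, "timeout_ms", 0, 60000, 5000);
+ *
+ *   // String values are accepted for INT confvals and parsed with strtol().
+ *   if (rd_kafka_confval_set_type(&cv, RD_KAFKA_CONFVAL_STR, "30000",
+ *                                 errstr, sizeof(errstr)))
+ *           fprintf(stderr, "set failed: %s\n", errstr);
+ *
+ *   // Returns 30000; passing NULL as value reverts to the default (5000).
+ *   int v = rd_kafka_confval_get_int(&cv);
+ * @endcode
+ */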
+
+
+#define _is_alphanum(C) \
+ (((C) >= 'a' && (C) <= 'z') || ((C) >= 'A' && (C) <= 'Z') || \
+ ((C) >= '0' && (C) <= '9'))
+
+/**
+ * @returns true if the string is KIP-511 safe, else false.
+ */
+static rd_bool_t rd_kafka_sw_str_is_safe(const char *str) {
+ const char *s;
+
+ if (!*str)
+ return rd_true;
+
+ for (s = str; *s; s++) {
+ int c = (int)*s;
+
+ if (unlikely(!(_is_alphanum(c) || c == '-' || c == '.')))
+ return rd_false;
+ }
+
+ /* Verify that the string begins and ends with a-zA-Z0-9 */
+ if (!_is_alphanum(*str))
+ return rd_false;
+ if (!_is_alphanum(*(s - 1)))
+ return rd_false;
+
+ return rd_true;
+}
+
+
+/**
+ * @brief Sanitize KIP-511 software name/version strings in-place,
+ * replacing unaccepted characters with "-".
+ *
+ * @warning The \p str is modified in-place.
+ */
+static void rd_kafka_sw_str_sanitize_inplace(char *str) {
+ char *s = str, *d = str;
+
+        /* Strip any leading non-alphanums (but stop at the terminating
+         * null in case the string contains no alphanums at all). */
+        while (*s && !_is_alphanum(*s))
+                s++;
+
+ for (; *s; s++) {
+ int c = (int)*s;
+
+ if (unlikely(!(_is_alphanum(c) || c == '-' || c == '.')))
+ *d = '-';
+ else
+ *d = *s;
+ d++;
+ }
+
+ *d = '\0';
+
+ /* Strip any trailing non-alphanums */
+ for (d = d - 1; d >= str && !_is_alphanum(*d); d--)
+ *d = '\0';
+}
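+
+/*
+ * Illustrative example (mirrors the unittest below): sanitization strips
+ * leading/trailing unsafe characters and replaces interior ones with '-'.
+ *
+ * @code
+ *   char name[] = " .~aba. va! !.~~";
+ *   rd_kafka_sw_str_sanitize_inplace(name);
+ *   // name is now "aba.-va", which rd_kafka_sw_str_is_safe() accepts.
+ * @endcode
+ */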
+
+#undef _is_alphanum
+
+
+/**
+ * @brief Create a staggered array of key-value pairs from
+ * an array of "key=value" strings (typically from rd_string_split()).
+ *
+ * The output array will have element 0 being key0 and element 1 being
+ * value0. Element 2 being key1 and element 3 being value1, and so on.
+ * E.g.:
+ * input { "key0=value0", "key1=value1" } incnt=2
+ * returns { "key0", "value0", "key1", "value1" } cntp=4
+ *
+ * @returns NULL on error (missing '=' separator, or empty key), or a newly
+ * on success. The array count is returned in \p cntp.
+ * The returned pointer must be freed with rd_free().
+ */
+char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp) {
+ size_t i;
+ char **out, *p;
+ size_t lens = 0;
+ size_t outcnt = 0;
+
+ /* First calculate total length needed for key-value strings. */
+ for (i = 0; i < incnt; i++) {
+ const char *t = strchr(input[i], '=');
+
+ /* No "=", or "=" at beginning of string. */
+ if (!t || t == input[i])
+ return NULL;
+
+ /* Length of key, '=' (will be \0), value, and \0 */
+ lens += strlen(input[i]) + 1;
+ }
+
+ /* Allocate array along with elements in one go */
+ out = rd_malloc((sizeof(*out) * incnt * 2) + lens);
+ p = (char *)(&out[incnt * 2]);
+
+ for (i = 0; i < incnt; i++) {
+ const char *t = strchr(input[i], '=');
+ size_t namelen = (size_t)(t - input[i]);
+ size_t valuelen = strlen(t + 1);
+
+ /* Copy name */
+ out[outcnt++] = p;
+ memcpy(p, input[i], namelen);
+ p += namelen;
+ *(p++) = '\0';
+
+ /* Copy value */
+ out[outcnt++] = p;
+ memcpy(p, t + 1, valuelen + 1);
+ p += valuelen;
+ *(p++) = '\0';
+ }
+
+
+ *cntp = outcnt;
+ return out;
+}
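+
+/*
+ * Usage sketch (illustrative only):
+ *
+ * @code
+ *   const char *pairs[] = {"key0=value0", "key1=value1"};
+ *   size_t cnt;
+ *   char **kv = rd_kafka_conf_kv_split(pairs, 2, &cnt);
+ *   // kv = { "key0", "value0", "key1", "value1" }, cnt == 4.
+ *   // The array and its strings share one allocation, so a single
+ *   // rd_free(kv) releases everything.
+ *   rd_free(kv);
+ * @endcode
+ */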
+
+
+/**
+ * @brief Verify configuration \p conf is
+ * correct/non-conflicting and finalize the configuration
+ * settings for use.
+ *
+ * @returns an error string if configuration is incorrect, else NULL.
+ */
+const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
+ rd_kafka_conf_t *conf) {
+ const char *errstr;
+
+ if (!conf->sw_name)
+ rd_kafka_conf_set(conf, "client.software.name", "librdkafka",
+ NULL, 0);
+ if (!conf->sw_version)
+ rd_kafka_conf_set(conf, "client.software.version",
+ rd_kafka_version_str(), NULL, 0);
+
+ /* The client.software.name and .version are sent to the broker
+ * with the ApiVersionRequest starting with AK 2.4.0 (KIP-511).
+ * These strings need to be sanitized or the broker will reject them,
+ * so modify them in-place here. */
+ rd_assert(conf->sw_name && conf->sw_version);
+ rd_kafka_sw_str_sanitize_inplace(conf->sw_name);
+ rd_kafka_sw_str_sanitize_inplace(conf->sw_version);
+
+ /* Verify mandatory configuration */
+ if (!conf->socket_cb)
+ return "Mandatory config property `socket_cb` not set";
+
+ if (!conf->open_cb)
+ return "Mandatory config property `open_cb` not set";
+
+#if WITH_SSL
+ if (conf->ssl.keystore_location && !conf->ssl.keystore_password)
+ return "`ssl.keystore.password` is mandatory when "
+ "`ssl.keystore.location` is set";
+ if (conf->ssl.ca && (conf->ssl.ca_location || conf->ssl.ca_pem))
+ return "`ssl.ca.location` or `ssl.ca.pem`, and memory-based "
+ "set_ssl_cert(CERT_CA) are mutually exclusive.";
+#ifdef __APPLE__
+ else if (!conf->ssl.ca && !conf->ssl.ca_location && !conf->ssl.ca_pem)
+ /* Default ssl.ca.location to 'probe' on OSX */
+ rd_kafka_conf_set(conf, "ssl.ca.location", "probe", NULL, 0);
+#endif
+#endif
+
+#if WITH_SASL_OAUTHBEARER
+ if (!rd_strcasecmp(conf->sasl.mechanisms, "OAUTHBEARER")) {
+ if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
+ conf->sasl.oauthbearer.token_refresh_cb)
+ return "`enable.sasl.oauthbearer.unsecure.jwt` and "
+ "`oauthbearer_token_refresh_cb` are "
+ "mutually exclusive";
+
+ if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
+ conf->sasl.oauthbearer.method ==
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC)
+ return "`enable.sasl.oauthbearer.unsecure.jwt` and "
+ "`sasl.oauthbearer.method=oidc` are "
+ "mutually exclusive";
+
+ if (conf->sasl.oauthbearer.method ==
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC) {
+ if (!conf->sasl.oauthbearer.client_id)
+ return "`sasl.oauthbearer.client.id` is "
+ "mandatory when "
+ "`sasl.oauthbearer.method=oidc` is set";
+
+ if (!conf->sasl.oauthbearer.client_secret) {
+ return "`sasl.oauthbearer.client.secret` is "
+ "mandatory when "
+ "`sasl.oauthbearer.method=oidc` is set";
+ }
+
+ if (!conf->sasl.oauthbearer.token_endpoint_url) {
+ return "`sasl.oauthbearer.token.endpoint.url` "
+ "is mandatory when "
+ "`sasl.oauthbearer.method=oidc` is set";
+ }
+ }
+
+ /* Enable background thread for the builtin OIDC handler,
+ * unless a refresh callback has been set. */
+ if (conf->sasl.oauthbearer.method ==
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
+ !conf->sasl.oauthbearer.token_refresh_cb) {
+ conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
+ conf->sasl.enable_callback_queue = 1;
+ }
+ }
+
+#endif
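+
+        /* Illustrative OIDC setup (hypothetical example values) satisfying
+         * the checks above:
+         *   sasl.mechanisms=OAUTHBEARER
+         *   sasl.oauthbearer.method=oidc
+         *   sasl.oauthbearer.client.id=myclient
+         *   sasl.oauthbearer.client.secret=mysecret
+         *   sasl.oauthbearer.token.endpoint.url=https://example.com/token */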
+
+ if (cltype == RD_KAFKA_CONSUMER) {
+
+ /* Automatically adjust `fetch.max.bytes` to be >=
+ * `message.max.bytes` and <= `queued.max.message.kbytes`
+ * unless set by user. */
+ if (rd_kafka_conf_is_modified(conf, "fetch.max.bytes")) {
+ if (conf->fetch_max_bytes < conf->max_msg_size)
+ return "`fetch.max.bytes` must be >= "
+ "`message.max.bytes`";
+ } else {
+ conf->fetch_max_bytes =
+ RD_MAX(RD_MIN(conf->fetch_max_bytes,
+ conf->queued_max_msg_kbytes * 1024),
+ conf->max_msg_size);
+ }
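+
+                /* Worked example (illustrative, assuming default values):
+                 * with queued.max.message.kbytes=1024 and the defaults
+                 * fetch.max.bytes=52428800 and message.max.bytes=1000000,
+                 * the adjustment above yields
+                 * RD_MAX(RD_MIN(52428800, 1024*1024), 1000000) = 1048576. */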
+
+ /* Automatically adjust 'receive.message.max.bytes' to
+ * be 512 bytes larger than 'fetch.max.bytes' to have enough
+ * room for protocol framing (including topic name), unless
+ * set by user. */
+ if (rd_kafka_conf_is_modified(conf,
+ "receive.message.max.bytes")) {
+ if (conf->fetch_max_bytes + 512 >
+ conf->recv_max_msg_size)
+ return "`receive.message.max.bytes` must be >= "
+ "`fetch.max.bytes` + 512";
+ } else {
+ conf->recv_max_msg_size =
+ RD_MAX(conf->recv_max_msg_size,
+ conf->fetch_max_bytes + 512);
+ }
+
+ if (conf->max_poll_interval_ms < conf->group_session_timeout_ms)
+ return "`max.poll.interval.ms`must be >= "
+ "`session.timeout.ms`";
+
+ /* Simplifies rd_kafka_is_idempotent() which is producer-only */
+ conf->eos.idempotence = 0;
+
+ } else if (cltype == RD_KAFKA_PRODUCER) {
+ if (conf->eos.transactional_id) {
+ if (!conf->eos.idempotence) {
+ /* Auto enable idempotence unless
+ * explicitly disabled */
+ if (rd_kafka_conf_is_modified(
+ conf, "enable.idempotence"))
+ return "`transactional.id` requires "
+ "`enable.idempotence=true`";
+
+ conf->eos.idempotence = rd_true;
+ }
+
+ /* Make sure at least one request can be sent
+ * before the transaction times out. */
+ if (!rd_kafka_conf_is_modified(conf,
+ "socket.timeout.ms"))
+ conf->socket_timeout_ms = RD_MAX(
+ conf->eos.transaction_timeout_ms - 100,
+ 900);
+ else if (conf->eos.transaction_timeout_ms + 100 <
+ conf->socket_timeout_ms)
+ return "`socket.timeout.ms` must be set <= "
+ "`transaction.timeout.ms` + 100";
+ }
+
+ if (conf->eos.idempotence) {
+                        /* Adjust configuration values for idempotent
+                         * producer */
+
+ if (rd_kafka_conf_is_modified(conf, "max.in.flight")) {
+ if (conf->max_inflight >
+ RD_KAFKA_IDEMP_MAX_INFLIGHT)
+ return "`max.in.flight` must be "
+ "set "
+ "<="
+ " " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
+ " when `enable.idempotence` "
+ "is true";
+ } else {
+ conf->max_inflight =
+ RD_MIN(conf->max_inflight,
+ RD_KAFKA_IDEMP_MAX_INFLIGHT);
+ }
+
+
+ if (rd_kafka_conf_is_modified(conf, "retries")) {
+ if (conf->max_retries < 1)
+ return "`retries` must be set >= 1 "
+ "when `enable.idempotence` is "
+ "true";
+ } else {
+ conf->max_retries = INT32_MAX;
+ }
+
+
+ if (rd_kafka_conf_is_modified(
+ conf,
+ "queue.buffering.backpressure.threshold") &&
+ conf->queue_backpressure_thres > 1)
+ return "`queue.buffering.backpressure."
+ "threshold` "
+ "must be set to 1 when "
+ "`enable.idempotence` is true";
+ else
+ conf->queue_backpressure_thres = 1;
+
+ /* acks=all and queuing.strategy are set
+ * in topic_conf_finalize() */
+
+ } else {
+ if (conf->eos.gapless &&
+ rd_kafka_conf_is_modified(
+ conf, "enable.gapless.guarantee"))
+ return "`enable.gapless.guarantee` requires "
+ "`enable.idempotence` to be enabled";
+ }
+
+ if (!rd_kafka_conf_is_modified(conf,
+ "sticky.partitioning.linger.ms"))
+ conf->sticky_partition_linger_ms = (int)RD_MIN(
+ 900000, (rd_ts_t)(2 * conf->buffering_max_ms_dbl));
+ }
+
+
+ if (!rd_kafka_conf_is_modified(conf, "metadata.max.age.ms") &&
+ conf->metadata_refresh_interval_ms > 0)
+ conf->metadata_max_age_ms =
+ conf->metadata_refresh_interval_ms * 3;
+
+ if (conf->reconnect_backoff_max_ms < conf->reconnect_backoff_ms)
+ return "`reconnect.backoff.max.ms` must be >= "
+ "`reconnect.max.ms`";
+
+ if (conf->sparse_connections) {
+                /* Set the sparse connection random selection interval to
+                 * reconnect.backoff.ms / 2, clamped to the range
+                 * 11..1000 ms. */
+ conf->sparse_connect_intvl =
+ RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms / 2, 1000));
+ }
+
+ if (!rd_kafka_conf_is_modified(conf, "connections.max.idle.ms") &&
+ conf->brokerlist && rd_strcasestr(conf->brokerlist, "azure")) {
+ /* Issue #3109:
+ * Default connections.max.idle.ms to <4 minutes on Azure. */
+ conf->connections_max_idle_ms = (4 * 60 - 10) * 1000;
+ }
+
+ if (!rd_kafka_conf_is_modified(conf, "allow.auto.create.topics")) {
+ /* Consumer: Do not allow auto create by default.
+ * Producer: Allow auto create by default. */
+ if (cltype == RD_KAFKA_CONSUMER)
+ conf->allow_auto_create_topics = rd_false;
+ else if (cltype == RD_KAFKA_PRODUCER)
+ conf->allow_auto_create_topics = rd_true;
+ }
+
+ /* Finalize and verify the default.topic.config */
+ if (conf->topic_conf) {
+
+ if (cltype == RD_KAFKA_PRODUCER) {
+ rd_kafka_topic_conf_t *tconf = conf->topic_conf;
+
+ if (tconf->message_timeout_ms != 0 &&
+ (double)tconf->message_timeout_ms <=
+ conf->buffering_max_ms_dbl) {
+ if (rd_kafka_conf_is_modified(conf,
+ "linger.ms"))
+ return "`message.timeout.ms` must be "
+ "greater than `linger.ms`";
+ else /* Auto adjust linger.ms to be lower
+ * than message.timeout.ms */
+ conf->buffering_max_ms_dbl =
+ (double)tconf->message_timeout_ms -
+ 0.1;
+ }
+ }
+
+ errstr = rd_kafka_topic_conf_finalize(cltype, conf,
+ conf->topic_conf);
+ if (errstr)
+ return errstr;
+ }
+
+ /* Convert double linger.ms to internal int microseconds after
+ * finalizing default_topic_conf since it may
+ * update buffering_max_ms_dbl. */
+ conf->buffering_max_us = (rd_ts_t)(conf->buffering_max_ms_dbl * 1000);
+
+
+ return NULL;
+}
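+
+/*
+ * Usage sketch (illustrative only; finalization is normally invoked from
+ * rd_kafka_new() rather than called directly, as in the unittest below):
+ *
+ * @code
+ *   const char *errstr = rd_kafka_conf_finalize(RD_KAFKA_PRODUCER, conf);
+ *   if (errstr)
+ *           fprintf(stderr, "Configuration error: %s\n", errstr);
+ * @endcode
+ */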
+
+
+/**
+ * @brief Verify topic configuration \p tconf is
+ * correct/non-conflicting and finalize the configuration
+ * settings for use.
+ *
+ * @returns an error string if configuration is incorrect, else NULL.
+ */
+const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
+ const rd_kafka_conf_t *conf,
+ rd_kafka_topic_conf_t *tconf) {
+
+ if (cltype != RD_KAFKA_PRODUCER)
+ return NULL;
+
+ if (conf->eos.idempotence) {
+ /* Ensure acks=all */
+ if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
+ if (tconf->required_acks != -1)
+ return "`acks` must be set to `all` when "
+ "`enable.idempotence` is true";
+ } else {
+ tconf->required_acks = -1; /* all */
+ }
+
+ /* Ensure FIFO queueing */
+ if (rd_kafka_topic_conf_is_modified(tconf,
+ "queuing.strategy")) {
+ if (tconf->queuing_strategy != RD_KAFKA_QUEUE_FIFO)
+ return "`queuing.strategy` must be set to "
+ "`fifo` when `enable.idempotence` is "
+ "true";
+ } else {
+ tconf->queuing_strategy = RD_KAFKA_QUEUE_FIFO;
+ }
+
+ /* Ensure message.timeout.ms <= transaction.timeout.ms */
+ if (conf->eos.transactional_id) {
+ if (!rd_kafka_topic_conf_is_modified(
+ tconf, "message.timeout.ms"))
+ tconf->message_timeout_ms =
+ conf->eos.transaction_timeout_ms;
+ else if (tconf->message_timeout_ms >
+ conf->eos.transaction_timeout_ms)
+ return "`message.timeout.ms` must be set <= "
+ "`transaction.timeout.ms`";
+ }
+ }
+
+ if (tconf->message_timeout_ms != 0 &&
+ (double)tconf->message_timeout_ms <= conf->buffering_max_ms_dbl &&
+ rd_kafka_conf_is_modified(conf, "linger.ms"))
+ return "`message.timeout.ms` must be greater than `linger.ms`";
+
+ return NULL;
+}
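+
+/*
+ * Illustrative summary of the enforcement above: with
+ * `enable.idempotence=true` and an otherwise unmodified topic configuration,
+ * `acks` is forced to `all` (required_acks = -1) and `queuing.strategy` to
+ * `fifo`; for transactional producers, `message.timeout.ms` defaults to and
+ * may not exceed `transaction.timeout.ms`.
+ */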
+
+
+/**
+ * @brief Log warnings for set deprecated or experimental
+ * configuration properties.
+ * @returns the number of warnings logged.
+ */
+static int rd_kafka_anyconf_warn_deprecated(rd_kafka_t *rk,
+ rd_kafka_conf_scope_t scope,
+ const void *conf) {
+ const struct rd_kafka_property *prop;
+ int warn_type =
+ rk->rk_type == RD_KAFKA_PRODUCER ? _RK_CONSUMER : _RK_PRODUCER;
+ int warn_on = _RK_DEPRECATED | _RK_EXPERIMENTAL | warn_type;
+
+ int cnt = 0;
+
+ for (prop = rd_kafka_properties; prop->name; prop++) {
+ int match = prop->scope & warn_on;
+
+ if (likely(!(prop->scope & scope) || !match))
+ continue;
+
+ if (likely(!rd_kafka_anyconf_is_modified(conf, prop)))
+ continue;
+
+ if (match != warn_type)
+                        rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+                                     "Configuration property %s is %s%s%s: %s",
+                                     prop->name,
+                                     match & _RK_DEPRECATED ? "deprecated" : "",
+                                     (match & _RK_DEPRECATED) &&
+                                     (match & _RK_EXPERIMENTAL) ? " and " : "",
+                                     match & _RK_EXPERIMENTAL ? "experimental"
+                                                              : "",
+                                     prop->desc);
+
+ if (match & warn_type)
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property %s "
+ "is a %s property and will be ignored by "
+ "this %s instance",
+ prop->name,
+ warn_type == _RK_PRODUCER ? "producer"
+ : "consumer",
+ warn_type == _RK_PRODUCER ? "consumer"
+ : "producer");
+
+ cnt++;
+ }
+
+ return cnt;
+}
+
+
+/**
+ * @brief Log configuration warnings (deprecated configuration properties,
+ * unrecommended combinations, etc).
+ *
+ * @returns the number of warnings logged.
+ *
+ * @locality any
+ * @locks none
+ */
+int rd_kafka_conf_warn(rd_kafka_t *rk) {
+ int cnt = 0;
+
+ cnt = rd_kafka_anyconf_warn_deprecated(rk, _RK_GLOBAL, &rk->rk_conf);
+ if (rk->rk_conf.topic_conf)
+ cnt += rd_kafka_anyconf_warn_deprecated(rk, _RK_TOPIC,
+ rk->rk_conf.topic_conf);
+
+ if (rk->rk_conf.warn.default_topic_conf_overwritten)
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Topic configuration properties set in the "
+ "global configuration were overwritten by "
+ "explicitly setting a default_topic_conf: "
+ "recommend not using set_default_topic_conf");
+
+ /* Additional warnings */
+ if (rk->rk_type == RD_KAFKA_CONSUMER) {
+ if (rk->rk_conf.fetch_wait_max_ms + 1000 >
+ rk->rk_conf.socket_timeout_ms)
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property "
+ "`fetch.wait.max.ms` (%d) should be "
+ "set lower than `socket.timeout.ms` (%d) "
+ "by at least 1000ms to avoid blocking "
+ "and timing out sub-sequent requests",
+ rk->rk_conf.fetch_wait_max_ms,
+ rk->rk_conf.socket_timeout_ms);
+ }
+
+ if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.mechanisms") &&
+ !(rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
+ rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT)) {
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property `sasl.mechanism` set to "
+ "`%s` but `security.protocol` is not configured "
+ "for SASL: recommend setting "
+ "`security.protocol` to SASL_SSL or "
+ "SASL_PLAINTEXT",
+ rk->rk_conf.sasl.mechanisms);
+ }
+
+ if (rd_kafka_conf_is_modified(&rk->rk_conf, "sasl.username") &&
+ !(!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM", 5) ||
+ !strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")))
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property `sasl.username` only "
+ "applies when `sasl.mechanism` is set to "
+ "PLAIN or SCRAM-SHA-..");
+
+ if (rd_kafka_conf_is_modified(&rk->rk_conf, "client.software.name") &&
+ !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_name))
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property `client.software.name` "
+ "may only contain 'a-zA-Z0-9.-', other characters "
+ "will be replaced with '-'");
+
+ if (rd_kafka_conf_is_modified(&rk->rk_conf,
+ "client.software.version") &&
+ !rd_kafka_sw_str_is_safe(rk->rk_conf.sw_version))
+ rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
+ "Configuration property `client.software.verison` "
+ "may only contain 'a-zA-Z0-9.-', other characters "
+ "will be replaced with '-'");
+
+ if (rd_atomic32_get(&rk->rk_broker_cnt) == 0)
+ rd_kafka_log(rk, LOG_NOTICE, "CONFWARN",
+ "No `bootstrap.servers` configured: "
+ "client will not be able to connect "
+ "to Kafka cluster");
+
+ return cnt;
+}
+
+
+const rd_kafka_conf_t *rd_kafka_conf(rd_kafka_t *rk) {
+ return &rk->rk_conf;
+}
+
+
+/**
+ * @brief Unittests
+ */
+int unittest_conf(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_conf_res_t res, res2;
+ char errstr[128];
+ int iteration;
+ const struct rd_kafka_property *prop;
+ char readval[512];
+ size_t readlen;
+ const char *errstr2;
+
+ conf = rd_kafka_conf_new();
+ tconf = rd_kafka_topic_conf_new();
+
+ res = rd_kafka_conf_set(conf, "unknown.thing", "foo", errstr,
+ sizeof(errstr));
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_UNKNOWN, "fail");
+ RD_UT_ASSERT(*errstr, "fail");
+
+ for (iteration = 0; iteration < 5; iteration++) {
+ int cnt;
+
+
+ /* Iterations:
+ * 0 - Check is_modified
+ * 1 - Set every other config property, read back and verify.
+ * 2 - Check is_modified.
+ * 3 - Set all config properties, read back and verify.
+ * 4 - Check is_modified. */
+ for (prop = rd_kafka_properties, cnt = 0; prop->name;
+ prop++, cnt++) {
+ const char *val;
+ char tmp[64];
+ int odd = cnt & 1;
+ int do_set = iteration == 3 || (iteration == 1 && odd);
+ rd_bool_t is_modified;
+ int exp_is_modified =
+ !prop->unsupported &&
+ (iteration >= 3 ||
+ (iteration > 0 && (do_set || odd)));
+
+ readlen = sizeof(readval);
+
+ /* Avoid some special configs */
+ if (!strcmp(prop->name, "plugin.library.paths") ||
+ !strcmp(prop->name, "builtin.features"))
+ continue;
+
+ switch (prop->type) {
+ case _RK_C_STR:
+ case _RK_C_KSTR:
+ case _RK_C_PATLIST:
+ if (prop->sdef)
+ val = prop->sdef;
+ else
+ val = "test";
+ break;
+
+ case _RK_C_BOOL:
+ val = "true";
+ break;
+
+ case _RK_C_INT:
+ rd_snprintf(tmp, sizeof(tmp), "%d", prop->vdef);
+ val = tmp;
+ break;
+
+ case _RK_C_DBL:
+ rd_snprintf(tmp, sizeof(tmp), "%g", prop->ddef);
+ val = tmp;
+ break;
+
+ case _RK_C_S2F:
+ case _RK_C_S2I:
+ val = prop->s2i[0].str;
+ break;
+
+ case _RK_C_PTR:
+ case _RK_C_ALIAS:
+ case _RK_C_INVALID:
+ case _RK_C_INTERNAL:
+ default:
+ continue;
+ }
+
+
+ if (prop->scope & _RK_GLOBAL) {
+ if (do_set)
+ res = rd_kafka_conf_set(
+ conf, prop->name, val, errstr,
+ sizeof(errstr));
+
+ res2 = rd_kafka_conf_get(conf, prop->name,
+ readval, &readlen);
+
+ is_modified =
+ rd_kafka_conf_is_modified(conf, prop->name);
+
+
+ } else if (prop->scope & _RK_TOPIC) {
+ if (do_set)
+ res = rd_kafka_topic_conf_set(
+ tconf, prop->name, val, errstr,
+ sizeof(errstr));
+
+ res2 = rd_kafka_topic_conf_get(
+ tconf, prop->name, readval, &readlen);
+
+ is_modified = rd_kafka_topic_conf_is_modified(
+ tconf, prop->name);
+
+ } else {
+ RD_NOTREACHED();
+ }
+
+
+
+ if (do_set && prop->unsupported) {
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_INVALID,
+ "conf_set %s should've failed "
+ "with CONF_INVALID, not %d: %s",
+ prop->name, res, errstr);
+
+ } else if (do_set) {
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_OK,
+ "conf_set %s failed: %d: %s",
+ prop->name, res, errstr);
+ RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK,
+ "conf_get %s failed: %d",
+ prop->name, res2);
+
+ RD_UT_ASSERT(!strcmp(readval, val),
+ "conf_get %s "
+ "returned \"%s\": "
+ "expected \"%s\"",
+ prop->name, readval, val);
+
+ RD_UT_ASSERT(is_modified,
+ "Property %s was set but "
+ "is_modified=%d",
+ prop->name, is_modified);
+ }
+
+ RD_UT_ASSERT(is_modified == exp_is_modified,
+ "Property %s is_modified=%d, "
+ "exp_is_modified=%d "
+ "(iter %d, odd %d, do_set %d)",
+ prop->name, is_modified, exp_is_modified,
+ iteration, odd, do_set);
+ }
+ }
+
+ /* Set an alias and make sure is_modified() works for it. */
+ res = rd_kafka_conf_set(conf, "max.in.flight", "19", NULL, 0);
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
+
+ RD_UT_ASSERT(rd_kafka_conf_is_modified(conf, "max.in.flight") ==
+ rd_true,
+ "fail");
+ RD_UT_ASSERT(rd_kafka_conf_is_modified(
+ conf, "max.in.flight.requests.per.connection") ==
+ rd_true,
+ "fail");
+
+ rd_kafka_conf_destroy(conf);
+ rd_kafka_topic_conf_destroy(tconf);
+
+
+ /* Verify that software.client.* string-safing works */
+ conf = rd_kafka_conf_new();
+ res = rd_kafka_conf_set(conf, "client.software.name",
+ " .~aba. va! !.~~", NULL, 0);
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
+ res = rd_kafka_conf_set(conf, "client.software.version",
+ "!1.2.3.4.5!!! a", NULL, 0);
+ RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
+
+ errstr2 = rd_kafka_conf_finalize(RD_KAFKA_PRODUCER, conf);
+ RD_UT_ASSERT(!errstr2, "conf_finalize() failed: %s", errstr2);
+
+ readlen = sizeof(readval);
+ res2 =
+ rd_kafka_conf_get(conf, "client.software.name", readval, &readlen);
+ RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
+ RD_UT_ASSERT(!strcmp(readval, "aba.-va"),
+ "client.software.* safification failed: \"%s\"", readval);
+ RD_UT_SAY("Safified client.software.name=\"%s\"", readval);
+
+ readlen = sizeof(readval);
+ res2 = rd_kafka_conf_get(conf, "client.software.version", readval,
+ &readlen);
+ RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, "%d", res2);
+ RD_UT_ASSERT(!strcmp(readval, "1.2.3.4.5----a"),
+ "client.software.* safification failed: \"%s\"", readval);
+ RD_UT_SAY("Safified client.software.version=\"%s\"", readval);
+
+ rd_kafka_conf_destroy(conf);
+
+ RD_UT_PASS();
+}
+
+/**@}*/