Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.h')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.h  650
1 file changed, 650 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.h
new file mode 100644
index 000000000..161d6e469
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_conf.h
@@ -0,0 +1,650 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2014-2018 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_CONF_H_
+#define _RDKAFKA_CONF_H_
+
+#include "rdlist.h"
+#include "rdkafka_cert.h"
+
+#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 && \
+ !defined(OPENSSL_IS_BORINGSSL)
+#define WITH_SSL_ENGINE 1
+/* Deprecated in OpenSSL 3 */
+#include <openssl/engine.h>
+#endif /* WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000 */
+
+/**
+ * Forward declarations
+ */
+struct rd_kafka_transport_s;
+
+
+/**
+ * MessageSet compression codecs
+ */
+typedef enum {
+ RD_KAFKA_COMPRESSION_NONE,
+ RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
+ RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
+ RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
+ RD_KAFKA_COMPRESSION_ZSTD = RD_KAFKA_MSG_ATTR_ZSTD,
+ RD_KAFKA_COMPRESSION_INHERIT, /* Inherit setting from global conf */
+ RD_KAFKA_COMPRESSION_NUM
+} rd_kafka_compression_t;
+
+static RD_INLINE RD_UNUSED const char *
+rd_kafka_compression2str(rd_kafka_compression_t compr) {
+ static const char *names[RD_KAFKA_COMPRESSION_NUM] = {
+ [RD_KAFKA_COMPRESSION_NONE] = "none",
+ [RD_KAFKA_COMPRESSION_GZIP] = "gzip",
+ [RD_KAFKA_COMPRESSION_SNAPPY] = "snappy",
+ [RD_KAFKA_COMPRESSION_LZ4] = "lz4",
+ [RD_KAFKA_COMPRESSION_ZSTD] = "zstd",
+ [RD_KAFKA_COMPRESSION_INHERIT] = "inherit"};
+ static RD_TLS char ret[32];
+
+ if ((int)compr < 0 || compr >= RD_KAFKA_COMPRESSION_NUM) {
+ rd_snprintf(ret, sizeof(ret), "codec0x%x?", (int)compr);
+ return ret;
+ }
+
+ return names[compr];
+}
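+
+/*
+ * Usage sketch (illustrative only): the helper maps codec enum values to
+ * their configuration-string names, falling back to a thread-local
+ * "codec0x..?" buffer for out-of-range values.
+ *
+ *   const char *name;
+ *   name = rd_kafka_compression2str(RD_KAFKA_COMPRESSION_LZ4);
+ *   // name == "lz4"
+ *   name = rd_kafka_compression2str((rd_kafka_compression_t)0x7f);
+ *   // name == "codec0x7f?"
+ */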
+
+/**
+ * MessageSet compression levels
+ */
+typedef enum {
+ RD_KAFKA_COMPLEVEL_DEFAULT = -1,
+ RD_KAFKA_COMPLEVEL_MIN = -1,
+ RD_KAFKA_COMPLEVEL_GZIP_MAX = 9,
+ RD_KAFKA_COMPLEVEL_LZ4_MAX = 12,
+ RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0,
+ RD_KAFKA_COMPLEVEL_ZSTD_MAX = 22,
+ RD_KAFKA_COMPLEVEL_MAX = 12
+} rd_kafka_complevel_t;
+
+typedef enum {
+ RD_KAFKA_PROTO_PLAINTEXT,
+ RD_KAFKA_PROTO_SSL,
+ RD_KAFKA_PROTO_SASL_PLAINTEXT,
+ RD_KAFKA_PROTO_SASL_SSL,
+ RD_KAFKA_PROTO_NUM,
+} rd_kafka_secproto_t;
+
+
+typedef enum {
+ RD_KAFKA_CONFIGURED,
+ RD_KAFKA_LEARNED,
+ RD_KAFKA_INTERNAL,
+ RD_KAFKA_LOGICAL
+} rd_kafka_confsource_t;
+
+static RD_INLINE RD_UNUSED const char *
+rd_kafka_confsource2str(rd_kafka_confsource_t source) {
+ static const char *names[] = {"configured", "learned", "internal",
+ "logical"};
+
+ return names[source];
+}
+
+
+typedef enum {
+ _RK_GLOBAL = 0x1,
+ _RK_PRODUCER = 0x2,
+ _RK_CONSUMER = 0x4,
+ _RK_TOPIC = 0x8,
+ _RK_CGRP = 0x10,
+ _RK_DEPRECATED = 0x20,
+ _RK_HIDDEN = 0x40,
+ _RK_HIGH = 0x80, /* High Importance */
+ _RK_MED = 0x100, /* Medium Importance */
+ _RK_EXPERIMENTAL = 0x200, /* Experimental (unsupported) property */
+ _RK_SENSITIVE = 0x400 /* The configuration property's value
+ * might contain sensitive information. */
+} rd_kafka_conf_scope_t;
+
+/**< While the client group is a generic concept, it is currently
+ * only implemented for consumers in librdkafka. */
+#define _RK_CGRP _RK_CONSUMER
+
+typedef enum {
+ _RK_CONF_PROP_SET_REPLACE, /* Replace current value (default) */
+ _RK_CONF_PROP_SET_ADD, /* Add value (S2F) */
+ _RK_CONF_PROP_SET_DEL /* Remove value (S2F) */
+} rd_kafka_conf_set_mode_t;
+
+
+
+typedef enum {
+ RD_KAFKA_OFFSET_METHOD_NONE,
+ RD_KAFKA_OFFSET_METHOD_FILE,
+ RD_KAFKA_OFFSET_METHOD_BROKER
+} rd_kafka_offset_method_t;
+
+typedef enum {
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_DEFAULT,
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC
+} rd_kafka_oauthbearer_method_t;
+
+typedef enum {
+ RD_KAFKA_SSL_ENDPOINT_ID_NONE,
+ RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */
+} rd_kafka_ssl_endpoint_id_t;
+
+/* Increase in steps of 64 as needed.
+ * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
+#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
+
+/**
+ * @struct rd_kafka_anyconf_hdr
+ * @brief The anyconf header must be the first field in the
+ * rd_kafka_conf_t and rd_kafka_topic_conf_t structs.
+ * It provides a way to track which property has been modified.
+ */
+struct rd_kafka_anyconf_hdr {
+ uint64_t modified[RD_KAFKA_CONF_PROPS_IDX_MAX / 64];
+};
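+
+/*
+ * Minimal sketch of how the bitmap above is presumably consulted: each
+ * property maps to an index below RD_KAFKA_CONF_PROPS_IDX_MAX (the
+ * comment above suggests the property's byte offset within the conf
+ * struct), and one bit per index records whether it was modified.
+ * The helper names here are hypothetical, for illustration only:
+ *
+ *   static void anyconf_mark_modified(struct rd_kafka_anyconf_hdr *hdr,
+ *                                     int idx) {
+ *           hdr->modified[idx / 64] |= (uint64_t)1 << (idx % 64);
+ *   }
+ *
+ *   static int anyconf_was_modified(const struct rd_kafka_anyconf_hdr *hdr,
+ *                                   int idx) {
+ *           return !!(hdr->modified[idx / 64] &
+ *                     ((uint64_t)1 << (idx % 64)));
+ *   }
+ */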
+
+
+/**
+ * Optional configuration struct passed to rd_kafka_new*().
+ *
+ * The struct is populated through string properties
+ * by calling rd_kafka_conf_set().
+ *
+ */
+struct rd_kafka_conf_s {
+ struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
+
+ /*
+ * Generic configuration
+ */
+ int enabled_events;
+ int max_msg_size;
+ int msg_copy_max_size;
+ int recv_max_msg_size;
+ int max_inflight;
+ int metadata_request_timeout_ms;
+ int metadata_refresh_interval_ms;
+ int metadata_refresh_fast_cnt;
+ int metadata_refresh_fast_interval_ms;
+ int metadata_refresh_sparse;
+ int metadata_max_age_ms;
+ int metadata_propagation_max_ms;
+ int debug;
+ int broker_addr_ttl;
+ int broker_addr_family;
+ int socket_timeout_ms;
+ int socket_blocking_max_ms;
+ int socket_sndbuf_size;
+ int socket_rcvbuf_size;
+ int socket_keepalive;
+ int socket_nagle_disable;
+ int socket_max_fails;
+ char *client_id_str;
+ char *brokerlist;
+ int stats_interval_ms;
+ int term_sig;
+ int reconnect_backoff_ms;
+ int reconnect_backoff_max_ms;
+ int reconnect_jitter_ms;
+ int socket_connection_setup_timeout_ms;
+ int connections_max_idle_ms;
+ int sparse_connections;
+ int sparse_connect_intvl;
+ int api_version_request;
+ int api_version_request_timeout_ms;
+ int api_version_fallback_ms;
+ char *broker_version_fallback;
+ rd_kafka_secproto_t security_protocol;
+
+ struct {
+#if WITH_SSL
+ SSL_CTX *ctx;
+#endif
+ char *cipher_suites;
+ char *curves_list;
+ char *sigalgs_list;
+ char *key_location;
+ char *key_pem;
+ rd_kafka_cert_t *key;
+ char *key_password;
+ char *cert_location;
+ char *cert_pem;
+ rd_kafka_cert_t *cert;
+ char *ca_location;
+ char *ca_pem;
+ rd_kafka_cert_t *ca;
+ /** CSV list of Windows certificate stores */
+ char *ca_cert_stores;
+ char *crl_location;
+#if WITH_SSL && OPENSSL_VERSION_NUMBER >= 0x10100000
+ ENGINE *engine;
+#endif
+ char *engine_location;
+ char *engine_id;
+ void *engine_callback_data;
+ char *providers;
+ rd_list_t loaded_providers; /**< (SSL_PROVIDER*) */
+ char *keystore_location;
+ char *keystore_password;
+ int endpoint_identification;
+ int enable_verify;
+ int (*cert_verify_cb)(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int *x509_error,
+ int depth,
+ const char *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size,
+ void *opaque);
+ } ssl;
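+
+        /*
+         * Sketch of a matching certificate verification callback
+         * (illustrative; assumes the public
+         * rd_kafka_conf_set_ssl_cert_verify_cb() setter, that *x509_error
+         * carries OpenSSL's preliminary verification result, and that a
+         * non-zero return value accepts the certificate):
+         *
+         *   static int my_cert_verify_cb(rd_kafka_t *rk,
+         *                                const char *broker_name,
+         *                                int32_t broker_id,
+         *                                int *x509_error, int depth,
+         *                                const char *buf, size_t size,
+         *                                char *errstr, size_t errstr_size,
+         *                                void *opaque) {
+         *           // Accept only what OpenSSL's built-in checks accepted.
+         *           return *x509_error == 0;
+         *   }
+         */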
+
+ struct {
+ const struct rd_kafka_sasl_provider *provider;
+ char *principal;
+ char *mechanisms;
+ char *service_name;
+ char *kinit_cmd;
+ char *keytab;
+ int relogin_min_time;
+ /** Protects .username and .password access after client
+ * instance has been created (see sasl_set_credentials()). */
+ mtx_t lock;
+ char *username;
+ char *password;
+#if WITH_SASL_SCRAM
+ /* SCRAM EVP-wrapped hash function
+ * (return value from EVP_shaX()) */
+ const void /*EVP_MD*/ *scram_evp;
+ /* SCRAM direct hash function (e.g., SHA256()) */
+ unsigned char *(*scram_H)(const unsigned char *d,
+ size_t n,
+ unsigned char *md);
+ /* Hash size */
+ size_t scram_H_size;
+#endif
+ char *oauthbearer_config;
+ int enable_oauthbearer_unsecure_jwt;
+ int enable_callback_queue;
+ struct {
+ rd_kafka_oauthbearer_method_t method;
+ char *token_endpoint_url;
+ char *client_id;
+ char *client_secret;
+ char *scope;
+ char *extensions_str;
+ /* SASL/OAUTHBEARER token refresh event callback */
+ void (*token_refresh_cb)(rd_kafka_t *rk,
+ const char *oauthbearer_config,
+ void *opaque);
+ } oauthbearer;
+ } sasl;
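+
+        /*
+         * Sketch of an OAUTHBEARER token refresh callback (illustrative;
+         * assumes the public rd_kafka_oauthbearer_set_token() and
+         * rd_kafka_oauthbearer_set_token_failure() helpers; fetch_token()
+         * is a hypothetical application function returning a JWT and its
+         * expiry in milliseconds since the epoch):
+         *
+         *   static void my_token_refresh_cb(rd_kafka_t *rk,
+         *                                   const char *oauthbearer_config,
+         *                                   void *opaque) {
+         *           char errstr[512];
+         *           char *token;
+         *           int64_t expiry_ms;
+         *
+         *           if (fetch_token(oauthbearer_config, &token, &expiry_ms))
+         *                   rd_kafka_oauthbearer_set_token(
+         *                       rk, token, expiry_ms, "principal",
+         *                       NULL, 0, errstr, sizeof(errstr));
+         *           else
+         *                   rd_kafka_oauthbearer_set_token_failure(
+         *                       rk, "token acquisition failed");
+         *   }
+         */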
+
+ char *plugin_paths;
+#if WITH_PLUGINS
+ rd_list_t plugins;
+#endif
+
+ /* Interceptors */
+ struct {
+ /* rd_kafka_interceptor_method_t lists */
+ rd_list_t on_conf_set; /* on_conf_set interceptors
+ * (not copied on conf_dup()) */
+ rd_list_t on_conf_dup; /* .. (not copied) */
+ rd_list_t on_conf_destroy; /* .. (not copied) */
+ rd_list_t on_new; /* .. (copied) */
+ rd_list_t on_destroy; /* .. (copied) */
+ rd_list_t on_send; /* .. (copied) */
+ rd_list_t on_acknowledgement; /* .. (copied) */
+ rd_list_t on_consume; /* .. (copied) */
+ rd_list_t on_commit; /* .. (copied) */
+ rd_list_t on_request_sent; /* .. (copied) */
+ rd_list_t on_response_received; /* .. (copied) */
+ rd_list_t on_thread_start; /* .. (copied) */
+ rd_list_t on_thread_exit; /* .. (copied) */
+ rd_list_t on_broker_state_change; /* .. (copied) */
+
+ /* rd_strtup_t list */
+ rd_list_t config; /* Configuration name=val's
+ * handled by interceptors. */
+ } interceptors;
+
+ /* Client group configuration */
+ int coord_query_intvl_ms;
+ int max_poll_interval_ms;
+
+ int builtin_features;
+ /*
+ * Consumer configuration
+ */
+ int check_crcs;
+ int queued_min_msgs;
+ int queued_max_msg_kbytes;
+ int64_t queued_max_msg_bytes;
+ int fetch_wait_max_ms;
+ int fetch_msg_max_bytes;
+ int fetch_max_bytes;
+ int fetch_min_bytes;
+ int fetch_error_backoff_ms;
+ char *group_id_str;
+ char *group_instance_id;
+ int allow_auto_create_topics;
+
+ rd_kafka_pattern_list_t *topic_blacklist;
+ struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
+ * for automatically
+ * subscribed topics. */
+ int enable_auto_commit;
+ int enable_auto_offset_store;
+ int auto_commit_interval_ms;
+ int group_session_timeout_ms;
+ int group_heartbeat_intvl_ms;
+ rd_kafkap_str_t *group_protocol_type;
+ char *partition_assignment_strategy;
+ rd_list_t partition_assignors;
+ int enabled_assignor_cnt;
+
+ void (*rebalance_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *partitions,
+ void *opaque);
+
+ void (*offset_commit_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque);
+
+ rd_kafka_offset_method_t offset_store_method;
+
+ rd_kafka_isolation_level_t isolation_level;
+
+ int enable_partition_eof;
+
+ rd_kafkap_str_t *client_rack;
+
+ /*
+ * Producer configuration
+ */
+ struct {
+ /*
+ * Idempotence
+ */
+ int idempotence; /**< Enable Idempotent Producer */
+ rd_bool_t gapless; /**< Raise fatal error if
+ * gapless guarantee can't be
+ * satisfied. */
+ /*
+ * Transactions
+ */
+ char *transactional_id; /**< Transactional Id */
+ int transaction_timeout_ms; /**< Transaction timeout */
+ } eos;
+ int queue_buffering_max_msgs;
+ int queue_buffering_max_kbytes;
+ double buffering_max_ms_dbl; /**< This is the configured value */
+ rd_ts_t buffering_max_us; /**< This is the value used in the code */
+ int queue_backpressure_thres;
+ int max_retries;
+ int retry_backoff_ms;
+ int batch_num_messages;
+ int batch_size;
+ rd_kafka_compression_t compression_codec;
+ int dr_err_only;
+ int sticky_partition_linger_ms;
+
+ /* Message delivery report callback.
+ * Called once for each produced message, either on
+ * successful and acknowledged delivery to the broker in which
+ * case 'err' is 0, or if the message could not be delivered
+ * in which case 'err' is non-zero (use rd_kafka_err2str()
+ * to obtain a human-readable error reason).
+ *
+ * If the message was produced with neither RD_KAFKA_MSG_F_FREE
+ * nor RD_KAFKA_MSG_F_COPY set then 'payload' is the original
+ * pointer provided to rd_kafka_produce().
+ * rdkafka will not perform any further actions on 'payload'
+ * at this point and the application may rd_free() the payload
+ * data.
+ *
+ * 'opaque' is 'conf.opaque', while 'msg_opaque' is
+ * the opaque pointer provided in the rd_kafka_produce() call.
+ */
+ void (*dr_cb)(rd_kafka_t *rk,
+ void *payload,
+ size_t len,
+ rd_kafka_resp_err_t err,
+ void *opaque,
+ void *msg_opaque);
+
+ void (*dr_msg_cb)(rd_kafka_t *rk,
+ const rd_kafka_message_t *rkmessage,
+ void *opaque);
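+
+        /*
+         * Delivery report sketch matching the description above
+         * (illustrative; assumes the public rd_kafka_conf_set_dr_msg_cb()
+         * setter and that rkmessage->_private carries the msg_opaque):
+         *
+         *   static void my_dr_msg_cb(rd_kafka_t *rk,
+         *                            const rd_kafka_message_t *rkmessage,
+         *                            void *opaque) {
+         *           if (rkmessage->err)
+         *                   fprintf(stderr, "delivery failed: %s\n",
+         *                           rd_kafka_message_errstr(rkmessage));
+         *           // 'rkmessage->payload' may be freed by the application
+         *           // here if the message was produced without
+         *           // RD_KAFKA_MSG_F_FREE or RD_KAFKA_MSG_F_COPY.
+         *   }
+         *   // rd_kafka_conf_set_dr_msg_cb(conf, my_dr_msg_cb);
+         */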
+
+ /* Consume callback */
+ void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque);
+
+ /* Log callback */
+ void (*log_cb)(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf);
+ int log_level;
+ int log_queue;
+ int log_thread_name;
+ int log_connection_close;
+
+ /* PRNG seeding */
+ int enable_random_seed;
+
+ /* Error callback */
+ void (*error_cb)(rd_kafka_t *rk,
+ int err,
+ const char *reason,
+ void *opaque);
+
+ /* Throttle callback */
+ void (*throttle_cb)(rd_kafka_t *rk,
+ const char *broker_name,
+ int32_t broker_id,
+ int throttle_time_ms,
+ void *opaque);
+
+ /* Stats callback */
+ int (*stats_cb)(rd_kafka_t *rk,
+ char *json,
+ size_t json_len,
+ void *opaque);
+
+ /* Socket creation callback */
+ int (*socket_cb)(int domain, int type, int protocol, void *opaque);
+
+ /* Connect callback */
+ int (*connect_cb)(int sockfd,
+ const struct sockaddr *addr,
+ int addrlen,
+ const char *id,
+ void *opaque);
+
+ /* Close socket callback */
+ int (*closesocket_cb)(int sockfd, void *opaque);
+
+ /* File open callback */
+ int (*open_cb)(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque);
+
+ /* Address resolution callback */
+ int (*resolve_cb)(const char *node,
+ const char *service,
+ const struct addrinfo *hints,
+ struct addrinfo **res,
+ void *opaque);
+
+ /* Background queue event callback */
+ void (*background_event_cb)(rd_kafka_t *rk,
+ rd_kafka_event_t *rkev,
+ void *opaque);
+
+
+ /* Opaque passed to callbacks. */
+ void *opaque;
+
+ /* For use with value-less properties. */
+ int dummy;
+
+
+ /* Admin client defaults */
+ struct {
+ int request_timeout_ms; /* AdminOptions.request_timeout */
+ } admin;
+
+
+ /*
+ * Test mocks
+ */
+ struct {
+ int broker_cnt; /**< Number of mock brokers */
+ int broker_rtt; /**< Broker RTT */
+ } mock;
+
+ /*
+ * Unit test pluggable interfaces
+ */
+ struct {
+ /**< Inject errors in ProduceResponse handler */
+ rd_kafka_resp_err_t (*handle_ProduceResponse)(
+ rd_kafka_t *rk,
+ int32_t brokerid,
+ uint64_t msgid,
+ rd_kafka_resp_err_t err);
+ } ut;
+
+ char *sw_name; /**< Software/client name */
+ char *sw_version; /**< Software/client version */
+
+ struct {
+ /** Properties on (implicit pass-thru) default_topic_conf were
+ * overwritten by passing an explicit default_topic_conf. */
+ rd_bool_t default_topic_conf_overwritten;
+ } warn;
+};
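+
+/*
+ * Usage sketch for the struct above (illustrative; public API only):
+ * applications never fill the struct in directly, it is populated
+ * through string properties and then handed to rd_kafka_new(), which
+ * takes ownership of it on success.
+ *
+ *   char errstr[512];
+ *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
+ *
+ *   if (rd_kafka_conf_set(conf, "bootstrap.servers", "broker:9092",
+ *                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ *           fprintf(stderr, "%s\n", errstr);
+ *
+ *   rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+ *                                 errstr, sizeof(errstr));
+ */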
+
+int rd_kafka_socket_cb_linux(int domain, int type, int protocol, void *opaque);
+int rd_kafka_socket_cb_generic(int domain,
+ int type,
+ int protocol,
+ void *opaque);
+#ifndef _WIN32
+int rd_kafka_open_cb_linux(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque);
+#endif
+int rd_kafka_open_cb_generic(const char *pathname,
+ int flags,
+ mode_t mode,
+ void *opaque);
+
+
+
+struct rd_kafka_topic_conf_s {
+ struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
+
+ int required_acks;
+ int32_t request_timeout_ms;
+ int message_timeout_ms;
+
+ int32_t (*partitioner)(const rd_kafka_topic_t *rkt,
+ const void *keydata,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque);
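+
+        /*
+         * Sketch of a matching custom partitioner (illustrative; assumes
+         * the public rd_kafka_topic_conf_set_partitioner_cb() setter and
+         * the built-in rd_kafka_msg_partitioner_random() as a fallback
+         * for keyless messages; the hash is a plain djb2 for brevity):
+         *
+         *   static int32_t my_partitioner(const rd_kafka_topic_t *rkt,
+         *                                 const void *keydata, size_t keylen,
+         *                                 int32_t partition_cnt,
+         *                                 void *rkt_opaque,
+         *                                 void *msg_opaque) {
+         *           const unsigned char *key = keydata;
+         *           uint32_t h = 5381;
+         *           size_t i;
+         *
+         *           if (!keydata)
+         *                   return rd_kafka_msg_partitioner_random(
+         *                       rkt, keydata, keylen, partition_cnt,
+         *                       rkt_opaque, msg_opaque);
+         *           for (i = 0; i < keylen; i++)
+         *                   h = h * 33 + key[i];
+         *           return (int32_t)(h % (uint32_t)partition_cnt);
+         *   }
+         */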
+ char *partitioner_str;
+
+ rd_bool_t random_partitioner; /**< rd_true - random
+ * rd_false - sticky */
+
+ int queuing_strategy; /* RD_KAFKA_QUEUE_FIFO|LIFO */
+ int (*msg_order_cmp)(const void *a, const void *b);
+
+ rd_kafka_compression_t compression_codec;
+ rd_kafka_complevel_t compression_level;
+ int produce_offset_report;
+
+ int consume_callback_max_msgs;
+ int auto_commit;
+ int auto_commit_interval_ms;
+ int auto_offset_reset;
+ char *offset_store_path;
+ int offset_store_sync_interval_ms;
+
+ rd_kafka_offset_method_t offset_store_method;
+
+ /* Application provided opaque pointer (this is rkt_opaque) */
+ void *opaque;
+};
+
+
+char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp);
+
+void rd_kafka_anyconf_destroy(int scope, void *conf);
+
+rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,
+ const char *name);
+
+void rd_kafka_desensitize_str(char *str);
+
+void rd_kafka_conf_desensitize(rd_kafka_conf_t *conf);
+void rd_kafka_topic_conf_desensitize(rd_kafka_topic_conf_t *tconf);
+
+const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
+ rd_kafka_conf_t *conf);
+const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
+ const rd_kafka_conf_t *conf,
+ rd_kafka_topic_conf_t *tconf);
+
+
+int rd_kafka_conf_warn(rd_kafka_t *rk);
+
+void rd_kafka_anyconf_dump_dbg(rd_kafka_t *rk,
+ int scope,
+ const void *conf,
+ const char *description);
+
+#include "rdkafka_confval.h"
+
+int unittest_conf(void);
+
+#endif /* _RDKAFKA_CONF_H_ */