Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka.c  5026
1 file changed, 5026 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka.c
new file mode 100644
index 000000000..b254748eb
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka.c
@@ -0,0 +1,5026 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#define _GNU_SOURCE
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#if !_WIN32
+#include <sys/types.h>
+#include <dirent.h>
+#endif
+
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_request.h"
+#include "rdkafka_event.h"
+#include "rdkafka_error.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_interceptor.h"
+#include "rdkafka_idempotence.h"
+#include "rdkafka_sasl_oauthbearer.h"
+#if WITH_OAUTHBEARER_OIDC
+#include "rdkafka_sasl_oauthbearer_oidc.h"
+#endif
+#if WITH_SSL
+#include "rdkafka_ssl.h"
+#endif
+
+#include "rdtime.h"
+#include "crc32c.h"
+#include "rdunittest.h"
+
+#ifdef _WIN32
+#include <sys/types.h>
+#include <sys/timeb.h>
+#endif
+
+#define CJSON_HIDE_SYMBOLS
+#include "cJSON.h"
+
+#if WITH_CURL
+#include "rdhttp.h"
+#endif
+
+
+static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
+static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;
+
+/**
+ * @brief Global counter+lock for all active librdkafka instances
+ */
+mtx_t rd_kafka_global_lock;
+int rd_kafka_global_cnt;
+
+
+/**
+ * Last API error code, per thread.
+ * Shared among all rd_kafka_t instances.
+ */
+rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
+
+
+/**
+ * Current number of threads created by rdkafka.
+ * This is used in regression tests.
+ */
+rd_atomic32_t rd_kafka_thread_cnt_curr;
+int rd_kafka_thread_cnt(void) {
+ return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
+}
+
+/**
+ * Current thread's log name (TLS)
+ */
+char RD_TLS rd_kafka_thread_name[64] = "app";
+
+void rd_kafka_set_thread_name(const char *fmt, ...) {
+ va_list ap;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), fmt,
+ ap);
+ va_end(ap);
+}
+
+/**
+ * @brief Current thread's system name (TLS)
+ *
+ * Note the name must be 15 characters or less, because it is passed to
+ * pthread_setname_np on Linux which imposes this limit.
+ */
+static char RD_TLS rd_kafka_thread_sysname[16] = "app";
+
+void rd_kafka_set_thread_sysname(const char *fmt, ...) {
+ va_list ap;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
+ fmt, ap);
+ va_end(ap);
+
+ thrd_setname(rd_kafka_thread_sysname);
+}
+
+static void rd_kafka_global_init0(void) {
+ cJSON_Hooks json_hooks = {.malloc_fn = rd_malloc, .free_fn = rd_free};
+
+ mtx_init(&rd_kafka_global_lock, mtx_plain);
+#if ENABLE_DEVEL
+ rd_atomic32_init(&rd_kafka_op_cnt, 0);
+#endif
+ rd_crc32c_global_init();
+#if WITH_SSL
+ /* The configuration interface might need to use
+ * OpenSSL to parse keys before any rd_kafka_t
+ * object has been created. */
+ rd_kafka_ssl_init();
+#endif
+
+ cJSON_InitHooks(&json_hooks);
+
+#if WITH_CURL
+ rd_http_global_init();
+#endif
+}
+
+/**
+ * @brief Initialize once per process
+ */
+void rd_kafka_global_init(void) {
+ call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
+}
+
+
+/**
+ * @brief Seed the PRNG with current_time.milliseconds
+ */
+static void rd_kafka_global_srand(void) {
+ struct timeval tv;
+
+ rd_gettimeofday(&tv, NULL);
+
+ srand((unsigned int)(tv.tv_usec / 1000));
+}
+
+
+/**
+ * @returns the current number of active librdkafka instances
+ */
+static int rd_kafka_global_cnt_get(void) {
+ int r;
+ mtx_lock(&rd_kafka_global_lock);
+ r = rd_kafka_global_cnt;
+ mtx_unlock(&rd_kafka_global_lock);
+ return r;
+}
+
+
+/**
+ * @brief Increase counter for active librdkafka instances.
+ * If this is the first instance the global constructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_incr(void) {
+ mtx_lock(&rd_kafka_global_lock);
+ rd_kafka_global_cnt++;
+ if (rd_kafka_global_cnt == 1) {
+ rd_kafka_transport_init();
+#if WITH_SSL
+ rd_kafka_ssl_init();
+#endif
+ rd_kafka_sasl_global_init();
+ }
+ mtx_unlock(&rd_kafka_global_lock);
+}
+
+/**
+ * @brief Decrease counter for active librdkafka instances.
+ * If this counter reaches 0 the global destructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_decr(void) {
+ mtx_lock(&rd_kafka_global_lock);
+ rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
+ rd_kafka_global_cnt--;
+ if (rd_kafka_global_cnt == 0) {
+ rd_kafka_sasl_global_term();
+#if WITH_SSL
+ rd_kafka_ssl_term();
+#endif
+ }
+ mtx_unlock(&rd_kafka_global_lock);
+}
+
+
+/**
+ * Wait for all rd_kafka_t objects to be destroyed.
+ * Returns 0 if all kafka objects are now destroyed, or -1 if the
+ * timeout was reached.
+ */
+int rd_kafka_wait_destroyed(int timeout_ms) {
+ rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
+
+ while (rd_kafka_thread_cnt() > 0 || rd_kafka_global_cnt_get() > 0) {
+ if (rd_clock() >= timeout) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ ETIMEDOUT);
+ return -1;
+ }
+ rd_usleep(25000, NULL); /* 25ms */
+ }
+
+ return 0;
+}
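+
+/* Example (illustrative sketch): an application that wants to verify
+ * full library teardown at exit might call this after destroying its
+ * last handle (`rk` below is a hypothetical handle):
+ *
+ *   rd_kafka_destroy(rk);
+ *   if (rd_kafka_wait_destroyed(5000) == -1)
+ *       fprintf(stderr, "librdkafka objects still alive after 5s\n");
+ */
+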
+
+static void rd_kafka_log_buf(const rd_kafka_conf_t *conf,
+ const rd_kafka_t *rk,
+ int level,
+ int ctx,
+ const char *fac,
+ const char *buf) {
+ if (level > conf->log_level)
+ return;
+ else if (rk && conf->log_queue) {
+ rd_kafka_op_t *rko;
+
+ if (!rk->rk_logq)
+ return; /* Terminating */
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
+ rko->rko_u.log.level = level;
+ rd_strlcpy(rko->rko_u.log.fac, fac, sizeof(rko->rko_u.log.fac));
+ rko->rko_u.log.str = rd_strdup(buf);
+ rko->rko_u.log.ctx = ctx;
+ rd_kafka_q_enq(rk->rk_logq, rko);
+
+ } else if (conf->log_cb) {
+ conf->log_cb(rk, level, fac, buf);
+ }
+}
+
+/**
+ * @brief Logger
+ *
+ * @remark conf must be set, but rk may be NULL
+ */
+void rd_kafka_log0(const rd_kafka_conf_t *conf,
+ const rd_kafka_t *rk,
+ const char *extra,
+ int level,
+ int ctx,
+ const char *fac,
+ const char *fmt,
+ ...) {
+ char buf[2048];
+ va_list ap;
+ unsigned int elen = 0;
+ unsigned int of = 0;
+
+ if (level > conf->log_level)
+ return;
+
+ if (conf->log_thread_name) {
+ elen = rd_snprintf(buf, sizeof(buf),
+ "[thrd:%s]: ", rd_kafka_thread_name);
+ if (unlikely(elen >= sizeof(buf)))
+ elen = sizeof(buf);
+ of = elen;
+ }
+
+ if (extra) {
+ elen = rd_snprintf(buf + of, sizeof(buf) - of, "%s: ", extra);
+ if (unlikely(elen >= sizeof(buf) - of))
+ elen = sizeof(buf) - of;
+ of += elen;
+ }
+
+ va_start(ap, fmt);
+ rd_vsnprintf(buf + of, sizeof(buf) - of, fmt, ap);
+ va_end(ap);
+
+ rd_kafka_log_buf(conf, rk, level, ctx, fac, buf);
+}
+
+rd_kafka_resp_err_t
+rd_kafka_oauthbearer_set_token(rd_kafka_t *rk,
+ const char *token_value,
+ int64_t md_lifetime_ms,
+ const char *md_principal_name,
+ const char **extensions,
+ size_t extension_size,
+ char *errstr,
+ size_t errstr_size) {
+#if WITH_SASL_OAUTHBEARER
+ return rd_kafka_oauthbearer_set_token0(
+ rk, token_value, md_lifetime_ms, md_principal_name, extensions,
+ extension_size, errstr, errstr_size);
+#else
+ rd_snprintf(errstr, errstr_size,
+ "librdkafka not built with SASL OAUTHBEARER support");
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+#endif
+}
+
+rd_kafka_resp_err_t rd_kafka_oauthbearer_set_token_failure(rd_kafka_t *rk,
+ const char *errstr) {
+#if WITH_SASL_OAUTHBEARER
+ return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
+#else
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+#endif
+}
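+
+/* Example (illustrative sketch): an application's OAUTHBEARER token
+ * refresh callback would typically call rd_kafka_oauthbearer_set_token()
+ * on success and rd_kafka_oauthbearer_set_token_failure() on error.
+ * `token_str`, `expiry_ms` and the principal below are hypothetical
+ * placeholders:
+ *
+ *   char errstr[512];
+ *   if (rd_kafka_oauthbearer_set_token(rk, token_str, expiry_ms,
+ *           "principal-name", NULL, 0, errstr,
+ *           sizeof(errstr)) != RD_KAFKA_RESP_ERR_NO_ERROR)
+ *       rd_kafka_oauthbearer_set_token_failure(rk, errstr);
+ */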
+
+void rd_kafka_log_print(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf) {
+ int secs, msecs;
+ struct timeval tv;
+ rd_gettimeofday(&tv, NULL);
+ secs = (int)tv.tv_sec;
+ msecs = (int)(tv.tv_usec / 1000);
+ fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n", level, secs, msecs, fac,
+ rk ? rk->rk_name : "", buf);
+}
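+
+/* For reference, the format string above renders lines of the form
+ * %<level>|<seconds>.<milliseconds>|<facility>|<instance name>| <message>,
+ * e.g. (hypothetical values):
+ *
+ *   %7|1680000000.123|FETCH|rdkafka#consumer-1| some log message
+ */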
+
+void rd_kafka_log_syslog(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf) {
+#if WITH_SYSLOG
+ static int initialized = 0;
+
+ if (!initialized)
+ openlog("rdkafka", LOG_PID | LOG_CONS, LOG_USER);
+
+ syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
+#else
+ rd_assert(!*"syslog support not enabled in this build");
+#endif
+}
+
+void rd_kafka_set_logger(rd_kafka_t *rk,
+ void (*func)(const rd_kafka_t *rk,
+ int level,
+ const char *fac,
+ const char *buf)) {
+#if !WITH_SYSLOG
+ if (func == rd_kafka_log_syslog)
+ rd_assert(!*"syslog support not enabled in this build");
+#endif
+ rk->rk_conf.log_cb = func;
+}
+
+void rd_kafka_set_log_level(rd_kafka_t *rk, int level) {
+ rk->rk_conf.log_level = level;
+}
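+
+/* Example (illustrative sketch): installing a custom logger that matches
+ * the callback signature above (`my_logger` is a hypothetical application
+ * function; levels follow syslog conventions):
+ *
+ *   static void my_logger(const rd_kafka_t *rk, int level,
+ *                         const char *fac, const char *buf) {
+ *           fprintf(stderr, "[%d] %s: %s\n", level, fac, buf);
+ *   }
+ *
+ *   rd_kafka_set_logger(rk, my_logger);
+ *   rd_kafka_set_log_level(rk, LOG_DEBUG);
+ */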
+
+
+
+static const char *rd_kafka_type2str(rd_kafka_type_t type) {
+ static const char *types[] = {
+ [RD_KAFKA_PRODUCER] = "producer",
+ [RD_KAFKA_CONSUMER] = "consumer",
+ };
+ return types[type];
+}
+
+#define _ERR_DESC(ENUM, DESC) \
+ [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = {ENUM, &(#ENUM)[18] /*skip "RD_KAFKA_RESP_ERR_" pfx*/, DESC}
+
+static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG, "Local: Bad message format"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
+ "Local: Invalid compressed data"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY, "Local: Broker handle destroyed"),
+ _ERR_DESC(
+ RD_KAFKA_RESP_ERR__FAIL,
+ "Local: Communication failure with broker"), // FIXME: too specific
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT, "Local: Broker transport failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ "Local: Critical system resource failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE, "Local: Host resolution failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, "Local: Message timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF, "Broker: No more messages"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, "Local: Unknown partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__FS, "Local: File or filesystem error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC, "Local: Unknown topic"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+ "Local: All broker connections are down"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Local: Invalid argument or configuration"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT, "Local: Timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL, "Local: Queue full"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF, "Local: ISR count insufficient"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE, "Local: Broker node update"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__SSL, "Local: SSL error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD, "Local: Waiting for coordinator"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP, "Local: Unknown group"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS, "Local: Operation in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
+ "Local: Previous operation in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
+ "Local: Existing subscription"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, "Local: Assign partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, "Local: Revoke partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT, "Local: Conflicting use"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__STATE, "Local: Erroneous state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL, "Local: Unknown protocol"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, "Local: Not implemented"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
+ "Local: Authentication failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET, "Local: No offset stored"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED, "Local: Outdated"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, "Local: Timed out in queue"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "Local: Required feature not supported by broker"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE, "Local: Awaiting cache update"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INTR, "Local: Operation interrupted"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
+ "Local: Key serialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
+ "Local: Value serialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
+ "Local: Key deserialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
+ "Local: Value deserialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL, "Local: Partial response"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY, "Local: Read-only object"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT, "Local: No such entry"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW, "Local: Read underflow"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE, "Local: Invalid type"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY, "Local: Retry operation"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE, "Local: Purged in queue"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT, "Local: Purged in flight"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL, "Local: Fatal error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT, "Local: Inconsistent state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
+ "Local: Gap-less ordering would not be guaranteed "
+ "if proceeding"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
+ "Local: Maximum application poll interval "
+ "(max.poll.interval.ms) exceeded"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_BROKER, "Local: Unknown broker"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
+ "Local: Functionality not configured"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__FENCED,
+ "Local: This instance has been fenced by a newer instance"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__APPLICATION,
+ "Local: Application generated error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST,
+ "Local: Group partition assignment lost"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NOOP, "Local: No operation performed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET,
+ "Local: No offset to automatically reset to"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
+ "Local: Partition log truncation detected"),
+
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN, "Unknown broker error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR, "Success"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
+ "Broker: Offset out of range"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG, "Broker: Invalid message"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+ "Broker: Unknown topic or partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
+ "Broker: Invalid message size"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
+ "Broker: Leader not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+ "Broker: Not leader for partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, "Broker: Request timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
+ "Broker: Broker not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
+ "Broker: Replica not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
+ "Broker: Message size too large"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
+ "Broker: StaleControllerEpochCode"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
+ "Broker: Offset metadata string too large"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
+ "Broker: Broker disconnected before response received"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
+ "Broker: Coordinator load in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
+ "Broker: Coordinator not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR, "Broker: Not coordinator"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION, "Broker: Invalid topic"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
+ "Broker: Message batch larger than configured server "
+ "segment size"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+ "Broker: Not enough in-sync replicas"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
+ "Broker: Message(s) written to insufficient number of "
+ "in-sync replicas"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
+ "Broker: Invalid required acks value"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ "Broker: Specified group generation id is not valid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
+ "Broker: Inconsistent group protocol"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID, "Broker: Invalid group.id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, "Broker: Unknown member"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
+ "Broker: Invalid session timeout"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
+ "Broker: Group rebalance in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
+ "Broker: Commit offset data size is not valid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+ "Broker: Topic authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
+ "Broker: Group authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
+ "Broker: Cluster authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP, "Broker: Invalid timestamp"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
+ "Broker: Unsupported SASL mechanism"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
+ "Broker: Request not valid in current SASL state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
+ "Broker: API version not supported"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
+ "Broker: Topic already exists"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
+ "Broker: Invalid number of partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
+ "Broker: Invalid replication factor"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
+ "Broker: Invalid replica assignment"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
+ "Broker: Configuration is invalid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
+ "Broker: Not controller for cluster"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST, "Broker: Invalid request"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
+ "Broker: Message format on broker does not support request"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION, "Broker: Policy violation"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+ "Broker: Broker received an out of order sequence number"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
+ "Broker: Broker received a duplicate sequence number"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
+ "Broker: Producer attempted an operation with an old epoch"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
+ "Broker: Producer attempted a transactional operation in "
+ "an invalid state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
+ "Broker: Producer attempted to use a producer id which is "
+ "not currently assigned to its transactional id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
+ "Broker: Transaction timeout is larger than the maximum "
+ "value allowed by the broker's max.transaction.timeout.ms"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
+ "Broker: Producer attempted to update a transaction while "
+ "another concurrent operation on the same transaction was "
+ "ongoing"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
+ "Broker: Indicates that the transaction coordinator sending "
+ "a WriteTxnMarker is no longer the current coordinator for "
+ "a given producer"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
+ "Broker: Transactional Id authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
+ "Broker: Security features are disabled"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
+ "Broker: Operation not attempted"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
+ "Broker: Disk error when trying to access log file on disk"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
+ "Broker: The user-specified log directory is not found "
+ "in the broker config"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
+ "Broker: SASL Authentication failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
+ "Broker: Unknown Producer Id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
+ "Broker: Partition reassignment is in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
+ "Broker: Delegation Token feature is not enabled"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
+ "Broker: Delegation Token is not found on server"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
+ "Broker: Specified Principal is not valid Owner/Renewer"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
+ "Broker: Delegation Token requests are not allowed on "
+ "this connection"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
+ "Broker: Delegation Token authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
+ "Broker: Delegation Token is expired"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
+ "Broker: Supplied principalType is not supported"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
+ "Broker: The group is not empty"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
+ "Broker: The group id does not exist"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
+ "Broker: The fetch session ID was not found"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
+ "Broker: The fetch session epoch is invalid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
+ "Broker: No matching listener"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
+ "Broker: Topic deletion is disabled"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
+ "Broker: Leader epoch is older than broker epoch"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
+ "Broker: Leader epoch is newer than broker epoch"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
+ "Broker: Unsupported compression type"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
+ "Broker: Broker epoch has changed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
+ "Broker: Leader high watermark is not caught up"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
+ "Broker: Group member needs a valid member ID"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
+ "Broker: Preferred leader was not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
+ "Broker: Consumer group has reached maximum size"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
+ "Broker: Static consumer fenced by other consumer with same "
+ "group.instance.id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE,
+ "Broker: Eligible partition leaders are not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED,
+ "Broker: Leader election not needed for topic partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS,
+ "Broker: No partition reassignment is in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC,
+ "Broker: Deleting offsets of a topic while the consumer "
+ "group is subscribed to it"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD,
+ "Broker: Broker failed to validate record"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
+ "Broker: There are unstable offsets that need to be cleared"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED,
+ "Broker: Throttling quota has been exceeded"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_PRODUCER_FENCED,
+ "Broker: There is a newer producer with the same "
+ "transactionalId which fences the current one"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND,
+ "Broker: Request illegally referred to resource that "
+ "does not exist"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE,
+ "Broker: Request illegally referred to the same resource "
+ "twice"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL,
+ "Broker: Requested credential would not meet criteria for "
+ "acceptability"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET,
+ "Broker: Indicates that the either the sender or recipient "
+ "of a voter-only request is not one of the expected voters"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION,
+ "Broker: Invalid update version"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED,
+ "Broker: Unable to update finalized features due to "
+ "server error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
+ "Broker: Request principal deserialization failed during "
+ "forwarding"),
+
+ _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
+
+
+void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs,
+ size_t *cntp) {
+ *errdescs = rd_kafka_err_descs;
+ *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
+}
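+
+/* Example (illustrative sketch): enumerating all error descriptions,
+ * skipping gaps in the table (entries with a NULL description):
+ *
+ *   const struct rd_kafka_err_desc *descs;
+ *   size_t i, cnt;
+ *   rd_kafka_get_err_descs(&descs, &cnt);
+ *   for (i = 0; i < cnt; i++)
+ *           if (descs[i].desc)
+ *                   printf("%s: %s\n", descs[i].name, descs[i].desc);
+ */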
+
+
+const char *rd_kafka_err2str(rd_kafka_resp_err_t err) {
+ static RD_TLS char ret[32];
+ int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+ if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+ err >= RD_KAFKA_RESP_ERR_END_ALL ||
+ !rd_kafka_err_descs[idx].desc)) {
+ rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
+ return ret;
+ }
+
+ return rd_kafka_err_descs[idx].desc;
+}
+
+
+const char *rd_kafka_err2name(rd_kafka_resp_err_t err) {
+ static RD_TLS char ret[32];
+ int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+ if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+ err >= RD_KAFKA_RESP_ERR_END_ALL ||
+ !rd_kafka_err_descs[idx].desc)) {
+ rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
+ return ret;
+ }
+
+ return rd_kafka_err_descs[idx].name;
+}
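+
+/* Worked example: for err = RD_KAFKA_RESP_ERR__TIMED_OUT the functions
+ * above return "Local: Timed out" (err2str) and "_TIMED_OUT" (err2name,
+ * i.e. the enum name with its "RD_KAFKA_RESP_ERR_" prefix stripped by
+ * the _ERR_DESC() macro). */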
+
+
+rd_kafka_resp_err_t rd_kafka_last_error(void) {
+ return rd_kafka_last_error_code;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_errno2err(int errnox) {
+ switch (errnox) {
+ case EINVAL:
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ case EBUSY:
+ return RD_KAFKA_RESP_ERR__CONFLICT;
+
+ case ENOENT:
+ return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+
+ case ESRCH:
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+
+ case ETIMEDOUT:
+ return RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ case EMSGSIZE:
+ return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
+
+ case ENOBUFS:
+ return RD_KAFKA_RESP_ERR__QUEUE_FULL;
+
+ case ECANCELED:
+ return RD_KAFKA_RESP_ERR__FATAL;
+
+ default:
+ return RD_KAFKA_RESP_ERR__FAIL;
+ }
+}
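+
+/* Example (illustrative sketch): the legacy produce API reports errors
+ * via errno, which this mapper converts; e.g. ENOBUFS (queue full)
+ * becomes RD_KAFKA_RESP_ERR__QUEUE_FULL per the switch above:
+ *
+ *   if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
+ *                        payload, len, NULL, 0, NULL) == -1)
+ *           err = rd_kafka_errno2err(errno);
+ */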
+
+
+rd_kafka_resp_err_t
+rd_kafka_fatal_error(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
+ rd_kafka_resp_err_t err;
+
+ if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
+ rd_kafka_rdlock(rk);
+ rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
+ rd_kafka_rdunlock(rk);
+ }
+
+ return err;
+}
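+
+/* Example (illustrative sketch): after seeing RD_KAFKA_RESP_ERR__FATAL
+ * (e.g. in error_cb or from consumer poll), the application retrieves
+ * the underlying fatal error and reason:
+ *
+ *   char errstr[512];
+ *   rd_kafka_resp_err_t orig_err =
+ *           rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
+ *   if (orig_err)
+ *           fprintf(stderr, "Fatal error %s: %s\n",
+ *                   rd_kafka_err2name(orig_err), errstr);
+ */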
+
+
+/**
+ * @brief Sets the fatal error for this instance.
+ *
+ * @param do_lock RD_DO_LOCK: rd_kafka_wrlock() will be acquired and released,
+ * RD_DONT_LOCK: caller must hold rd_kafka_wrlock().
+ *
+ * @returns 1 if the error was set, or 0 if a previous fatal error
+ * has already been set on this instance.
+ *
+ * @locality any
+ * @locks none
+ */
+int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
+ rd_dolock_t do_lock,
+ rd_kafka_resp_err_t err,
+ const char *fmt,
+ ...) {
+ va_list ap;
+ char buf[512];
+
+ if (do_lock)
+ rd_kafka_wrlock(rk);
+ rk->rk_fatal.cnt++;
+ if (rd_atomic32_get(&rk->rk_fatal.err)) {
+ if (do_lock)
+ rd_kafka_wrunlock(rk);
+ rd_kafka_dbg(rk, GENERIC, "FATAL",
+ "Suppressing subsequent fatal error: %s",
+ rd_kafka_err2name(err));
+ return 0;
+ }
+
+ rd_atomic32_set(&rk->rk_fatal.err, err);
+
+ va_start(ap, fmt);
+ rd_vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ rk->rk_fatal.errstr = rd_strdup(buf);
+
+ if (do_lock)
+ rd_kafka_wrunlock(rk);
+
+ /* If there is an error callback or event handler we
+ * also log the fatal error as it happens.
+ * If there is no error callback the error event
+ * will be logged automatically, and this check
+ * prevents duplicate logging. */
+ if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
+ rd_kafka_log(rk, LOG_EMERG, "FATAL", "Fatal error: %s: %s",
+ rd_kafka_err2str(err), rk->rk_fatal.errstr);
+ else
+ rd_kafka_dbg(rk, ALL, "FATAL", "Fatal error: %s: %s",
+ rd_kafka_err2str(err), rk->rk_fatal.errstr);
+
+ /* Indicate to the application that a fatal error was raised,
+ * the app should use rd_kafka_fatal_error() to extract the
+ * fatal error code itself.
+ * For the high-level consumer we propagate the error as a
+ * consumer error so it is returned from consumer_poll(),
+ * while for all other client types (the producer) we propagate to
+ * the standard error handler (typically error_cb). */
+ if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
+ rd_kafka_consumer_err(
+ rk->rk_cgrp->rkcg_q, RD_KAFKA_NODEID_UA,
+ RD_KAFKA_RESP_ERR__FATAL, 0, NULL, NULL,
+ RD_KAFKA_OFFSET_INVALID, "Fatal error: %s: %s",
+ rd_kafka_err2str(err), rk->rk_fatal.errstr);
+ else
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
+ "Fatal error: %s: %s", rd_kafka_err2str(err),
+ rk->rk_fatal.errstr);
+
+
+ /* Tell rdkafka main thread to purge producer queues, but not
+ * in-flight since we'll want proper delivery status for transmitted
+ * requests.
+ * Need NON_BLOCKING to avoid dead-lock if user is
+ * calling purge() at the same time, which could be
+ * waiting for this broker thread to handle its
+ * OP_PURGE request. */
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
+ rko->rko_u.purge.flags =
+ RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_NON_BLOCKING;
+ rd_kafka_q_enq(rk->rk_ops, rko);
+ }
+
+ return 1;
+}
+
+
+/**
+ * @returns a copy of the current fatal error, if any, else NULL.
+ *
+ * @locks_acquired rd_kafka_rdlock(rk)
+ */
+rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk) {
+ rd_kafka_error_t *error;
+ rd_kafka_resp_err_t err;
+
+ if (!(err = rd_atomic32_get(&rk->rk_fatal.err)))
+ return NULL; /* No fatal error raised */
+
+ rd_kafka_rdlock(rk);
+ error = rd_kafka_error_new_fatal(err, "%s", rk->rk_fatal.errstr);
+ rd_kafka_rdunlock(rk);
+
+ return error;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_test_fatal_error(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ const char *reason) {
+ if (!rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
+ return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
+ else
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
+ *
+ * @locality application thread
+ */
+void rd_kafka_destroy_final(rd_kafka_t *rk) {
+
+ rd_kafka_assert(rk, rd_kafka_terminating(rk));
+
+ /* Synchronize state */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ /* Terminate SASL provider */
+ if (rk->rk_conf.sasl.provider)
+ rd_kafka_sasl_term(rk);
+
+ rd_kafka_timers_destroy(&rk->rk_timers);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");
+
+ /* Destroy cgrp */
+ if (rk->rk_cgrp) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying cgrp");
+ /* Reset queue forwarding (rep -> cgrp) */
+ rd_kafka_q_fwd_set(rk->rk_rep, NULL);
+ rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
+ }
+
+ rd_kafka_assignors_term(rk);
+
+ if (rk->rk_type == RD_KAFKA_CONSUMER) {
+ rd_kafka_assignment_destroy(rk);
+ if (rk->rk_consumer.q)
+ rd_kafka_q_destroy(rk->rk_consumer.q);
+ }
+
+ /* Purge op-queues */
+ rd_kafka_q_destroy_owner(rk->rk_rep);
+ rd_kafka_q_destroy_owner(rk->rk_ops);
+
+#if WITH_SSL
+ if (rk->rk_conf.ssl.ctx) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
+ rd_kafka_ssl_ctx_term(rk);
+ }
+ rd_list_destroy(&rk->rk_conf.ssl.loaded_providers);
+#endif
+
+ /* It is not safe to log after this point. */
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Termination done: freeing resources");
+
+ if (rk->rk_logq) {
+ rd_kafka_q_destroy_owner(rk->rk_logq);
+ rk->rk_logq = NULL;
+ }
+
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ cnd_destroy(&rk->rk_curr_msgs.cnd);
+ mtx_destroy(&rk->rk_curr_msgs.lock);
+ }
+
+ if (rk->rk_fatal.errstr) {
+ rd_free(rk->rk_fatal.errstr);
+ rk->rk_fatal.errstr = NULL;
+ }
+
+ cnd_destroy(&rk->rk_broker_state_change_cnd);
+ mtx_destroy(&rk->rk_broker_state_change_lock);
+
+ mtx_destroy(&rk->rk_suppress.sparse_connect_lock);
+
+ cnd_destroy(&rk->rk_init_cnd);
+ mtx_destroy(&rk->rk_init_lock);
+
+ if (rk->rk_full_metadata)
+ rd_kafka_metadata_destroy(rk->rk_full_metadata);
+ rd_kafkap_str_destroy(rk->rk_client_id);
+ rd_kafkap_str_destroy(rk->rk_group_id);
+ rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
+ rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
+ rd_list_destroy(&rk->rk_broker_by_id);
+
+ mtx_destroy(&rk->rk_conf.sasl.lock);
+ rwlock_destroy(&rk->rk_lock);
+
+ rd_free(rk);
+ rd_kafka_global_cnt_decr();
+}
+
+
+static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
+ thrd_t thrd;
+#ifndef _WIN32
+ int term_sig = rk->rk_conf.term_sig;
+#endif
+ int res;
+ char flags_str[256];
+ static const char *rd_kafka_destroy_flags_names[] = {
+ "Terminate", "DestroyCalled", "Immediate", "NoConsumerClose", NULL};
+
+ /* Fatal errors and _F_IMMEDIATE also set .._NO_CONSUMER_CLOSE */
+ if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE ||
+ rd_kafka_fatal_error_code(rk))
+ flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;
+
+ rd_flags2str(flags_str, sizeof(flags_str), rd_kafka_destroy_flags_names,
+ flags);
+ rd_kafka_dbg(rk, ALL, "DESTROY",
+ "Terminating instance "
+ "(destroy flags %s (0x%x))",
+ flags ? flags_str : "none", flags);
+
+ /* If the producer still has messages in queue the application
+ * is terminating it without first calling flush() or purge(),
+ * which is a common mistake for new users, so hint the user
+ * about proper shutdown semantics. */
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ unsigned int tot_cnt;
+ size_t tot_size;
+
+ rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
+
+ if (tot_cnt > 0)
+ rd_kafka_log(rk, LOG_WARNING, "TERMINATE",
+ "Producer terminating with %u message%s "
+ "(%" PRIusz
+ " byte%s) still in "
+ "queue or transit: "
+ "use flush() to wait for "
+ "outstanding message delivery",
+ tot_cnt, tot_cnt > 1 ? "s" : "", tot_size,
+ tot_size > 1 ? "s" : "");
+ }
+
+ /* Make sure destroy is not called from a librdkafka thread
+ * since this will most likely cause a deadlock.
+ * FIXME: include broker threads (for log_cb) */
+ if (thrd_is_current(rk->rk_thread) ||
+ thrd_is_current(rk->rk_background.thread)) {
+ rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
+ "Application bug: "
+ "rd_kafka_destroy() called from "
+ "librdkafka owned thread");
+ rd_kafka_assert(NULL,
+ !*"Application bug: "
+ "calling rd_kafka_destroy() from "
+ "librdkafka owned thread is prohibited");
+ }
+
+ /* Before signaling for general termination, set the destroy
+ * flags to hint cgrp how to shut down. */
+ rd_atomic32_set(&rk->rk_terminate,
+ flags | RD_KAFKA_DESTROY_F_DESTROY_CALLED);
+
+ /* The legacy/simple consumer lacks an API to close down the consumer. */
+ if (rk->rk_cgrp) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Terminating consumer group handler");
+ rd_kafka_consumer_close(rk);
+ }
+
+ /* With the consumer closed, terminate the rest of librdkafka. */
+ rd_atomic32_set(&rk->rk_terminate,
+ flags | RD_KAFKA_DESTROY_F_TERMINATE);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
+ rd_kafka_wrlock(rk);
+ thrd = rk->rk_thread;
+ rd_kafka_timers_interrupt(&rk->rk_timers);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Sending TERMINATE to internal main thread");
+ /* Send op to trigger queue/io wake-up.
+ * The op itself is (likely) ignored by the receiver. */
+ rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+#ifndef _WIN32
+ /* Interrupt main kafka thread to speed up termination. */
+ if (term_sig) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Sending thread kill signal %d", term_sig);
+ pthread_kill(thrd, term_sig);
+ }
+#endif
+
+ if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
+ return; /* FIXME: thread resource leak */
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Joining internal main thread");
+
+ if (thrd_join(thrd, &res) != thrd_success)
+ rd_kafka_log(rk, LOG_ERR, "DESTROY",
+ "Failed to join internal main thread: %s "
+ "(was process forked?)",
+ rd_strerror(errno));
+
+ rd_kafka_destroy_final(rk);
+}
+
+
+/* NOTE: Must only be called by application.
+ * librdkafka itself must use rd_kafka_destroy0(). */
+void rd_kafka_destroy(rd_kafka_t *rk) {
+ rd_kafka_destroy_app(rk, 0);
+}
+
+void rd_kafka_destroy_flags(rd_kafka_t *rk, int flags) {
+ rd_kafka_destroy_app(rk, flags);
+}
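+
+/* Example (illustrative sketch): a consumer application that has already
+ * called rd_kafka_consumer_close() itself can skip the implicit close
+ * performed during destroy:
+ *
+ *   rd_kafka_consumer_close(rk);
+ *   rd_kafka_destroy_flags(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+ */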
+
+
+/**
+ * Main destructor for rd_kafka_t
+ *
+ * Locality: rdkafka main thread or application thread during rd_kafka_new()
+ */
+static void rd_kafka_destroy_internal(rd_kafka_t *rk) {
+ rd_kafka_topic_t *rkt, *rkt_tmp;
+ rd_kafka_broker_t *rkb, *rkb_tmp;
+ rd_list_t wait_thrds;
+ thrd_t *thrd;
+ int i;
+
+ rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
+
+ /* Trigger any state-change waiters (which should check the
+ * terminate flag whenever they wake up). */
+ rd_kafka_brokers_broadcast_state_change(rk);
+
+ if (rk->rk_background.thread) {
+ int res;
+ /* Send op to trigger queue/io wake-up.
+ * The op itself is (likely) ignored by the receiver. */
+ rd_kafka_q_enq(rk->rk_background.q,
+ rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+ rd_kafka_dbg(rk, ALL, "DESTROY",
+ "Waiting for background queue thread "
+ "to terminate");
+ thrd_join(rk->rk_background.thread, &res);
+ rd_kafka_q_destroy_owner(rk->rk_background.q);
+ }
+
+ /* Call on_destroy() interceptors */
+ rd_kafka_interceptors_on_destroy(rk);
+
+ /* Brokers pick up on rk_terminate automatically. */
+
+ /* List of (broker) threads to join to synchronize termination */
+ rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
+
+ rd_kafka_wrlock(rk);
+
+ rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
+ /* Decommission all topics */
+ TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
+ rd_kafka_wrunlock(rk);
+ rd_kafka_topic_partitions_remove(rkt);
+ rd_kafka_wrlock(rk);
+ }
+
+ /* Decommission brokers.
+ * Each broker thread holds a refcount and decommissions itself
+ * once the broker's refcount reaches 1 (its own reference). */
+ TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
+ /* Add broker's thread to wait_thrds list for later joining */
+ thrd = rd_malloc(sizeof(*thrd));
+ *thrd = rkb->rkb_thread;
+ rd_list_add(&wait_thrds, thrd);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_dbg(rk, BROKER, "DESTROY", "Sending TERMINATE to %s",
+ rd_kafka_broker_name(rkb));
+ /* Send op to trigger queue/io wake-up.
+ * The op itself is (likely) ignored by the broker thread. */
+ rd_kafka_q_enq(rkb->rkb_ops,
+ rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+#ifndef _WIN32
+ /* Interrupt IO threads to speed up termination. */
+ if (rk->rk_conf.term_sig)
+ pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
+#endif
+
+ rd_kafka_broker_destroy(rkb);
+
+ rd_kafka_wrlock(rk);
+ }
+
+ if (rk->rk_clusterid) {
+ rd_free(rk->rk_clusterid);
+ rk->rk_clusterid = NULL;
+ }
+
+ /* Destroy coord requests */
+ rd_kafka_coord_reqs_term(rk);
+
+ /* Destroy the coordinator cache */
+ rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);
+
+ /* Purge metadata cache.
+ * #3279:
+ * We mustn't call cache_destroy() here since there might be outstanding
+ * broker rkos that hold references to the metadata cache lock,
+ * and these brokers are destroyed below. So to avoid a circular
+ * dependency refcnt deadlock we first purge the cache here
+ * and destroy it after the brokers are destroyed. */
+ rd_kafka_metadata_cache_purge(rk, rd_true /*observers too*/);
+
+ rd_kafka_wrunlock(rk);
+
+ mtx_lock(&rk->rk_broker_state_change_lock);
+ /* Purge broker state change waiters */
+ rd_list_destroy(&rk->rk_broker_state_change_waiters);
+ mtx_unlock(&rk->rk_broker_state_change_lock);
+
+ if (rk->rk_type == RD_KAFKA_CONSUMER) {
+ if (rk->rk_consumer.q)
+ rd_kafka_q_disable(rk->rk_consumer.q);
+ }
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Purging reply queue");
+
+ /* Purge op-queue */
+ rd_kafka_q_disable(rk->rk_rep);
+ rd_kafka_q_purge(rk->rk_rep);
+
+ /* Lose our special reference to the internal broker. */
+ mtx_lock(&rk->rk_internal_rkb_lock);
+ if ((rkb = rk->rk_internal_rkb)) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Decommissioning internal broker");
+
+ /* Send op to trigger queue wake-up. */
+ rd_kafka_q_enq(rkb->rkb_ops,
+ rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+ rk->rk_internal_rkb = NULL;
+ thrd = rd_malloc(sizeof(*thrd));
+ *thrd = rkb->rkb_thread;
+ rd_list_add(&wait_thrds, thrd);
+ }
+ mtx_unlock(&rk->rk_internal_rkb_lock);
+ if (rkb)
+ rd_kafka_broker_destroy(rkb);
+
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Join %d broker thread(s)",
+ rd_list_cnt(&wait_thrds));
+
+ /* Join broker threads */
+ RD_LIST_FOREACH(thrd, &wait_thrds, i) {
+ int res;
+ if (thrd_join(*thrd, &res) != thrd_success)
+ ;
+ rd_free(thrd);
+ }
+
+ rd_list_destroy(&wait_thrds);
+
+ /* Destroy mock cluster */
+ if (rk->rk_mock.cluster)
+ rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);
+
+ if (rd_atomic32_get(&rk->rk_mock.cluster_cnt) > 0) {
+ rd_kafka_log(rk, LOG_EMERG, "MOCK",
+ "%d mock cluster(s) still active: "
+ "must be explicitly destroyed with "
+ "rd_kafka_mock_cluster_destroy() prior to "
+ "terminating the rd_kafka_t instance",
+ (int)rd_atomic32_get(&rk->rk_mock.cluster_cnt));
+ rd_assert(!*"All mock clusters must be destroyed prior to "
+ "rd_kafka_t destroy");
+ }
+
+ /* Destroy metadata cache */
+ rd_kafka_wrlock(rk);
+ rd_kafka_metadata_cache_destroy(rk);
+ rd_kafka_wrunlock(rk);
+}
+
+/**
+ * @brief Buffer state for stats emitter
+ */
+struct _stats_emit {
+ char *buf; /* Pointer to allocated buffer */
+ size_t size; /* Current allocated size of buf */
+ size_t of; /* Current write-offset in buf */
+};
+
+
+/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
+ * current scope. */
+#define _st_printf(...) \
+ do { \
+ ssize_t _r; \
+ ssize_t _rem = st->size - st->of; \
+ _r = rd_snprintf(st->buf + st->of, _rem, __VA_ARGS__); \
+ if (_r >= _rem) { \
+ st->size *= 2; \
+ _rem = st->size - st->of; \
+ st->buf = rd_realloc(st->buf, st->size); \
+ _r = rd_snprintf(st->buf + st->of, _rem, __VA_ARGS__); \
+ } \
+ st->of += _r; \
+ } while (0)
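+
+/* Worked example of the macro above (hypothetical numbers): with
+ * st->size = 16 and st->of = 10 there are 6 bytes left; a fragment of
+ * 9 characters makes rd_snprintf() return 9 >= 6, so the buffer is
+ * doubled to 32 (22 bytes remaining) and the same rd_snprintf() is
+ * retried before the write offset advances to 19. This assumes a
+ * single doubling is always sufficient, which holds for the short
+ * JSON fragments emitted below. */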
+
+struct _stats_total {
+ int64_t tx; /**< broker.tx */
+ int64_t tx_bytes; /**< broker.tx_bytes */
+ int64_t rx; /**< broker.rx */
+ int64_t rx_bytes; /**< broker.rx_bytes */
+ int64_t txmsgs; /**< partition.txmsgs */
+ int64_t txmsg_bytes; /**< partition.txbytes */
+ int64_t rxmsgs; /**< partition.rxmsgs */
+ int64_t rxmsg_bytes; /**< partition.rxbytes */
+};
+
+
+
+/**
+ * @brief Rollover and emit an average window.
+ */
+static RD_INLINE void rd_kafka_stats_emit_avg(struct _stats_emit *st,
+ const char *name,
+ rd_avg_t *src_avg) {
+ rd_avg_t avg;
+
+ rd_avg_rollover(&avg, src_avg);
+ _st_printf(
+ "\"%s\": {"
+ " \"min\":%" PRId64
+ ","
+ " \"max\":%" PRId64
+ ","
+ " \"avg\":%" PRId64
+ ","
+ " \"sum\":%" PRId64
+ ","
+ " \"stddev\": %" PRId64
+ ","
+ " \"p50\": %" PRId64
+ ","
+ " \"p75\": %" PRId64
+ ","
+ " \"p90\": %" PRId64
+ ","
+ " \"p95\": %" PRId64
+ ","
+ " \"p99\": %" PRId64
+ ","
+ " \"p99_99\": %" PRId64
+ ","
+ " \"outofrange\": %" PRId64
+ ","
+ " \"hdrsize\": %" PRId32
+ ","
+ " \"cnt\":%i "
+ "}, ",
+ name, avg.ra_v.minv, avg.ra_v.maxv, avg.ra_v.avg, avg.ra_v.sum,
+ (int64_t)avg.ra_hist.stddev, avg.ra_hist.p50, avg.ra_hist.p75,
+ avg.ra_hist.p90, avg.ra_hist.p95, avg.ra_hist.p99,
+ avg.ra_hist.p99_99, avg.ra_hist.oor, avg.ra_hist.hdrsize,
+ avg.ra_v.cnt);
+ rd_avg_destroy(&avg);
+}
+
+/**
+ * Emit stats for toppar
+ */
+static RD_INLINE void rd_kafka_stats_emit_toppar(struct _stats_emit *st,
+ struct _stats_total *total,
+ rd_kafka_toppar_t *rktp,
+ int first) {
+ rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
+ int64_t end_offset;
+ int64_t consumer_lag = -1;
+ int64_t consumer_lag_stored = -1;
+ struct offset_stats offs;
+ int32_t broker_id = -1;
+
+ rd_kafka_toppar_lock(rktp);
+
+ if (rktp->rktp_broker) {
+ rd_kafka_broker_lock(rktp->rktp_broker);
+ broker_id = rktp->rktp_broker->rkb_nodeid;
+ rd_kafka_broker_unlock(rktp->rktp_broker);
+ }
+
+ /* Grab a copy of the latest finalized offset stats */
+ offs = rktp->rktp_offsets_fin;
+
+ end_offset = (rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED)
+ ? rktp->rktp_ls_offset
+ : rktp->rktp_hi_offset;
+
+ /* Calculate two lag measures: consumer_lag_stored from the
+ * stored offset (the last message passed to the application + 1,
+ * or, if enable.auto.offset.store=false, the last message
+ * manually stored), and consumer_lag from the committed offset
+ * (the last message committed by this or another consumer).
+ * Using the stored offset keeps consumer_lag_stored up to date
+ * even if offsets have not (yet) been committed.
+ */
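+ /* Worked example (hypothetical values): with end_offset = 1000, a
+ * stored position of 998 and a committed position of 990, the stats
+ * below report consumer_lag_stored = 2 and consumer_lag = 10. */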
+ if (end_offset != RD_KAFKA_OFFSET_INVALID) {
+ if (rktp->rktp_stored_pos.offset >= 0 &&
+ rktp->rktp_stored_pos.offset <= end_offset)
+ consumer_lag_stored =
+ end_offset - rktp->rktp_stored_pos.offset;
+ if (rktp->rktp_committed_pos.offset >= 0 &&
+ rktp->rktp_committed_pos.offset <= end_offset)
+ consumer_lag =
+ end_offset - rktp->rktp_committed_pos.offset;
+ }
+
+ _st_printf(
+ "%s\"%" PRId32
+ "\": { "
+ "\"partition\":%" PRId32
+ ", "
+ "\"broker\":%" PRId32
+ ", "
+ "\"leader\":%" PRId32
+ ", "
+ "\"desired\":%s, "
+ "\"unknown\":%s, "
+ "\"msgq_cnt\":%i, "
+ "\"msgq_bytes\":%" PRIusz
+ ", "
+ "\"xmit_msgq_cnt\":%i, "
+ "\"xmit_msgq_bytes\":%" PRIusz
+ ", "
+ "\"fetchq_cnt\":%i, "
+ "\"fetchq_size\":%" PRIu64
+ ", "
+ "\"fetch_state\":\"%s\", "
+ "\"query_offset\":%" PRId64
+ ", "
+ "\"next_offset\":%" PRId64
+ ", "
+ "\"app_offset\":%" PRId64
+ ", "
+ "\"stored_offset\":%" PRId64
+ ", "
+ "\"stored_leader_epoch\":%" PRId32
+ ", "
+ "\"commited_offset\":%" PRId64
+ ", " /*FIXME: issue #80 */
+ "\"committed_offset\":%" PRId64
+ ", "
+ "\"committed_leader_epoch\":%" PRId32
+ ", "
+ "\"eof_offset\":%" PRId64
+ ", "
+ "\"lo_offset\":%" PRId64
+ ", "
+ "\"hi_offset\":%" PRId64
+ ", "
+ "\"ls_offset\":%" PRId64
+ ", "
+ "\"consumer_lag\":%" PRId64
+ ", "
+ "\"consumer_lag_stored\":%" PRId64
+ ", "
+ "\"leader_epoch\":%" PRId32
+ ", "
+ "\"txmsgs\":%" PRIu64
+ ", "
+ "\"txbytes\":%" PRIu64
+ ", "
+ "\"rxmsgs\":%" PRIu64
+ ", "
+ "\"rxbytes\":%" PRIu64
+ ", "
+ "\"msgs\": %" PRIu64
+ ", "
+ "\"rx_ver_drops\": %" PRIu64
+ ", "
+ "\"msgs_inflight\": %" PRId32
+ ", "
+ "\"next_ack_seq\": %" PRId32
+ ", "
+ "\"next_err_seq\": %" PRId32
+ ", "
+ "\"acked_msgid\": %" PRIu64 "} ",
+ first ? "" : ", ", rktp->rktp_partition, rktp->rktp_partition,
+ broker_id, rktp->rktp_leader_id,
+ (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) ? "true" : "false",
+ (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) ? "true" : "false",
+ rd_kafka_msgq_len(&rktp->rktp_msgq),
+ rd_kafka_msgq_size(&rktp->rktp_msgq),
+ /* FIXME: xmit_msgq is local to the broker thread. */
+ 0, (size_t)0, rd_kafka_q_len(rktp->rktp_fetchq),
+ rd_kafka_q_size(rktp->rktp_fetchq),
+ rd_kafka_fetch_states[rktp->rktp_fetch_state],
+ rktp->rktp_query_pos.offset, offs.fetch_pos.offset,
+ rktp->rktp_app_pos.offset, rktp->rktp_stored_pos.offset,
+ rktp->rktp_stored_pos.leader_epoch,
+ rktp->rktp_committed_pos.offset, /* FIXME: issue #80 */
+ rktp->rktp_committed_pos.offset,
+ rktp->rktp_committed_pos.leader_epoch, offs.eof_offset,
+ rktp->rktp_lo_offset, rktp->rktp_hi_offset, rktp->rktp_ls_offset,
+ consumer_lag, consumer_lag_stored, rktp->rktp_leader_epoch,
+ rd_atomic64_get(&rktp->rktp_c.tx_msgs),
+ rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
+ rd_atomic64_get(&rktp->rktp_c.rx_msgs),
+ rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
+ rk->rk_type == RD_KAFKA_PRODUCER
+ ? rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs)
+ : rd_atomic64_get(
+ &rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
+ rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
+ rd_atomic32_get(&rktp->rktp_msgs_inflight),
+ rktp->rktp_eos.next_ack_seq, rktp->rktp_eos.next_err_seq,
+ rktp->rktp_eos.acked_msgid);
+
+ if (total) {
+ total->txmsgs += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
+ total->txmsg_bytes +=
+ rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
+ total->rxmsgs += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
+ total->rxmsg_bytes +=
+ rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
+ }
+
+ rd_kafka_toppar_unlock(rktp);
+}
+
+/**
+ * @brief Emit broker request type stats
+ */
+static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
+ rd_kafka_broker_t *rkb) {
+ /* Filter out request types that will never be sent by the client. */
+ static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
+ [RD_KAFKA_PRODUCER] = {[RD_KAFKAP_Fetch] = rd_true,
+ [RD_KAFKAP_OffsetCommit] = rd_true,
+ [RD_KAFKAP_OffsetFetch] = rd_true,
+ [RD_KAFKAP_JoinGroup] = rd_true,
+ [RD_KAFKAP_Heartbeat] = rd_true,
+ [RD_KAFKAP_LeaveGroup] = rd_true,
+ [RD_KAFKAP_SyncGroup] = rd_true},
+ [RD_KAFKA_CONSUMER] =
+ {
+ [RD_KAFKAP_Produce] = rd_true,
+ [RD_KAFKAP_InitProducerId] = rd_true,
+ /* Transactional producer */
+ [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
+ [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
+ [RD_KAFKAP_EndTxn] = rd_true,
+ [RD_KAFKAP_TxnOffsetCommit] = rd_true,
+ },
+ [2 /*any client type*/] =
+ {
+ [RD_KAFKAP_UpdateMetadata] = rd_true,
+ [RD_KAFKAP_ControlledShutdown] = rd_true,
+ [RD_KAFKAP_LeaderAndIsr] = rd_true,
+ [RD_KAFKAP_StopReplica] = rd_true,
+ [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,
+
+ [RD_KAFKAP_WriteTxnMarkers] = rd_true,
+
+ [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
+ [RD_KAFKAP_DescribeLogDirs] = rd_true,
+
+ [RD_KAFKAP_CreateDelegationToken] = rd_true,
+ [RD_KAFKAP_RenewDelegationToken] = rd_true,
+ [RD_KAFKAP_ExpireDelegationToken] = rd_true,
+ [RD_KAFKAP_DescribeDelegationToken] = rd_true,
+ [RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
+ [RD_KAFKAP_ElectLeaders] = rd_true,
+ [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
+ [RD_KAFKAP_ListPartitionReassignments] = rd_true,
+ [RD_KAFKAP_AlterUserScramCredentials] = rd_true,
+ [RD_KAFKAP_Vote] = rd_true,
+ [RD_KAFKAP_BeginQuorumEpoch] = rd_true,
+ [RD_KAFKAP_EndQuorumEpoch] = rd_true,
+ [RD_KAFKAP_DescribeQuorum] = rd_true,
+ [RD_KAFKAP_AlterIsr] = rd_true,
+ [RD_KAFKAP_UpdateFeatures] = rd_true,
+ [RD_KAFKAP_Envelope] = rd_true,
+ [RD_KAFKAP_FetchSnapshot] = rd_true,
+ [RD_KAFKAP_BrokerHeartbeat] = rd_true,
+ [RD_KAFKAP_UnregisterBroker] = rd_true,
+ [RD_KAFKAP_AllocateProducerIds] = rd_true,
+ },
+ [3 /*hide-unless-non-zero*/] = {
+ /* Hide Admin requests unless they've been used */
+ [RD_KAFKAP_CreateTopics] = rd_true,
+ [RD_KAFKAP_DeleteTopics] = rd_true,
+ [RD_KAFKAP_DeleteRecords] = rd_true,
+ [RD_KAFKAP_CreatePartitions] = rd_true,
+ [RD_KAFKAP_DescribeAcls] = rd_true,
+ [RD_KAFKAP_CreateAcls] = rd_true,
+ [RD_KAFKAP_DeleteAcls] = rd_true,
+ [RD_KAFKAP_DescribeConfigs] = rd_true,
+ [RD_KAFKAP_AlterConfigs] = rd_true,
+ [RD_KAFKAP_DeleteGroups] = rd_true,
+ [RD_KAFKAP_ListGroups] = rd_true,
+ [RD_KAFKAP_DescribeGroups] = rd_true,
+ [RD_KAFKAP_DescribeLogDirs] = rd_true,
+ [RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
+ [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
+ [RD_KAFKAP_ListPartitionReassignments] = rd_true,
+ [RD_KAFKAP_OffsetDelete] = rd_true,
+ [RD_KAFKAP_DescribeClientQuotas] = rd_true,
+ [RD_KAFKAP_AlterClientQuotas] = rd_true,
+ [RD_KAFKAP_DescribeUserScramCredentials] = rd_true,
+ [RD_KAFKAP_AlterUserScramCredentials] = rd_true,
+ }};
+ int i;
+ int cnt = 0;
+
+ _st_printf("\"req\": { ");
+ for (i = 0; i < RD_KAFKAP__NUM; i++) {
+ int64_t v;
+
+ if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
+ continue;
+
+ v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
+ if (!v && filter[3][i])
+ continue; /* Filter out zero values */
+
+ _st_printf("%s\"%s\": %" PRId64, cnt > 0 ? ", " : "",
+ rd_kafka_ApiKey2str(i), v);
+
+ cnt++;
+ }
+ _st_printf(" }, ");
+}
+
+
+/**
+ * Emit all statistics
+ */
+static void rd_kafka_stats_emit_all(rd_kafka_t *rk) {
+ rd_kafka_broker_t *rkb;
+ rd_kafka_topic_t *rkt;
+ rd_ts_t now;
+ rd_kafka_op_t *rko;
+ unsigned int tot_cnt;
+ size_t tot_size;
+ rd_kafka_resp_err_t err;
+ struct _stats_emit stx = {.size = 1024 * 10};
+ struct _stats_emit *st = &stx;
+ struct _stats_total total = {0};
+
+ st->buf = rd_malloc(st->size);
+
+
+ rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
+ rd_kafka_rdlock(rk);
+
+ now = rd_clock();
+ _st_printf(
+ "{ "
+ "\"name\": \"%s\", "
+ "\"client_id\": \"%s\", "
+ "\"type\": \"%s\", "
+ "\"ts\":%" PRId64
+ ", "
+ "\"time\":%lli, "
+ "\"age\":%" PRId64
+ ", "
+ "\"replyq\":%i, "
+ "\"msg_cnt\":%u, "
+ "\"msg_size\":%" PRIusz
+ ", "
+ "\"msg_max\":%u, "
+ "\"msg_size_max\":%" PRIusz
+ ", "
+ "\"simple_cnt\":%i, "
+ "\"metadata_cache_cnt\":%i, "
+ "\"brokers\":{ " /*open brokers*/,
+ rk->rk_name, rk->rk_conf.client_id_str,
+ rd_kafka_type2str(rk->rk_type), now, (signed long long)time(NULL),
+ now - rk->rk_ts_created, rd_kafka_q_len(rk->rk_rep), tot_cnt,
+ tot_size, rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
+ rd_atomic32_get(&rk->rk_simple_cnt),
+ rk->rk_metadata_cache.rkmc_cnt);
+
+
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_kafka_toppar_t *rktp;
+ rd_ts_t txidle = -1, rxidle = -1;
+
+ rd_kafka_broker_lock(rkb);
+
+ if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
+ /* Calculate tx and rx idle time in usecs */
+ txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
+ rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);
+
+ if (txidle)
+ txidle = RD_MAX(now - txidle, 0);
+ else
+ txidle = -1;
+
+ if (rxidle)
+ rxidle = RD_MAX(now - rxidle, 0);
+ else
+ rxidle = -1;
+ }
+
+ _st_printf(
+ "%s\"%s\": { " /*open broker*/
+ "\"name\":\"%s\", "
+ "\"nodeid\":%" PRId32
+ ", "
+ "\"nodename\":\"%s\", "
+ "\"source\":\"%s\", "
+ "\"state\":\"%s\", "
+ "\"stateage\":%" PRId64
+ ", "
+ "\"outbuf_cnt\":%i, "
+ "\"outbuf_msg_cnt\":%i, "
+ "\"waitresp_cnt\":%i, "
+ "\"waitresp_msg_cnt\":%i, "
+ "\"tx\":%" PRIu64
+ ", "
+ "\"txbytes\":%" PRIu64
+ ", "
+ "\"txerrs\":%" PRIu64
+ ", "
+ "\"txretries\":%" PRIu64
+ ", "
+ "\"txidle\":%" PRId64
+ ", "
+ "\"req_timeouts\":%" PRIu64
+ ", "
+ "\"rx\":%" PRIu64
+ ", "
+ "\"rxbytes\":%" PRIu64
+ ", "
+ "\"rxerrs\":%" PRIu64
+ ", "
+ "\"rxcorriderrs\":%" PRIu64
+ ", "
+ "\"rxpartial\":%" PRIu64
+ ", "
+ "\"rxidle\":%" PRId64
+ ", "
+ "\"zbuf_grow\":%" PRIu64
+ ", "
+ "\"buf_grow\":%" PRIu64
+ ", "
+ "\"wakeups\":%" PRIu64
+ ", "
+ "\"connects\":%" PRId32
+ ", "
+ "\"disconnects\":%" PRId32 ", ",
+ rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
+ rkb->rkb_name, rkb->rkb_name, rkb->rkb_nodeid,
+ rkb->rkb_nodename, rd_kafka_confsource2str(rkb->rkb_source),
+ rd_kafka_broker_state_names[rkb->rkb_state],
+ rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
+ rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
+ rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
+ rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
+ rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
+ rd_atomic64_get(&rkb->rkb_c.tx),
+ rd_atomic64_get(&rkb->rkb_c.tx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.tx_err),
+ rd_atomic64_get(&rkb->rkb_c.tx_retries), txidle,
+ rd_atomic64_get(&rkb->rkb_c.req_timeouts),
+ rd_atomic64_get(&rkb->rkb_c.rx),
+ rd_atomic64_get(&rkb->rkb_c.rx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.rx_err),
+ rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
+ rd_atomic64_get(&rkb->rkb_c.rx_partial), rxidle,
+ rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
+ rd_atomic64_get(&rkb->rkb_c.buf_grow),
+ rd_atomic64_get(&rkb->rkb_c.wakeups),
+ rd_atomic32_get(&rkb->rkb_c.connects),
+ rd_atomic32_get(&rkb->rkb_c.disconnects));
+
+ total.tx += rd_atomic64_get(&rkb->rkb_c.tx);
+ total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
+ total.rx += rd_atomic64_get(&rkb->rkb_c.rx);
+ total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);
+
+ rd_kafka_stats_emit_avg(st, "int_latency",
+ &rkb->rkb_avg_int_latency);
+ rd_kafka_stats_emit_avg(st, "outbuf_latency",
+ &rkb->rkb_avg_outbuf_latency);
+ rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
+ rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);
+
+ rd_kafka_stats_emit_broker_reqs(st, rkb);
+
+ _st_printf("\"toppars\":{ " /*open toppars*/);
+
+ TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
+ _st_printf(
+ "%s\"%.*s-%" PRId32
+ "\": { "
+ "\"topic\":\"%.*s\", "
+ "\"partition\":%" PRId32 "} ",
+ rktp == TAILQ_FIRST(&rkb->rkb_toppars) ? "" : ", ",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition);
+ }
+
+ rd_kafka_broker_unlock(rkb);
+
+ _st_printf(
+ "} " /*close toppars*/
+ "} " /*close broker*/);
+ }
+
+
+ _st_printf(
+            "}, " /* close brokers object */
+ "\"topics\":{ ");
+
+ TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+ rd_kafka_toppar_t *rktp;
+ int i, j;
+
+ rd_kafka_topic_rdlock(rkt);
+ _st_printf(
+ "%s\"%.*s\": { "
+ "\"topic\":\"%.*s\", "
+ "\"age\":%" PRId64
+ ", "
+ "\"metadata_age\":%" PRId64 ", ",
+ rkt == TAILQ_FIRST(&rk->rk_topics) ? "" : ", ",
+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
+ (now - rkt->rkt_ts_create) / 1000,
+ rkt->rkt_ts_metadata ? (now - rkt->rkt_ts_metadata) / 1000
+ : 0);
+
+ rd_kafka_stats_emit_avg(st, "batchsize",
+ &rkt->rkt_avg_batchsize);
+ rd_kafka_stats_emit_avg(st, "batchcnt", &rkt->rkt_avg_batchcnt);
+
+ _st_printf("\"partitions\":{ " /*open partitions*/);
+
+ for (i = 0; i < rkt->rkt_partition_cnt; i++)
+ rd_kafka_stats_emit_toppar(st, &total, rkt->rkt_p[i],
+ i == 0);
+
+ RD_LIST_FOREACH(rktp, &rkt->rkt_desp, j)
+ rd_kafka_stats_emit_toppar(st, &total, rktp, i + j == 0);
+
+ i += j;
+
+ if (rkt->rkt_ua)
+ rd_kafka_stats_emit_toppar(st, NULL, rkt->rkt_ua,
+ i++ == 0);
+
+ rd_kafka_topic_rdunlock(rkt);
+
+ _st_printf(
+ "} " /*close partitions*/
+ "} " /*close topic*/);
+ }
+ _st_printf("} " /*close topics*/);
+
+ if (rk->rk_cgrp) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ _st_printf(
+ ", \"cgrp\": { "
+ "\"state\": \"%s\", "
+ "\"stateage\": %" PRId64
+ ", "
+ "\"join_state\": \"%s\", "
+ "\"rebalance_age\": %" PRId64
+ ", "
+ "\"rebalance_cnt\": %d, "
+ "\"rebalance_reason\": \"%s\", "
+ "\"assignment_size\": %d }",
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rkcg->rkcg_ts_statechange
+ ? (now - rkcg->rkcg_ts_statechange) / 1000
+ : 0,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rkcg->rkcg_c.ts_rebalance
+ ? (now - rkcg->rkcg_c.ts_rebalance) / 1000
+ : 0,
+ rkcg->rkcg_c.rebalance_cnt, rkcg->rkcg_c.rebalance_reason,
+ rkcg->rkcg_c.assignment_size);
+ }
+
+ if (rd_kafka_is_idempotent(rk)) {
+ _st_printf(
+ ", \"eos\": { "
+ "\"idemp_state\": \"%s\", "
+ "\"idemp_stateage\": %" PRId64
+ ", "
+ "\"txn_state\": \"%s\", "
+ "\"txn_stateage\": %" PRId64
+ ", "
+ "\"txn_may_enq\": %s, "
+ "\"producer_id\": %" PRId64
+ ", "
+ "\"producer_epoch\": %hd, "
+ "\"epoch_cnt\": %d "
+ "}",
+ rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
+ (now - rk->rk_eos.ts_idemp_state) / 1000,
+ rd_kafka_txn_state2str(rk->rk_eos.txn_state),
+ (now - rk->rk_eos.ts_txn_state) / 1000,
+ rd_atomic32_get(&rk->rk_eos.txn_may_enq) ? "true" : "false",
+ rk->rk_eos.pid.id, rk->rk_eos.pid.epoch,
+ rk->rk_eos.epoch_cnt);
+ }
+
+ if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
+ _st_printf(
+ ", \"fatal\": { "
+ "\"error\": \"%s\", "
+ "\"reason\": \"%s\", "
+ "\"cnt\": %d "
+ "}",
+ rd_kafka_err2str(err), rk->rk_fatal.errstr,
+ rk->rk_fatal.cnt);
+
+ rd_kafka_rdunlock(rk);
+
+ /* Total counters */
+ _st_printf(
+ ", "
+ "\"tx\":%" PRId64
+ ", "
+ "\"tx_bytes\":%" PRId64
+ ", "
+ "\"rx\":%" PRId64
+ ", "
+ "\"rx_bytes\":%" PRId64
+ ", "
+ "\"txmsgs\":%" PRId64
+ ", "
+ "\"txmsg_bytes\":%" PRId64
+ ", "
+ "\"rxmsgs\":%" PRId64
+ ", "
+ "\"rxmsg_bytes\":%" PRId64,
+ total.tx, total.tx_bytes, total.rx, total.rx_bytes, total.txmsgs,
+ total.txmsg_bytes, total.rxmsgs, total.rxmsg_bytes);
+
+ _st_printf("}" /*close object*/);
+
+
+ /* Enqueue op for application */
+ rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
+ rko->rko_u.stats.json = st->buf;
+ rko->rko_u.stats.json_len = st->of;
+ rd_kafka_q_enq(rk->rk_rep, rko);
+}
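+
+/*
+ * Usage sketch (illustrative, not part of this file): an application
+ * receives the JSON document emitted above by registering a stats callback
+ * and enabling periodic emission. The 5000 ms interval and the stderr sink
+ * are assumptions for the example.
+ *
+ *   static int my_stats_cb(rd_kafka_t *rk, char *json, size_t json_len,
+ *                          void *opaque) {
+ *           fprintf(stderr, "STATS: %.*s\n", (int)json_len, json);
+ *           return 0;   (0: librdkafka frees the json buffer)
+ *   }
+ *
+ *   char errstr[512];
+ *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
+ *   rd_kafka_conf_set(conf, "statistics.interval.ms", "5000",
+ *                     errstr, sizeof(errstr));
+ *   rd_kafka_conf_set_stats_cb(conf, my_stats_cb);
+ */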
+
+
+/**
+ * @brief 1 second generic timer.
+ *
+ * @locality rdkafka main thread
+ * @locks none
+ */
+static void rd_kafka_1s_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+
+ /* Scan topic state, message timeouts, etc. */
+ rd_kafka_topic_scan_all(rk, rd_clock());
+
+ /* Sparse connections:
+ * try to maintain at least one connection to the cluster. */
+ if (rk->rk_conf.sparse_connections &&
+ rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
+ rd_kafka_connect_any(rk, "no cluster connection");
+
+ rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
+}
+
+static void rd_kafka_stats_emit_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+ rd_kafka_stats_emit_all(rk);
+}
+
+
+/**
+ * @brief Periodic metadata refresh callback
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_metadata_refresh_cb(rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+ rd_kafka_resp_err_t err;
+
+ /* High-level consumer:
+ * We need to query both locally known topics and subscribed topics
+ * so that we can detect locally known topics changing partition
+ * count or disappearing, as well as detect previously non-existent
+ * subscribed topics now being available in the cluster. */
+ if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
+ err = rd_kafka_metadata_refresh_consumer_topics(
+ rk, NULL, "periodic topic and broker list refresh");
+ else
+ err = rd_kafka_metadata_refresh_known_topics(
+ rk, NULL, rd_true /*force*/,
+ "periodic topic and broker list refresh");
+
+
+ if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC &&
+ rd_interval(&rk->rk_suppress.broker_metadata_refresh,
+ 10 * 1000 * 1000 /*10s*/, 0) > 0) {
+ /* If there are no (locally referenced) topics
+ * to query, refresh the broker list.
+ * This avoids getting idle-disconnected for clients
+ * that have not yet referenced a topic and makes
+                 * sure such a client has an up-to-date broker list. */
+ rd_kafka_metadata_refresh_brokers(
+ rk, NULL, "periodic broker list refresh");
+ }
+}
+
+
+
+/**
+ * @brief Wait for background threads to initialize.
+ *
+ * @returns the number of background threads still not initialized.
+ *
+ * @locality app thread calling rd_kafka_new()
+ * @locks none
+ */
+static int rd_kafka_init_wait(rd_kafka_t *rk, int timeout_ms) {
+ struct timespec tspec;
+ int ret;
+
+ rd_timeout_init_timespec(&tspec, timeout_ms);
+
+ mtx_lock(&rk->rk_init_lock);
+ while (rk->rk_init_wait_cnt > 0 &&
+ cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock, &tspec) ==
+ thrd_success)
+ ;
+ ret = rk->rk_init_wait_cnt;
+ mtx_unlock(&rk->rk_init_lock);
+
+ return ret;
+}
+
+
+/**
+ * Main loop for Kafka handler thread.
+ */
+static int rd_kafka_thread_main(void *arg) {
+ rd_kafka_t *rk = arg;
+ rd_kafka_timer_t tmr_1s = RD_ZERO_INIT;
+ rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
+ rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
+
+ rd_kafka_set_thread_name("main");
+ rd_kafka_set_thread_sysname("rdk:main");
+
+ rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);
+
+ (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
+
+ /* Acquire lock (which was held by thread creator during creation)
+ * to synchronise state. */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ /* 1 second timer for topic scan and connection checking. */
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
+ rd_kafka_1s_tmr_cb, NULL);
+ if (rk->rk_conf.stats_interval_ms)
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
+ rk->rk_conf.stats_interval_ms * 1000ll,
+ rd_kafka_stats_emit_tmr_cb, NULL);
+ if (rk->rk_conf.metadata_refresh_interval_ms > 0)
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
+ rk->rk_conf.metadata_refresh_interval_ms *
+ 1000ll,
+ rd_kafka_metadata_refresh_cb, NULL);
+
+ if (rk->rk_cgrp)
+ rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
+
+ if (rd_kafka_is_idempotent(rk))
+ rd_kafka_idemp_init(rk);
+
+ mtx_lock(&rk->rk_init_lock);
+ rk->rk_init_wait_cnt--;
+ cnd_broadcast(&rk->rk_init_cnd);
+ mtx_unlock(&rk->rk_init_lock);
+
+ while (likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) ||
+ (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state !=
+ RD_KAFKA_CGRP_STATE_TERM)))) {
+ rd_ts_t sleeptime = rd_kafka_timers_next(
+ &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
+ rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
+ RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
+ if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
+ rd_kafka_cgrp_serve(rk->rk_cgrp);
+ rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
+ }
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Internal main thread terminating");
+
+ if (rd_kafka_is_idempotent(rk))
+ rd_kafka_idemp_term(rk);
+
+ rd_kafka_q_disable(rk->rk_ops);
+ rd_kafka_q_purge(rk->rk_ops);
+
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
+ if (rk->rk_conf.stats_interval_ms)
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
+
+ /* Synchronise state */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_MAIN);
+
+ rd_kafka_destroy_internal(rk);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Internal main thread termination done");
+
+ rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
+
+ return 0;
+}
+
+
+void rd_kafka_term_sig_handler(int sig) {
+ /* nop */
+}
+
+
+rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
+ rd_kafka_conf_t *app_conf,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_t *rk;
+ static rd_atomic32_t rkid;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ int ret_errno = 0;
+ const char *conf_err;
+#ifndef _WIN32
+ sigset_t newset, oldset;
+#endif
+ char builtin_features[128];
+ size_t bflen;
+
+ rd_kafka_global_init();
+
+ /* rd_kafka_new() takes ownership of the provided \p app_conf
+ * object if rd_kafka_new() succeeds.
+ * Since \p app_conf is optional we allocate a default configuration
+ * object here if \p app_conf is NULL.
+         * The configuration object itself is struct-copied later,
+         * leaving the default *conf pointer ready to be freed.
+ * In case new() fails and app_conf was specified we will clear out
+ * rk_conf to avoid double-freeing from destroy_internal() and the
+ * user's eventual call to rd_kafka_conf_destroy().
+ * This is all a bit tricky but that's the nature of
+ * legacy interfaces. */
+ if (!app_conf)
+ conf = rd_kafka_conf_new();
+ else
+ conf = app_conf;
+
+ /* Verify and finalize configuration */
+ if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
+ /* Incompatible configuration settings */
+ rd_snprintf(errstr, errstr_size, "%s", conf_err);
+ if (!app_conf)
+ rd_kafka_conf_destroy(conf);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return NULL;
+ }
+
+
+ rd_kafka_global_cnt_incr();
+
+ /*
+ * Set up the handle.
+ */
+ rk = rd_calloc(1, sizeof(*rk));
+
+ rk->rk_type = type;
+ rk->rk_ts_created = rd_clock();
+
+ /* Struct-copy the config object. */
+ rk->rk_conf = *conf;
+ if (!app_conf)
+ rd_free(conf); /* Free the base config struct only,
+ * not its fields since they were copied to
+ * rk_conf just above. Those fields are
+ * freed from rd_kafka_destroy_internal()
+ * as the rk itself is destroyed. */
+
+        /* Seed the PRNG; don't bother about HAVE_RAND_R since
+         * seeding is pretty cheap. */
+ if (rk->rk_conf.enable_random_seed)
+ call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);
+
+ /* Call on_new() interceptors */
+ rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
+
+ rwlock_init(&rk->rk_lock);
+ mtx_init(&rk->rk_conf.sasl.lock, mtx_plain);
+ mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
+
+ cnd_init(&rk->rk_broker_state_change_cnd);
+ mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
+ rd_list_init(&rk->rk_broker_state_change_waiters, 8,
+ rd_kafka_enq_once_trigger_destroy);
+
+ cnd_init(&rk->rk_init_cnd);
+ mtx_init(&rk->rk_init_lock, mtx_plain);
+
+ rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
+ rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
+ rd_interval_init(&rk->rk_suppress.sparse_connect_random);
+ mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
+
+ rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);
+ rd_atomic32_init(&rk->rk_flushing, 0);
+
+ rk->rk_rep = rd_kafka_q_new(rk);
+ rk->rk_ops = rd_kafka_q_new(rk);
+ rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
+ rk->rk_ops->rkq_opaque = rk;
+
+ if (rk->rk_conf.log_queue) {
+ rk->rk_logq = rd_kafka_q_new(rk);
+ rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
+ rk->rk_logq->rkq_opaque = rk;
+ }
+
+ TAILQ_INIT(&rk->rk_brokers);
+ TAILQ_INIT(&rk->rk_topics);
+ rd_kafka_timers_init(&rk->rk_timers, rk, rk->rk_ops);
+ rd_kafka_metadata_cache_init(rk);
+ rd_kafka_coord_cache_init(&rk->rk_coord_cache,
+ rk->rk_conf.metadata_max_age_ms);
+ rd_kafka_coord_reqs_init(rk);
+
+ if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
+ rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
+ else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
+ rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
+ else
+ rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
+ if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
+
+ if (rk->rk_conf.rebalance_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
+ if (rk->rk_conf.offset_commit_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
+ if (rk->rk_conf.error_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
+#if WITH_SASL_OAUTHBEARER
+ if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
+ !rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
+ rd_kafka_conf_set_oauthbearer_token_refresh_cb(
+ &rk->rk_conf, rd_kafka_oauthbearer_unsecured_token);
+
+ if (rk->rk_conf.sasl.oauthbearer.token_refresh_cb &&
+ rk->rk_conf.sasl.oauthbearer.method !=
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC)
+ rk->rk_conf.enabled_events |=
+ RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
+#endif
+
+#if WITH_OAUTHBEARER_OIDC
+ if (rk->rk_conf.sasl.oauthbearer.method ==
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
+ !rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
+ rd_kafka_conf_set_oauthbearer_token_refresh_cb(
+ &rk->rk_conf, rd_kafka_oidc_token_refresh_cb);
+#endif
+
+ rk->rk_controllerid = -1;
+
+ /* Admin client defaults */
+ rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;
+
+ if (rk->rk_conf.debug)
+ rk->rk_conf.log_level = LOG_DEBUG;
+
+ rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
+ rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
+ rd_atomic32_add(&rkid, 1));
+
+ /* Construct clientid kafka string */
+ rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str, -1);
+
+ /* Convert group.id to kafka string (may be NULL) */
+ rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str, -1);
+
+ /* Config fixups */
+ rk->rk_conf.queued_max_msg_bytes =
+ (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
+
+        /* Enable api.version.request=true if broker.version.fallback
+         * indicates a supporting broker. */
+ if (rd_kafka_ApiVersion_is_queryable(
+ rk->rk_conf.broker_version_fallback))
+ rk->rk_conf.api_version_request = 1;
+
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
+ cnd_init(&rk->rk_curr_msgs.cnd);
+ rk->rk_curr_msgs.max_cnt = rk->rk_conf.queue_buffering_max_msgs;
+ if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes *
+ 1024 >
+ (unsigned long long)SIZE_MAX) {
+ rk->rk_curr_msgs.max_size = SIZE_MAX;
+ rd_kafka_log(rk, LOG_WARNING, "QUEUESIZE",
+ "queue.buffering.max.kbytes adjusted "
+ "to system SIZE_MAX limit %" PRIusz
+ " bytes",
+ rk->rk_curr_msgs.max_size);
+ } else {
+ rk->rk_curr_msgs.max_size =
+ (size_t)rk->rk_conf.queue_buffering_max_kbytes *
+ 1024;
+ }
+ }
+
+ if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+
+ /* Create Mock cluster */
+ rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
+ if (rk->rk_conf.mock.broker_cnt > 0) {
+ const char *mock_bootstraps;
+ rk->rk_mock.cluster =
+ rd_kafka_mock_cluster_new(rk, rk->rk_conf.mock.broker_cnt);
+
+ if (!rk->rk_mock.cluster) {
+ rd_snprintf(errstr, errstr_size,
+ "Failed to create mock cluster, see logs");
+ ret_err = RD_KAFKA_RESP_ERR__FAIL;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+
+ mock_bootstraps =
+                    rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster);
+ rd_kafka_log(rk, LOG_NOTICE, "MOCK",
+ "Mock cluster enabled: "
+ "original bootstrap.servers and security.protocol "
+ "ignored and replaced with %s",
+ mock_bootstraps);
+
+ /* Overwrite bootstrap.servers and connection settings */
+ if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
+ mock_bootstraps, NULL,
+ 0) != RD_KAFKA_CONF_OK)
+ rd_assert(!"failed to replace mock bootstrap.servers");
+
+ if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
+ "plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
+ rd_assert(!"failed to reset mock security.protocol");
+
+ rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
+
+ /* Apply default RTT to brokers */
+ if (rk->rk_conf.mock.broker_rtt)
+ rd_kafka_mock_broker_set_rtt(
+ rk->rk_mock.cluster, -1 /*all brokers*/,
+ rk->rk_conf.mock.broker_rtt);
+ }
+
+ if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
+ rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
+ /* Select SASL provider */
+ if (rd_kafka_sasl_select_provider(rk, errstr, errstr_size) ==
+ -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+
+ /* Initialize SASL provider */
+ if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
+ rk->rk_conf.sasl.provider = NULL;
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+ }
+
+#if WITH_SSL
+ if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
+ rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
+ /* Create SSL context */
+ if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+ }
+#endif
+
+ if (type == RD_KAFKA_CONSUMER) {
+ rd_kafka_assignment_init(rk);
+
+ if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
+ /* Create consumer group handle */
+ rk->rk_cgrp = rd_kafka_cgrp_new(rk, rk->rk_group_id,
+ rk->rk_client_id);
+ rk->rk_consumer.q =
+ rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
+ } else {
+ /* Legacy consumer */
+ rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
+ }
+
+ } else if (type == RD_KAFKA_PRODUCER) {
+ rk->rk_eos.transactional_id =
+ rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
+ }
+
+#ifndef _WIN32
+ /* Block all signals in newly created threads.
+         * To avoid a race condition we block all signals in the calling
+ * thread, which the new thread will inherit its sigmask from,
+ * and then restore the original sigmask of the calling thread when
+ * we're done creating the thread. */
+ sigemptyset(&oldset);
+ sigfillset(&newset);
+ if (rk->rk_conf.term_sig) {
+ struct sigaction sa_term = {.sa_handler =
+ rd_kafka_term_sig_handler};
+ sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
+ }
+ pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+#endif
+
+        /* Create background thread and queue if background_event_cb()
+         * or the RD_KAFKA_EVENT_BACKGROUND event has been enabled.
+ * Do this before creating the main thread since after
+ * the main thread is created it is no longer trivial to error
+ * out from rd_kafka_new(). */
+ if (rk->rk_conf.background_event_cb ||
+ (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_BACKGROUND)) {
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_wrlock(rk);
+ if (!rk->rk_background.q)
+ err = rd_kafka_background_thread_create(rk, errstr,
+ errstr_size);
+ rd_kafka_wrunlock(rk);
+ if (err)
+ goto fail;
+ }
+
+ /* Lock handle here to synchronise state, i.e., hold off
+ * the thread until we've finalized the handle. */
+ rd_kafka_wrlock(rk);
+
+ /* Create handler thread */
+ mtx_lock(&rk->rk_init_lock);
+ rk->rk_init_wait_cnt++;
+ if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) !=
+ thrd_success) {
+ rk->rk_init_wait_cnt--;
+ ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ ret_errno = errno;
+ if (errstr)
+ rd_snprintf(errstr, errstr_size,
+ "Failed to create thread: %s (%i)",
+ rd_strerror(errno), errno);
+ mtx_unlock(&rk->rk_init_lock);
+ rd_kafka_wrunlock(rk);
+#ifndef _WIN32
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+ goto fail;
+ }
+
+ mtx_unlock(&rk->rk_init_lock);
+ rd_kafka_wrunlock(rk);
+
+ /*
+ * @warning `goto fail` is prohibited past this point
+ */
+
+ mtx_lock(&rk->rk_internal_rkb_lock);
+ rk->rk_internal_rkb =
+ rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, RD_KAFKA_PROTO_PLAINTEXT,
+ "", 0, RD_KAFKA_NODEID_UA);
+ mtx_unlock(&rk->rk_internal_rkb_lock);
+
+ /* Add initial list of brokers from configuration */
+ if (rk->rk_conf.brokerlist) {
+ if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+ "No brokers configured");
+ }
+
+#ifndef _WIN32
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+
+ /* Wait for background threads to fully initialize so that
+ * the client instance is fully functional at the time it is
+ * returned from the constructor. */
+ if (rd_kafka_init_wait(rk, 60 * 1000) != 0) {
+ /* This should never happen unless there is a bug
+ * or the OS is not scheduling the background threads.
+                 * In either case there is no point in handling this gracefully
+ * in the current state since the thread joins are likely
+ * to hang as well. */
+ mtx_lock(&rk->rk_init_lock);
+ rd_kafka_log(rk, LOG_CRIT, "INIT",
+ "Failed to initialize %s: "
+ "%d background thread(s) did not initialize "
+ "within 60 seconds",
+ rk->rk_name, rk->rk_init_wait_cnt);
+ if (errstr)
+ rd_snprintf(errstr, errstr_size,
+ "Timed out waiting for "
+ "%d background thread(s) to initialize",
+ rk->rk_init_wait_cnt);
+ mtx_unlock(&rk->rk_init_lock);
+
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ EDEADLK);
+ return NULL;
+ }
+
+ rk->rk_initialized = 1;
+
+ bflen = sizeof(builtin_features);
+ if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
+ builtin_features, &bflen) != RD_KAFKA_CONF_OK)
+ rd_snprintf(builtin_features, sizeof(builtin_features), "?");
+ rd_kafka_dbg(rk, ALL, "INIT",
+ "librdkafka v%s (0x%x) %s initialized "
+ "(builtin.features %s, %s, debug 0x%x)",
+ rd_kafka_version_str(), rd_kafka_version(), rk->rk_name,
+ builtin_features, BUILT_WITH, rk->rk_conf.debug);
+
+ /* Log warnings for deprecated configuration */
+ rd_kafka_conf_warn(rk);
+
+ /* Debug dump configuration */
+ if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
+ rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL, &rk->rk_conf,
+ "Client configuration");
+ if (rk->rk_conf.topic_conf)
+ rd_kafka_anyconf_dump_dbg(
+ rk, _RK_TOPIC, rk->rk_conf.topic_conf,
+ "Default topic configuration");
+ }
+
+        /* Free the user-supplied conf's base pointer on success,
+ * but not the actual allocated fields since the struct
+ * will have been copied in its entirety above. */
+ if (app_conf)
+ rd_free(app_conf);
+ rd_kafka_set_last_error(0, 0);
+
+ return rk;
+
+fail:
+ /*
+ * Error out and clean up
+ */
+
+ /*
+ * Tell background thread to terminate and wait for it to return.
+ */
+ rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);
+
+ /* Terminate SASL provider */
+ if (rk->rk_conf.sasl.provider)
+ rd_kafka_sasl_term(rk);
+
+ if (rk->rk_background.thread) {
+ int res;
+ thrd_join(rk->rk_background.thread, &res);
+ rd_kafka_q_destroy_owner(rk->rk_background.q);
+ }
+
+ /* If on_new() interceptors have been called we also need
+ * to allow interceptor clean-up by calling on_destroy() */
+ rd_kafka_interceptors_on_destroy(rk);
+
+        /* If rk_conf is a struct-copy of the application configuration
+         * we need to prevent the rk_conf fields from being freed by
+         * rd_kafka_destroy_internal() since they belong to app_conf.
+         * However, there are some internal fields, such as interceptors,
+         * that belong to rk_conf and thus need to be cleaned up.
+         * Legacy APIs, sigh.. */
+ if (app_conf) {
+ rd_kafka_assignors_term(rk);
+ rd_kafka_interceptors_destroy(&rk->rk_conf);
+ memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
+ }
+
+ rd_kafka_destroy_internal(rk);
+ rd_kafka_destroy_final(rk);
+
+ rd_kafka_set_last_error(ret_err, ret_errno);
+
+ return NULL;
+}
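+
+/*
+ * Usage sketch (illustrative, not part of this file): minimal instantiation
+ * through the constructor above. The broker address is an assumption; on
+ * success rd_kafka_new() has taken ownership of the conf object, so the
+ * caller must not destroy it.
+ *
+ *   char errstr[512];
+ *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
+ *
+ *   if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
+ *                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ *           fprintf(stderr, "%s\n", errstr);
+ *
+ *   rd_kafka_t *rk =
+ *           rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ *   if (!rk) {
+ *           fprintf(stderr, "rd_kafka_new() failed: %s\n", errstr);
+ *           rd_kafka_conf_destroy(conf);   (app retains conf on failure)
+ *   }
+ */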
+
+
+
+/**
+ * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() and
+ * friends). Since the simple consumer has no API for stopping the cgrp we
+ * need to sort that out automatically in the background when all
+ * consumption has stopped.
+ *
+ * Returns 0 if a high-level consumer is already instantiated, in which case
+ * a simple consumer cannot co-operate with it, else the incremented
+ * simple consumer count (>= 1).
+ *
+ * An rd_kafka_t handle can never migrate from simple to high-level, or
+ * vice versa, so we don't need a ..consumer_del().
+ */
+int rd_kafka_simple_consumer_add(rd_kafka_t *rk) {
+ if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
+ return 0;
+
+ return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
+}
+
+
+
+/**
+ * rktp fetch is split up into these parts:
+ *  * application side:
+ *  * broker side (handled by the current leader broker thread for rktp):
+ *    - the fetch state, initial offset, etc.
+ *    - fetching messages, updating fetched offset, etc.
+ *    - offset commits
+ *
+ * The communication channels between the two are:
+ *    app side -> rdkafka main side: rktp_ops
+ *    broker thread -> app side: rktp_fetchq
+ *
+ * There is no shared state between these threads; instead, state is
+ * communicated through the two op queues, and state synchronization
+ * is performed by version barriers.
+ */
+
+static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int64_t offset,
+ rd_kafka_q_t *rkq) {
+ rd_kafka_toppar_t *rktp;
+
+ if (partition < 0) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return -1;
+ }
+
+ rd_kafka_topic_wrlock(rkt);
+ rktp = rd_kafka_toppar_desired_add(rkt, partition);
+ rd_kafka_topic_wrunlock(rkt);
+
+ /* Verify offset */
+ if (offset == RD_KAFKA_OFFSET_BEGINNING ||
+ offset == RD_KAFKA_OFFSET_END ||
+ offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
+ /* logical offsets */
+
+ } else if (offset == RD_KAFKA_OFFSET_STORED) {
+ /* offset manager */
+
+ if (rkt->rkt_conf.offset_store_method ==
+ RD_KAFKA_OFFSET_METHOD_BROKER &&
+ RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
+ /* Broker based offsets require a group id. */
+ rd_kafka_toppar_destroy(rktp);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ EINVAL);
+ return -1;
+ }
+
+ } else if (offset < 0) {
+ rd_kafka_toppar_destroy(rktp);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return -1;
+ }
+
+ rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1),
+ rkq, RD_KAFKA_NO_REPLYQ);
+
+ rd_kafka_toppar_destroy(rktp);
+
+ rd_kafka_set_last_error(0, 0);
+ return 0;
+}
+
+
+
+int rd_kafka_consume_start(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int64_t offset) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
+ "Start consuming partition %" PRId32, partition);
+ return rd_kafka_consume_start0(rkt, partition, offset, NULL);
+}
+
+int rd_kafka_consume_start_queue(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int64_t offset,
+ rd_kafka_queue_t *rkqu) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+
+ return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
+}
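+
+/*
+ * Usage sketch (illustrative, not part of this file): the legacy/simple
+ * consumer flow built from the calls above. The topic name, partition,
+ * run flag and handle() helper are assumptions for the example.
+ *
+ *   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", NULL);
+ *
+ *   if (rd_kafka_consume_start(rkt, 0, RD_KAFKA_OFFSET_BEGINNING) == -1)
+ *           fprintf(stderr, "start failed: %s\n",
+ *                   rd_kafka_err2str(rd_kafka_last_error()));
+ *
+ *   while (run) {
+ *           rd_kafka_message_t *rkm = rd_kafka_consume(rkt, 0, 1000);
+ *           if (!rkm)
+ *                   continue;              (timeout: poll again)
+ *           if (!rkm->err)
+ *                   handle(rkm->payload, rkm->len);   (hypothetical)
+ *           rd_kafka_message_destroy(rkm);
+ *   }
+ *
+ *   rd_kafka_consume_stop(rkt, 0);
+ *   rd_kafka_topic_destroy(rkt);
+ */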
+
+
+
+static RD_UNUSED int rd_kafka_consume_stop0(rd_kafka_toppar_t *rktp) {
+ rd_kafka_q_t *tmpq = NULL;
+ rd_kafka_resp_err_t err;
+
+ rd_kafka_topic_wrlock(rktp->rktp_rkt);
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_toppar_desired_del(rktp);
+ rd_kafka_toppar_unlock(rktp);
+ rd_kafka_topic_wrunlock(rktp->rktp_rkt);
+
+ tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
+
+ rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
+
+ /* Synchronisation: Wait for stop reply from broker thread */
+ err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
+ rd_kafka_q_destroy_owner(tmpq);
+
+ rd_kafka_set_last_error(err, err ? EINVAL : 0);
+
+ return err ? -1 : 0;
+}
+
+
+int rd_kafka_consume_stop(rd_kafka_topic_t *app_rkt, int32_t partition) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp;
+ int r;
+
+ if (partition == RD_KAFKA_PARTITION_UA) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return -1;
+ }
+
+ rd_kafka_topic_wrlock(rkt);
+ if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+ !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+ rd_kafka_topic_wrunlock(rkt);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+ rd_kafka_topic_wrunlock(rkt);
+
+ r = rd_kafka_consume_stop0(rktp);
+ /* set_last_error() called by stop0() */
+
+ rd_kafka_toppar_destroy(rktp);
+
+ return r;
+}
+
+
+
+rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int64_t offset,
+ int timeout_ms) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_q_t *tmpq = NULL;
+ rd_kafka_resp_err_t err;
+ rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ;
+
+ /* FIXME: simple consumer check */
+
+ if (partition == RD_KAFKA_PARTITION_UA)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ rd_kafka_topic_rdlock(rkt);
+ if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+ !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+ rd_kafka_topic_rdunlock(rkt);
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ }
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (timeout_ms) {
+ tmpq = rd_kafka_q_new(rkt->rkt_rk);
+ replyq = RD_KAFKA_REPLYQ(tmpq, 0);
+ }
+
+ if ((err = rd_kafka_toppar_op_seek(rktp, RD_KAFKA_FETCH_POS(offset, -1),
+ replyq))) {
+ if (tmpq)
+ rd_kafka_q_destroy_owner(tmpq);
+ rd_kafka_toppar_destroy(rktp);
+ return err;
+ }
+
+ rd_kafka_toppar_destroy(rktp);
+
+ if (tmpq) {
+ err = rd_kafka_q_wait_result(tmpq, timeout_ms);
+ rd_kafka_q_destroy_owner(tmpq);
+ return err;
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+rd_kafka_error_t *
+rd_kafka_seek_partitions(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions,
+ int timeout_ms) {
+ rd_kafka_q_t *tmpq = NULL;
+ rd_kafka_topic_partition_t *rktpar;
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+ int cnt = 0;
+
+ if (rk->rk_type != RD_KAFKA_CONSUMER)
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Must only be used on consumer instance");
+
+ if (!partitions || partitions->cnt == 0)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "partitions must be specified");
+
+ if (timeout_ms)
+ tmpq = rd_kafka_q_new(rk);
+
+ RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_resp_err_t err;
+
+ rktp = rd_kafka_toppar_get2(
+ rk, rktpar->topic, rktpar->partition,
+ rd_false /*no-ua-on-miss*/, rd_false /*no-create-on-miss*/);
+ if (!rktp) {
+ rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ continue;
+ }
+
+ err = rd_kafka_toppar_op_seek(
+ rktp, rd_kafka_topic_partition_get_fetch_pos(rktpar),
+ RD_KAFKA_REPLYQ(tmpq, 0));
+ if (err) {
+ rktpar->err = err;
+ } else {
+ rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ cnt++;
+ }
+
+ rd_kafka_toppar_destroy(rktp); /* refcnt from toppar_get2() */
+ }
+
+ if (!timeout_ms)
+ return NULL;
+
+
+ while (cnt > 0) {
+ rd_kafka_op_t *rko;
+
+ rko =
+ rd_kafka_q_pop(tmpq, rd_timeout_remains_us(abs_timeout), 0);
+ if (!rko) {
+ rd_kafka_q_destroy_owner(tmpq);
+
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Timed out waiting for %d remaining partition "
+ "seek(s) to finish",
+ cnt);
+ }
+
+ if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
+ rd_kafka_q_destroy_owner(tmpq);
+ rd_kafka_op_destroy(rko);
+
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
+ "Instance is terminating");
+ }
+
+ rd_assert(rko->rko_rktp);
+
+ rktpar = rd_kafka_topic_partition_list_find(
+ partitions, rko->rko_rktp->rktp_rkt->rkt_topic->str,
+ rko->rko_rktp->rktp_partition);
+ rd_assert(rktpar);
+
+ rktpar->err = rko->rko_err;
+
+ rd_kafka_op_destroy(rko);
+
+ cnt--;
+ }
+
+ rd_kafka_q_destroy_owner(tmpq);
+
+ return NULL;
+}
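+
+/*
+ * Usage sketch (illustrative, not part of this file): seeking two assumed
+ * partitions with the API above; per-partition results are left in each
+ * element's .err field.
+ *
+ *   rd_kafka_topic_partition_list_t *parts =
+ *           rd_kafka_topic_partition_list_new(2);
+ *   rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset = 1234;
+ *   rd_kafka_topic_partition_list_add(parts, "mytopic", 1)->offset =
+ *           RD_KAFKA_OFFSET_BEGINNING;
+ *
+ *   rd_kafka_error_t *error = rd_kafka_seek_partitions(rk, parts, 5000);
+ *   if (error) {
+ *           fprintf(stderr, "seek failed: %s\n",
+ *                   rd_kafka_error_string(error));
+ *           rd_kafka_error_destroy(error);
+ *   }
+ *   rd_kafka_topic_partition_list_destroy(parts);
+ */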
+
+
+
+static ssize_t rd_kafka_consume_batch0(rd_kafka_q_t *rkq,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ /* Populate application's rkmessages array. */
+ return rd_kafka_q_serve_rkmessages(rkq, timeout_ms, rkmessages,
+ rkmessages_size);
+}
+
+
+ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp;
+ ssize_t cnt;
+
+ /* Get toppar */
+ rd_kafka_topic_rdlock(rkt);
+ rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
+ if (unlikely(!rktp))
+ rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ /* Populate application's rkmessages array. */
+ cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
+ rkmessages, rkmessages_size);
+
+ rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
+
+ rd_kafka_set_last_error(0, 0);
+
+ return cnt;
+}
+
+ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ /* Populate application's rkmessages array. */
+ return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms, rkmessages,
+ rkmessages_size);
+}
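+
+/*
+ * Usage sketch (illustrative, not part of this file): batch consumption
+ * from a dedicated queue, pairing rd_kafka_consume_start_queue() with the
+ * call above. Topic, partition, batch size and handle() are assumptions.
+ *
+ *   rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
+ *   rd_kafka_consume_start_queue(rkt, 0, RD_KAFKA_OFFSET_BEGINNING, rkqu);
+ *
+ *   rd_kafka_message_t *msgs[100];
+ *   ssize_t i, n = rd_kafka_consume_batch_queue(rkqu, 1000, msgs, 100);
+ *   for (i = 0; i < n; i++) {
+ *           if (!msgs[i]->err)
+ *                   handle(msgs[i]);       (hypothetical)
+ *           rd_kafka_message_destroy(msgs[i]);
+ *   }
+ *
+ *   rd_kafka_consume_stop(rkt, 0);
+ *   rd_kafka_queue_destroy(rkqu);
+ */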
+
+
+struct consume_ctx {
+ void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque);
+ void *opaque;
+};
+
+
+/**
+ * Trampoline for application's consume_cb()
+ */
+static rd_kafka_op_res_t rd_kafka_consume_cb(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque) {
+ struct consume_ctx *ctx = opaque;
+ rd_kafka_message_t *rkmessage;
+
+ if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
+ rko->rko_type == RD_KAFKA_OP_BARRIER) {
+ rd_kafka_op_destroy(rko);
+ return RD_KAFKA_OP_RES_HANDLED;
+ }
+
+ rkmessage = rd_kafka_message_get(rko);
+
+ rd_kafka_fetch_op_app_prepare(rk, rko);
+
+ ctx->consume_cb(rkmessage, ctx->opaque);
+
+ rd_kafka_op_destroy(rko);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+
+static rd_kafka_op_res_t rd_kafka_consume_callback0(
+ rd_kafka_q_t *rkq,
+ int timeout_ms,
+ int max_cnt,
+ void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque),
+ void *opaque) {
+ struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
+ rd_kafka_op_res_t res;
+
+ if (timeout_ms)
+ rd_kafka_app_poll_blocking(rkq->rkq_rk);
+
+ res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
+ rd_kafka_consume_cb, &ctx);
+
+ rd_kafka_app_polled(rkq->rkq_rk);
+
+ return res;
+}
+
+
+int rd_kafka_consume_callback(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int timeout_ms,
+ void (*consume_cb)(rd_kafka_message_t *rkmessage,
+ void *opaque),
+ void *opaque) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp;
+ int r;
+
+ /* Get toppar */
+ rd_kafka_topic_rdlock(rkt);
+ rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
+ if (unlikely(!rktp))
+ rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
+ rkt->rkt_conf.consume_callback_max_msgs,
+ consume_cb, opaque);
+
+ rd_kafka_toppar_destroy(rktp);
+
+ rd_kafka_set_last_error(0, 0);
+
+ return r;
+}
+
+
+
+int rd_kafka_consume_callback_queue(
+ rd_kafka_queue_t *rkqu,
+ int timeout_ms,
+ void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque),
+ void *opaque) {
+ return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
+ consume_cb, opaque);
+}
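+
+/*
+ * Usage sketch (illustrative, not part of this file): callback-style
+ * consumption through the functions above. Messages passed to the callback
+ * are owned and destroyed by librdkafka; handle() is hypothetical.
+ *
+ *   static void my_consume_cb(rd_kafka_message_t *rkmessage, void *opaque) {
+ *           if (!rkmessage->err)
+ *                   handle(rkmessage);
+ *   }
+ *
+ *   rd_kafka_consume_callback(rkt, 0, 1000, my_consume_cb, NULL);
+ */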
+
+
+/**
+ * Serve queue 'rkq' and return one message.
+ * Serving the queue will also trigger any callbacks registered for
+ * matching events; this includes consume_cb(), in which case no
+ * message will be returned.
+ */
+static rd_kafka_message_t *
+rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
+ rd_kafka_op_t *rko;
+ rd_kafka_message_t *rkmessage = NULL;
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ if (timeout_ms)
+ rd_kafka_app_poll_blocking(rk);
+
+ rd_kafka_yield_thread = 0;
+        while ((rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout),
+                                     0))) {
+ rd_kafka_op_res_t res;
+
+ res =
+ rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
+
+ if (res == RD_KAFKA_OP_RES_PASS)
+ break;
+
+ if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+ rd_kafka_yield_thread)) {
+ /* Callback called rd_kafka_yield(), we must
+ * stop dispatching the queue and return. */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
+ rd_kafka_app_polled(rk);
+ return NULL;
+ }
+
+ /* Message was handled by callback. */
+ continue;
+ }
+
+ if (!rko) {
+ /* Timeout reached with no op returned. */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ ETIMEDOUT);
+ rd_kafka_app_polled(rk);
+ return NULL;
+ }
+
+ rd_kafka_assert(rk, rko->rko_type == RD_KAFKA_OP_FETCH ||
+ rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
+
+ /* Get rkmessage from rko */
+ rkmessage = rd_kafka_message_get(rko);
+
+ /* Store offset, etc */
+ rd_kafka_fetch_op_app_prepare(rk, rko);
+
+ rd_kafka_set_last_error(0, 0);
+
+ rd_kafka_app_polled(rk);
+
+ return rkmessage;
+}
+
+rd_kafka_message_t *
+rd_kafka_consume(rd_kafka_topic_t *app_rkt, int32_t partition, int timeout_ms) {
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_message_t *rkmessage;
+
+ rd_kafka_topic_rdlock(rkt);
+ rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
+ if (unlikely(!rktp))
+ rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return NULL;
+ }
+
+ rkmessage =
+ rd_kafka_consume0(rkt->rkt_rk, rktp->rktp_fetchq, timeout_ms);
+
+ rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
+
+ return rkmessage;
+}
+
+
+rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
+ int timeout_ms) {
+ return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
+}
+
+
+
+rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+rd_kafka_message_t *rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
+ rd_kafka_message_t *rkmessage = rd_kafka_message_new();
+ rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+ return rkmessage;
+ }
+
+ return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
+}
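+
+/*
+ * Usage sketch (illustrative, not part of this file): the canonical
+ * high-level consumer loop served by the function above. The group id,
+ * topic name, run flag and handle() helper are assumptions.
+ *
+ *   rd_kafka_poll_set_consumer(rk);   (rk created with group.id set)
+ *
+ *   rd_kafka_topic_partition_list_t *topics =
+ *           rd_kafka_topic_partition_list_new(1);
+ *   rd_kafka_topic_partition_list_add(topics, "mytopic",
+ *                                     RD_KAFKA_PARTITION_UA);
+ *   rd_kafka_subscribe(rk, topics);
+ *   rd_kafka_topic_partition_list_destroy(topics);
+ *
+ *   while (run) {
+ *           rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 100);
+ *           if (!rkm)
+ *                   continue;
+ *           if (!rkm->err)
+ *                   handle(rkm);           (hypothetical)
+ *           rd_kafka_message_destroy(rkm);
+ *   }
+ *
+ *   rd_kafka_consumer_close(rk);
+ */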
+
+
+/**
+ * @brief Consumer close.
+ *
+ * @param rkq The consumer group queue will be forwarded to this queue,
+ *            which must be served (rebalance events) by the
+ *            application/caller until rd_kafka_consumer_closed() returns
+ *            true. If the consumer is not in a joined state, no rebalance
+ *            events will be emitted.
+ */
+static rd_kafka_error_t *rd_kafka_consumer_close_q(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq) {
+ rd_kafka_cgrp_t *rkcg;
+ rd_kafka_error_t *error = NULL;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
+ "Consume close called on non-group "
+ "consumer");
+
+ if (rd_atomic32_get(&rkcg->rkcg_terminated))
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
+ "Consumer already closed");
+
+ /* If a fatal error has been raised and this is an
+ * explicit consumer_close() from the application we return
+ * a fatal error. Otherwise let the "silent" no_consumer_close
+ * logic be performed to clean up properly. */
+ if (!rd_kafka_destroy_flags_no_consumer_close(rk) &&
+ (error = rd_kafka_get_fatal_error(rk)))
+ return error;
+
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
+ "Closing consumer");
+
+ /* Redirect cgrp queue to the rebalance queue to make sure all posted
+ * ops (e.g., rebalance callbacks) are served by
+ * the application/caller. */
+ rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
+
+ /* Tell cgrp subsystem to terminate. A TERMINATE op will be posted
+ * on the rkq when done. */
+ rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
+
+ return error;
+}
+
+rd_kafka_error_t *rd_kafka_consumer_close_queue(rd_kafka_t *rk,
+ rd_kafka_queue_t *rkqu) {
+ if (!rkqu)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Queue must be specified");
+ return rd_kafka_consumer_close_q(rk, rkqu->rkqu_q);
+}
+
+rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
+ rd_kafka_error_t *error;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ rd_kafka_q_t *rkq;
+
+ /* Create a temporary reply queue to handle the TERMINATE reply op. */
+ rkq = rd_kafka_q_new(rk);
+
+ /* Initiate the close (async) */
+ error = rd_kafka_consumer_close_q(rk, rkq);
+ if (error) {
+ err = rd_kafka_error_is_fatal(error)
+ ? RD_KAFKA_RESP_ERR__FATAL
+ : rd_kafka_error_code(error);
+ rd_kafka_error_destroy(error);
+ rd_kafka_q_destroy_owner(rkq);
+ return err;
+ }
+
+        /* Disable the queue if termination is immediate or the user
+         * does not want the blocking consumer_close() behaviour; this will
+         * cause any ops posted for this queue (such as rebalance) to
+         * be destroyed.
+         */
+ if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
+ rd_kafka_dbg(rk, CONSUMER, "CLOSE",
+ "Disabling and purging temporary queue to quench "
+ "close events");
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_q_disable(rkq);
+ /* Purge ops already enqueued */
+ rd_kafka_q_purge(rkq);
+ } else {
+ rd_kafka_op_t *rko;
+ rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Waiting for close events");
+ while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
+ rd_kafka_op_res_t res;
+ if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
+ RD_KAFKA_OP_TERMINATE) {
+ err = rko->rko_err;
+ rd_kafka_op_destroy(rko);
+ break;
+ }
+ /* Handle callbacks */
+ res = rd_kafka_poll_cb(rk, rkq, rko,
+ RD_KAFKA_Q_CB_RETURN, NULL);
+ if (res == RD_KAFKA_OP_RES_PASS)
+ rd_kafka_op_destroy(rko);
+ /* Ignore YIELD, we need to finish */
+ }
+ }
+
+ rd_kafka_q_destroy_owner(rkq);
+
+ if (err)
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
+ "Consumer closed with error: %s",
+ rd_kafka_err2str(err));
+ else
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
+ "Consumer closed");
+
+ return err;
+}
+
+
+int rd_kafka_consumer_closed(rd_kafka_t *rk) {
+ if (unlikely(!rk->rk_cgrp))
+ return 0;
+
+ return rd_atomic32_get(&rk->rk_cgrp->rkcg_terminated);
+}
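+
+/*
+ * Usage sketch (illustrative, not part of this file): non-blocking close
+ * using the queue variant above, serving events until the consumer reports
+ * closed. A real application would handle (not just destroy) any rebalance
+ * events polled here.
+ *
+ *   rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
+ *   rd_kafka_error_t *error = rd_kafka_consumer_close_queue(rk, rkqu);
+ *   if (!error) {
+ *           while (!rd_kafka_consumer_closed(rk)) {
+ *                   rd_kafka_event_t *ev = rd_kafka_queue_poll(rkqu, 100);
+ *                   if (ev)
+ *                           rd_kafka_event_destroy(ev);
+ *           }
+ *   } else {
+ *           rd_kafka_error_destroy(error);
+ *   }
+ *   rd_kafka_queue_destroy(rkqu);
+ */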
+
+
+rd_kafka_resp_err_t
+rd_kafka_committed(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions,
+ int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ rd_kafka_resp_err_t err;
+ rd_kafka_cgrp_t *rkcg;
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ if (!partitions)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ /* Set default offsets. */
+ rd_kafka_topic_partition_list_reset_offsets(partitions,
+ RD_KAFKA_OFFSET_INVALID);
+
+ rkq = rd_kafka_q_new(rk);
+
+ do {
+ rd_kafka_op_t *rko;
+ int state_version = rd_kafka_brokers_get_state_version(rk);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
+ rd_kafka_op_set_replyq(rko, rkq, NULL);
+
+ /* Issue #827
+ * Copy partition list to avoid use-after-free if we time out
+ * here, the app frees the list, and then cgrp starts
+ * processing the op. */
+ rko->rko_u.offset_fetch.partitions =
+ rd_kafka_topic_partition_list_copy(partitions);
+ rko->rko_u.offset_fetch.require_stable_offsets =
+ rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
+ rko->rko_u.offset_fetch.do_free = 1;
+
+ if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+ break;
+ }
+
+ rko =
+ rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0);
+ if (rko) {
+ if (!(err = rko->rko_err))
+ rd_kafka_topic_partition_list_update(
+ partitions,
+ rko->rko_u.offset_fetch.partitions);
+ else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+ err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+ !rd_kafka_brokers_wait_state_change(
+ rk, state_version,
+ rd_timeout_remains(abs_timeout)))
+ err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ rd_kafka_op_destroy(rko);
+ } else
+ err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
+ err == RD_KAFKA_RESP_ERR__WAIT_COORD);
+
+ rd_kafka_q_destroy_owner(rkq);
+
+ return err;
+}
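+
+/*
+ * Usage sketch (illustrative, not part of this file): fetching the
+ * committed offsets for two assumed partitions of the consumer's group.
+ *
+ *   rd_kafka_topic_partition_list_t *parts =
+ *           rd_kafka_topic_partition_list_new(2);
+ *   rd_kafka_topic_partition_list_add(parts, "mytopic", 0);
+ *   rd_kafka_topic_partition_list_add(parts, "mytopic", 1);
+ *
+ *   if (!rd_kafka_committed(rk, parts, 5000)) {
+ *           int i;
+ *           for (i = 0; i < parts->cnt; i++)
+ *                   printf("[%d] committed at %" PRId64 "\n",
+ *                          (int)parts->elems[i].partition,
+ *                          parts->elems[i].offset);
+ *   }
+ *   rd_kafka_topic_partition_list_destroy(parts);
+ */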
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions) {
+ int i;
+
+ for (i = 0; i < partitions->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+ rd_kafka_toppar_t *rktp;
+
+ if (!(rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
+ rktpar->partition, 0, 1))) {
+ rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+ continue;
+ }
+
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_topic_partition_set_from_fetch_pos(rktpar,
+ rktp->rktp_app_pos);
+ rd_kafka_toppar_unlock(rktp);
+ rd_kafka_toppar_destroy(rktp);
+
+ rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+struct _query_wmark_offsets_state {
+ rd_kafka_resp_err_t err;
+ const char *topic;
+ int32_t partition;
+ int64_t offsets[2];
+ int offidx; /* next offset to set from response */
+ rd_ts_t ts_end;
+ int state_version; /* Broker state version */
+};
+
+static void rd_kafka_query_wmark_offsets_resp_cb(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct _query_wmark_offsets_state *state;
+ rd_kafka_topic_partition_list_t *offsets;
+ rd_kafka_topic_partition_t *rktpar;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* 'state' has gone out of scope when query_watermark..()
+ * timed out and returned to the caller. */
+ return;
+ }
+
+ state = opaque;
+
+ offsets = rd_kafka_topic_partition_list_new(1);
+ err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request, offsets,
+ NULL);
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return; /* Retrying */
+ }
+
+ /* Retry if no broker connection is available yet. */
+ if (err == RD_KAFKA_RESP_ERR__TRANSPORT && rkb &&
+ rd_kafka_brokers_wait_state_change(
+ rkb->rkb_rk, state->state_version,
+ rd_timeout_remains(state->ts_end))) {
+ /* Retry */
+ state->state_version = rd_kafka_brokers_get_state_version(rk);
+ request->rkbuf_retries = 0;
+ if (rd_kafka_buf_retry(rkb, request)) {
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return; /* Retry in progress */
+ }
+ /* FALLTHRU */
+ }
+
+ /* Partition not seen in response. */
+ if (!(rktpar = rd_kafka_topic_partition_list_find(offsets, state->topic,
+ state->partition)))
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ else if (rktpar->err)
+ err = rktpar->err;
+ else
+ state->offsets[state->offidx] = rktpar->offset;
+
+ state->offidx++;
+
+ if (err || state->offidx == 2) /* Error or Done */
+ state->err = err;
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+
+rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int64_t *low,
+ int64_t *high,
+ int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ struct _query_wmark_offsets_state state;
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+ rd_kafka_topic_partition_list_t *partitions;
+ rd_kafka_topic_partition_t *rktpar;
+ struct rd_kafka_partition_leader *leader;
+ rd_list_t leaders;
+ rd_kafka_resp_err_t err;
+
+ partitions = rd_kafka_topic_partition_list_new(1);
+ rktpar =
+ rd_kafka_topic_partition_list_add(partitions, topic, partition);
+
+ rd_list_init(&leaders, partitions->cnt,
+ (void *)rd_kafka_partition_leader_destroy);
+
+ err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
+ &leaders, timeout_ms);
+ if (err) {
+ rd_list_destroy(&leaders);
+ rd_kafka_topic_partition_list_destroy(partitions);
+ return err;
+ }
+
+ leader = rd_list_elem(&leaders, 0);
+
+ rkq = rd_kafka_q_new(rk);
+
+ /* Due to KAFKA-1588 we need to send a request for each wanted offset,
+ * in this case one for the low watermark and one for the high. */
+ state.topic = topic;
+ state.partition = partition;
+ state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
+ state.offsets[1] = RD_KAFKA_OFFSET_END;
+ state.offidx = 0;
+ state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ state.ts_end = ts_end;
+ state.state_version = rd_kafka_brokers_get_state_version(rk);
+
+
+ rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
+ rd_kafka_ListOffsetsRequest(
+ leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_query_wmark_offsets_resp_cb, &state);
+
+ rktpar->offset = RD_KAFKA_OFFSET_END;
+ rd_kafka_ListOffsetsRequest(
+ leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_query_wmark_offsets_resp_cb, &state);
+
+ rd_kafka_topic_partition_list_destroy(partitions);
+ rd_list_destroy(&leaders);
+
+ /* Wait for reply (or timeout) */
+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
+ rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb,
+ NULL) != RD_KAFKA_OP_RES_YIELD)
+ ;
+
+ rd_kafka_q_destroy_owner(rkq);
+
+ if (state.err)
+ return state.err;
+ else if (state.offidx != 2)
+ return RD_KAFKA_RESP_ERR__FAIL;
+
+ /* We are not certain about the returned order. */
+ if (state.offsets[0] < state.offsets[1]) {
+ *low = state.offsets[0];
+ *high = state.offsets[1];
+ } else {
+ *low = state.offsets[1];
+ *high = state.offsets[0];
+ }
+
+        /* If the partition is empty, only one offset (the last) will be
+         * returned. */
+ if (*low < 0 && *high >= 0)
+ *low = *high;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
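+
+/*
+ * Usage sketch (illustrative, not part of this file): querying the cluster
+ * for the watermarks of an assumed topic/partition.
+ *
+ *   int64_t lo, hi;
+ *   rd_kafka_resp_err_t err =
+ *           rd_kafka_query_watermark_offsets(rk, "mytopic", 0,
+ *                                            &lo, &hi, 5000);
+ *   if (err)
+ *           fprintf(stderr, "query failed: %s\n", rd_kafka_err2str(err));
+ *   else
+ *           printf("low %" PRId64 ", high %" PRId64 "\n", lo, hi);
+ */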
+
+
+rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int64_t *low,
+ int64_t *high) {
+ rd_kafka_toppar_t *rktp;
+
+ rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
+ if (!rktp)
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+
+ rd_kafka_toppar_lock(rktp);
+ *low = rktp->rktp_lo_offset;
+ *high = rktp->rktp_hi_offset;
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_kafka_toppar_destroy(rktp);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief get_offsets_for_times() state
+ */
+struct _get_offsets_for_times {
+ rd_kafka_topic_partition_list_t *results;
+ rd_kafka_resp_err_t err;
+ int wait_reply;
+ int state_version;
+ rd_ts_t ts_end;
+};
+
+/**
+ * @brief Handle ListOffsets responses
+ */
+static void rd_kafka_get_offsets_for_times_resp_cb(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct _get_offsets_for_times *state;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* 'state' has gone out of scope when offsets_for_times()
+ * timed out and returned to the caller. */
+ return;
+ }
+
+ state = opaque;
+
+ err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
+ state->results, NULL);
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+ return; /* Retrying */
+
+ /* Retry if no broker connection is available yet. */
+ if (err == RD_KAFKA_RESP_ERR__TRANSPORT && rkb &&
+ rd_kafka_brokers_wait_state_change(
+ rkb->rkb_rk, state->state_version,
+ rd_timeout_remains(state->ts_end))) {
+ /* Retry */
+ state->state_version = rd_kafka_brokers_get_state_version(rk);
+ request->rkbuf_retries = 0;
+ if (rd_kafka_buf_retry(rkb, request))
+ return; /* Retry in progress */
+ /* FALLTHRU */
+ }
+
+ if (err && !state->err)
+ state->err = err;
+
+ state->wait_reply--;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_offsets_for_times(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *offsets,
+ int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ struct _get_offsets_for_times state = RD_ZERO_INIT;
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+ rd_list_t leaders;
+ int i;
+ rd_kafka_resp_err_t err;
+ struct rd_kafka_partition_leader *leader;
+ int tmout;
+
+ if (offsets->cnt == 0)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ rd_list_init(&leaders, offsets->cnt,
+ (void *)rd_kafka_partition_leader_destroy);
+
+ err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
+ timeout_ms);
+ if (err) {
+ rd_list_destroy(&leaders);
+ return err;
+ }
+
+
+ rkq = rd_kafka_q_new(rk);
+
+ state.wait_reply = 0;
+ state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
+
+ /* For each leader send a request for its partitions */
+ RD_LIST_FOREACH(leader, &leaders, i) {
+ state.wait_reply++;
+ rd_kafka_ListOffsetsRequest(
+ leader->rkb, leader->partitions, RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_get_offsets_for_times_resp_cb, &state);
+ }
+
+ rd_list_destroy(&leaders);
+
+ /* Wait for reply (or timeout) */
+ while (state.wait_reply > 0 &&
+ !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
+ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+
+ rd_kafka_q_destroy_owner(rkq);
+
+ if (state.wait_reply > 0 && !state.err)
+ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ /* Then update the queried partitions. */
+ if (!state.err)
+ rd_kafka_topic_partition_list_update(offsets, state.results);
+
+ rd_kafka_topic_partition_list_destroy(state.results);
+
+ return state.err;
+}
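+
+/*
+ * Usage sketch (illustrative, not part of this file): looking up the
+ * earliest offset at or after a wall-clock timestamp. On input each
+ * element's .offset carries the timestamp in milliseconds; on output it
+ * carries the resolved offset. Topic and timestamp are assumptions.
+ *
+ *   rd_kafka_topic_partition_list_t *offs =
+ *           rd_kafka_topic_partition_list_new(1);
+ *   rd_kafka_topic_partition_list_add(offs, "mytopic", 0)->offset =
+ *           1672531200000;   (2023-01-01T00:00:00Z)
+ *
+ *   if (!rd_kafka_offsets_for_times(rk, offs, 5000))
+ *           printf("resolved offset: %" PRId64 "\n",
+ *                  offs->elems[0].offset);
+ *   rd_kafka_topic_partition_list_destroy(offs);
+ */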
+
+
+/**
+ * @brief rd_kafka_poll() (and similar) op callback handler.
+ * Will either call registered callback depending on cb_type and op type
+ * or return op to application, if applicable (e.g., fetch message).
+ *
+ * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
+ * other res types (such as OP_RES_PASS).
+ *
+ * @locality any thread that serves op queues
+ */
+rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;
+
+ /* Special handling for events based on cb_type */
+ if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko)) {
+ /* Return-as-event requested. */
+ return RD_KAFKA_OP_RES_PASS; /* Return as event */
+ }
+
+ switch ((int)rko->rko_type) {
+ case RD_KAFKA_OP_FETCH:
+ if (!rk->rk_conf.consume_cb ||
+ cb_type == RD_KAFKA_Q_CB_RETURN ||
+ cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
+ return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+ else {
+ struct consume_ctx ctx = {.consume_cb =
+ rk->rk_conf.consume_cb,
+ .opaque = rk->rk_conf.opaque};
+
+ return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
+ }
+ break;
+
+ case RD_KAFKA_OP_REBALANCE:
+ if (rk->rk_conf.rebalance_cb)
+ rk->rk_conf.rebalance_cb(
+ rk, rko->rko_err, rko->rko_u.rebalance.partitions,
+ rk->rk_conf.opaque);
+                else {
+                        /* If EVENT_REBALANCE is enabled but rebalance_cb
+                         * isn't, we need to perform a dummy assign for the
+                         * application. This might happen during termination
+                         * with consumer_close(). */
+ rd_kafka_dbg(rk, CGRP, "UNASSIGN",
+ "Forcing unassign of %d partition(s)",
+ rko->rko_u.rebalance.partitions
+ ? rko->rko_u.rebalance.partitions->cnt
+ : 0);
+ rd_kafka_assign(rk, NULL);
+ }
+ break;
+
+ case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
+ if (!rko->rko_u.offset_commit.cb)
+ return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+ rko->rko_u.offset_commit.cb(rk, rko->rko_err,
+ rko->rko_u.offset_commit.partitions,
+ rko->rko_u.offset_commit.opaque);
+ break;
+
+ case RD_KAFKA_OP_FETCH_STOP | RD_KAFKA_OP_REPLY:
+ /* Reply from toppar FETCH_STOP */
+ rd_kafka_assignment_partition_stopped(rk, rko->rko_rktp);
+ break;
+
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
+ * Consumer errors are returned to the application
+ * as rkmessages, not error callbacks.
+ *
+ * rd_kafka_poll() (_Q_CB_GLOBAL):
+ * convert to ERR op (fallthru)
+ */
+ if (cb_type == RD_KAFKA_Q_CB_RETURN ||
+ cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
+ /* return as message_t to application */
+ return RD_KAFKA_OP_RES_PASS;
+ }
+ /* FALLTHRU */
+
+ case RD_KAFKA_OP_ERR:
+ if (rk->rk_conf.error_cb)
+ rk->rk_conf.error_cb(rk, rko->rko_err,
+ rko->rko_u.err.errstr,
+ rk->rk_conf.opaque);
+ else
+ rd_kafka_log(rk, LOG_ERR, "ERROR", "%s: %s",
+ rk->rk_name, rko->rko_u.err.errstr);
+ break;
+
+ case RD_KAFKA_OP_DR:
+ /* Delivery report:
+ * call application DR callback for each message. */
+ while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
+ rd_kafka_message_t *rkmessage;
+
+ TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs, rkm,
+ rkm_link);
+
+ rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
+
+ if (likely(rk->rk_conf.dr_msg_cb != NULL)) {
+ rk->rk_conf.dr_msg_cb(rk, rkmessage,
+ rk->rk_conf.opaque);
+
+ } else if (rk->rk_conf.dr_cb) {
+ rk->rk_conf.dr_cb(
+ rk, rkmessage->payload, rkmessage->len,
+ rkmessage->err, rk->rk_conf.opaque,
+ rkmessage->_private);
+ } else if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
+ rd_kafka_log(
+ rk, LOG_WARNING, "DRDROP",
+ "Dropped delivery report for "
+ "message to "
+ "%s [%" PRId32
+ "] (%s) with "
+ "opaque %p: flush() or poll() "
+ "should not be called when "
+ "EVENT_DR is enabled",
+ rd_kafka_topic_name(rkmessage->rkt),
+ rkmessage->partition,
+ rd_kafka_err2name(rkmessage->err),
+ rkmessage->_private);
+ } else {
+ rd_assert(!*"BUG: neither a delivery report "
+ "callback or EVENT_DR flag set");
+ }
+
+ rd_kafka_msg_destroy(rk, rkm);
+
+ if (unlikely(rd_kafka_yield_thread)) {
+ /* Callback called yield(),
+ * re-enqueue the op (if there are any
+ * remaining messages). */
+ if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.rkmq_msgs))
+ rd_kafka_q_reenq(rkq, rko);
+ else
+ rd_kafka_op_destroy(rko);
+ return RD_KAFKA_OP_RES_YIELD;
+ }
+ }
+
+ rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
+
+ break;
+
+ case RD_KAFKA_OP_THROTTLE:
+ if (rk->rk_conf.throttle_cb)
+ rk->rk_conf.throttle_cb(
+ rk, rko->rko_u.throttle.nodename,
+ rko->rko_u.throttle.nodeid,
+ rko->rko_u.throttle.throttle_time,
+ rk->rk_conf.opaque);
+ break;
+
+ case RD_KAFKA_OP_STATS:
+ /* Statistics */
+ if (rk->rk_conf.stats_cb &&
+ rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
+ rko->rko_u.stats.json_len,
+ rk->rk_conf.opaque) == 1)
+ rko->rko_u.stats.json =
+ NULL; /* Application wanted json ptr */
+ break;
+
+ case RD_KAFKA_OP_LOG:
+ if (likely(rk->rk_conf.log_cb &&
+ rk->rk_conf.log_level >= rko->rko_u.log.level))
+ rk->rk_conf.log_cb(rk, rko->rko_u.log.level,
+ rko->rko_u.log.fac,
+ rko->rko_u.log.str);
+ break;
+
+ case RD_KAFKA_OP_TERMINATE:
+ /* nop: just a wake-up */
+ res = RD_KAFKA_OP_RES_YIELD;
+ rd_kafka_op_destroy(rko);
+ break;
+
+ case RD_KAFKA_OP_CREATETOPICS:
+ case RD_KAFKA_OP_DELETETOPICS:
+ case RD_KAFKA_OP_CREATEPARTITIONS:
+ case RD_KAFKA_OP_ALTERCONFIGS:
+ case RD_KAFKA_OP_DESCRIBECONFIGS:
+ case RD_KAFKA_OP_DELETERECORDS:
+ case RD_KAFKA_OP_DELETEGROUPS:
+ case RD_KAFKA_OP_ADMIN_FANOUT:
+ case RD_KAFKA_OP_CREATEACLS:
+ case RD_KAFKA_OP_DESCRIBEACLS:
+ case RD_KAFKA_OP_DELETEACLS:
+ /* Calls op_destroy() from worker callback,
+ * when the time comes. */
+ res = rd_kafka_op_call(rk, rkq, rko);
+ break;
+
+ case RD_KAFKA_OP_ADMIN_RESULT:
+ if (cb_type == RD_KAFKA_Q_CB_RETURN ||
+ cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
+ return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
+
+ /* Op is silently destroyed below */
+ break;
+
+ case RD_KAFKA_OP_TXN:
+ /* Must only be handled by rdkafka main thread */
+ rd_assert(thrd_is_current(rk->rk_thread));
+ res = rd_kafka_op_call(rk, rkq, rko);
+ break;
+
+ case RD_KAFKA_OP_BARRIER:
+ break;
+
+ case RD_KAFKA_OP_PURGE:
+ rd_kafka_purge(rk, rko->rko_u.purge.flags);
+ break;
+
+ default:
+ /* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
+ * call it. */
+ if (rko->rko_type & RD_KAFKA_OP_CB) {
+ res = rd_kafka_op_call(rk, rkq, rko);
+ break;
+ }
+
+ RD_BUG("Can't handle op type %s (0x%x)",
+ rd_kafka_op2str(rko->rko_type), rko->rko_type);
+ break;
+ }
+
+ if (res == RD_KAFKA_OP_RES_HANDLED)
+ rd_kafka_op_destroy(rko);
+
+ return res;
+}
+
+int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
+ int r;
+
+ r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+
+ return r;
+}
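+
+/*
+ * Illustrative usage sketch (not part of the library): a typical
+ * application poll loop serving the callbacks registered through
+ * rd_kafka_conf_set_*_cb(). The handle \p rk and the \p run flag are
+ * assumed to be set up by the application.
+ *
+ *   while (run)
+ *           rd_kafka_poll(rk, 100);
+ *
+ * Each call dispatches enqueued delivery reports, errors, stats and
+ * log ops to their registered callbacks, blocking for at most 100 ms,
+ * and returns the number of events served.
+ */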
+
+
+rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
+ RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
+
+ if (!rko)
+ return NULL;
+
+ return rko;
+}
+
+int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
+ int r;
+
+ r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
+ RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
+
+ return r;
+}
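+
+/*
+ * Illustrative usage sketch (not part of the library): serving a queue
+ * in event mode with rd_kafka_queue_poll() instead of callbacks. The
+ * queue handle \p rkqu is assumed to have been obtained with e.g.
+ * rd_kafka_queue_get_main().
+ *
+ *   rd_kafka_event_t *ev;
+ *
+ *   while ((ev = rd_kafka_queue_poll(rkqu, 100))) {
+ *           if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_ERROR)
+ *                   fprintf(stderr, "Error: %s\n",
+ *                           rd_kafka_event_error_string(ev));
+ *           rd_kafka_event_destroy(ev);
+ *   }
+ */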
+
+
+
+static void
+rd_kafka_toppar_dump(FILE *fp, const char *indent, rd_kafka_toppar_t *rktp) {
+
+ fprintf(fp,
+ "%s%.*s [%" PRId32
+ "] broker %s, "
+ "leader_id %s\n",
+ indent, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rktp->rktp_broker ? rktp->rktp_broker->rkb_name : "none",
+ rktp->rktp_leader ? rktp->rktp_leader->rkb_name : "none");
+ fprintf(fp,
+ "%s refcnt %i\n"
+ "%s msgq: %i messages\n"
+ "%s xmit_msgq: %i messages\n"
+ "%s total: %" PRIu64 " messages, %" PRIu64 " bytes\n",
+ indent, rd_refcnt_get(&rktp->rktp_refcnt), indent,
+ rktp->rktp_msgq.rkmq_msg_cnt, indent,
+ rktp->rktp_xmit_msgq.rkmq_msg_cnt, indent,
+ rd_atomic64_get(&rktp->rktp_c.tx_msgs),
+ rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
+}
+
+static void rd_kafka_broker_dump(FILE *fp, rd_kafka_broker_t *rkb, int locks) {
+ rd_kafka_toppar_t *rktp;
+
+ if (locks)
+ rd_kafka_broker_lock(rkb);
+ fprintf(fp,
+ " rd_kafka_broker_t %p: %s NodeId %" PRId32
+ " in state %s (for %.3fs)\n",
+ rkb, rkb->rkb_name, rkb->rkb_nodeid,
+ rd_kafka_broker_state_names[rkb->rkb_state],
+ rkb->rkb_ts_state
+ ? (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f
+ : 0.0f);
+ fprintf(fp, " refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
+ fprintf(fp, " outbuf_cnt: %i waitresp_cnt: %i\n",
+ rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
+ rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
+ fprintf(fp,
+ " %" PRIu64 " messages sent, %" PRIu64
+ " bytes, "
+ "%" PRIu64 " errors, %" PRIu64
+ " timeouts\n"
+ " %" PRIu64 " messages received, %" PRIu64
+ " bytes, "
+ "%" PRIu64
+ " errors\n"
+ " %" PRIu64 " messageset transmissions were retried\n",
+ rd_atomic64_get(&rkb->rkb_c.tx),
+ rd_atomic64_get(&rkb->rkb_c.tx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.tx_err),
+ rd_atomic64_get(&rkb->rkb_c.req_timeouts),
+ rd_atomic64_get(&rkb->rkb_c.rx),
+ rd_atomic64_get(&rkb->rkb_c.rx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.rx_err),
+ rd_atomic64_get(&rkb->rkb_c.tx_retries));
+
+ fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt);
+ TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
+ rd_kafka_toppar_dump(fp, " ", rktp);
+ if (locks) {
+ rd_kafka_broker_unlock(rkb);
+ }
+}
+
+
+static void rd_kafka_dump0(FILE *fp, rd_kafka_t *rk, int locks) {
+ rd_kafka_broker_t *rkb;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_toppar_t *rktp;
+ int i;
+ unsigned int tot_cnt;
+ size_t tot_size;
+
+ rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
+
+ if (locks)
+ rd_kafka_rdlock(rk);
+#if ENABLE_DEVEL
+ fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
+#endif
+ fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);
+
+ fprintf(fp, " producer.msg_cnt %u (%" PRIusz " bytes)\n", tot_cnt,
+ tot_size);
+ fprintf(fp, " rk_rep reply queue: %i ops\n",
+ rd_kafka_q_len(rk->rk_rep));
+
+ fprintf(fp, " brokers:\n");
+ if (locks)
+ mtx_lock(&rk->rk_internal_rkb_lock);
+ if (rk->rk_internal_rkb)
+ rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
+ if (locks)
+ mtx_unlock(&rk->rk_internal_rkb_lock);
+
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_kafka_broker_dump(fp, rkb, locks);
+ }
+
+ fprintf(fp, " cgrp:\n");
+ if (rk->rk_cgrp) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ fprintf(fp, " %.*s in state %s, flags 0x%x\n",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rkcg->rkcg_flags);
+ fprintf(fp, " coord_id %" PRId32 ", broker %s\n",
+ rkcg->rkcg_coord_id,
+ rkcg->rkcg_curr_coord
+ ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
+ : "(none)");
+
+ fprintf(fp, " toppars:\n");
+ RD_LIST_FOREACH(rktp, &rkcg->rkcg_toppars, i) {
+ fprintf(fp, " %.*s [%" PRId32 "] in state %s\n",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rd_kafka_fetch_states[rktp->rktp_fetch_state]);
+ }
+ }
+
+ fprintf(fp, " topics:\n");
+ TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+ fprintf(fp,
+ " %.*s with %" PRId32
+ " partitions, state %s, "
+ "refcnt %i\n",
+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
+ rkt->rkt_partition_cnt,
+ rd_kafka_topic_state_names[rkt->rkt_state],
+ rd_refcnt_get(&rkt->rkt_refcnt));
+ if (rkt->rkt_ua)
+ rd_kafka_toppar_dump(fp, " ", rkt->rkt_ua);
+ if (rd_list_empty(&rkt->rkt_desp)) {
+ fprintf(fp, " desired partitions:");
+ RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
+ fprintf(fp, " %" PRId32, rktp->rktp_partition);
+ fprintf(fp, "\n");
+ }
+ }
+
+ fprintf(fp, "\n");
+ rd_kafka_metadata_cache_dump(fp, rk);
+
+ if (locks)
+ rd_kafka_rdunlock(rk);
+}
+
+void rd_kafka_dump(FILE *fp, rd_kafka_t *rk) {
+ if (rk)
+ rd_kafka_dump0(fp, rk, 1 /*locks*/);
+}
+
+
+
+const char *rd_kafka_name(const rd_kafka_t *rk) {
+ return rk->rk_name;
+}
+
+rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
+ return rk->rk_type;
+}
+
+
+char *rd_kafka_memberid(const rd_kafka_t *rk) {
+ rd_kafka_op_t *rko;
+ rd_kafka_cgrp_t *rkcg;
+ char *memberid;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return NULL;
+
+ rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
+ if (!rko)
+ return NULL;
+ memberid = rko->rko_u.name.str;
+ rko->rko_u.name.str = NULL;
+ rd_kafka_op_destroy(rko);
+
+ return memberid;
+}
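+
+/*
+ * Illustrative usage sketch (not part of the library): the string
+ * returned by rd_kafka_memberid() is owned by the caller and must be
+ * freed, e.g. with rd_kafka_mem_free().
+ *
+ *   char *memberid = rd_kafka_memberid(rk);
+ *   if (memberid) {
+ *           printf("Member id: %s\n", memberid);
+ *           rd_kafka_mem_free(rk, memberid);
+ *   }
+ */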
+
+
+char *rd_kafka_clusterid(rd_kafka_t *rk, int timeout_ms) {
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ /* ClusterId is returned in Metadata >=V2 responses and
+ * cached on the rk. If no cached value is available
+ * it means no metadata has been received yet, or we're
+ * using a lower protocol version
+ * (e.g., api.version.request=false). */
+
+ while (1) {
+ int remains_ms;
+
+ rd_kafka_rdlock(rk);
+
+ if (rk->rk_clusterid) {
+ /* Cached clusterid available. */
+ char *ret = rd_strdup(rk->rk_clusterid);
+ rd_kafka_rdunlock(rk);
+ return ret;
+ } else if (rk->rk_ts_metadata > 0) {
+ /* Metadata received but no clusterid,
+ * this probably means the broker is too old
+ * or api.version.request=false. */
+ rd_kafka_rdunlock(rk);
+ return NULL;
+ }
+
+ rd_kafka_rdunlock(rk);
+
+ /* Wait for up to timeout_ms for a metadata refresh,
+ * if permitted by application. */
+ remains_ms = rd_timeout_remains(abs_timeout);
+ if (rd_timeout_expired(remains_ms))
+ return NULL;
+
+ rd_kafka_metadata_cache_wait_change(rk, remains_ms);
+ }
+
+ return NULL;
+}
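+
+/*
+ * Illustrative usage sketch (not part of the library): waiting up to
+ * one second for cluster metadata; NULL is returned on timeout or if
+ * the brokers do not report a ClusterId. The returned copy must be
+ * freed by the caller.
+ *
+ *   char *clusterid = rd_kafka_clusterid(rk, 1000);
+ *   if (clusterid) {
+ *           printf("ClusterId: %s\n", clusterid);
+ *           rd_kafka_mem_free(rk, clusterid);
+ *   }
+ */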
+
+
+int32_t rd_kafka_controllerid(rd_kafka_t *rk, int timeout_ms) {
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ /* ControllerId is returned in Metadata >=V1 responses and
+ * cached on the rk. If no cached value is available
+ * it means no metadata has been received yet, or we're
+ * using a lower protocol version
+ * (e.g., api.version.request=false). */
+
+ while (1) {
+ int remains_ms;
+ int version;
+
+ version = rd_kafka_brokers_get_state_version(rk);
+
+ rd_kafka_rdlock(rk);
+
+ if (rk->rk_controllerid != -1) {
+ /* Cached controllerid available. */
+ rd_kafka_rdunlock(rk);
+ return rk->rk_controllerid;
+ } else if (rk->rk_ts_metadata > 0) {
+ /* Metadata received but no controllerid,
+ * this probably means the broker is too old
+ * or api.version.request=false. */
+ rd_kafka_rdunlock(rk);
+ return -1;
+ }
+
+ rd_kafka_rdunlock(rk);
+
+ /* Wait for up to timeout_ms for a metadata refresh,
+ * if permitted by application. */
+ remains_ms = rd_timeout_remains(abs_timeout);
+ if (rd_timeout_expired(remains_ms))
+ return -1;
+
+ rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
+ }
+
+ return -1;
+}
+
+
+void *rd_kafka_opaque(const rd_kafka_t *rk) {
+ return rk->rk_conf.opaque;
+}
+
+
+int rd_kafka_outq_len(rd_kafka_t *rk) {
+ return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) +
+ (rk->rk_background.q ? rd_kafka_q_len(rk->rk_background.q) : 0);
+}
+
+
+rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms) {
+ unsigned int msg_cnt = 0;
+
+ if (rk->rk_type != RD_KAFKA_PRODUCER)
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+
+ rd_kafka_yield_thread = 0;
+
+ /* Set flushing flag on the producer for the duration of the
+ * flush() call. This tells producer_serve() that the linger.ms
+ * time should be considered immediate. */
+ rd_atomic32_add(&rk->rk_flushing, 1);
+
+ /* Wake up all broker threads to trigger the producer_serve() call.
+ * If this flush() call finishes before the broker wakes up
+ * then no flushing will be performed by that broker thread. */
+ rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_UP, "flushing");
+
+ if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
+ /* Application wants delivery reports as events rather
+ * than callbacks, we must thus not serve this queue
+ * with rd_kafka_poll() since that would trigger non-existent
+ * delivery report callbacks, which would result
+ * in the delivery reports being dropped.
+ * Instead we rely on the application to serve the event
+ * queue in another thread, so all we do here is wait
+ * for the current message count to reach zero. */
+ rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &msg_cnt);
+
+ } else {
+ /* Standard poll interface.
+ *
+ * First poll call is non-blocking for the case
+ * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
+ * called at least once. */
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+ int tmout = RD_POLL_NOWAIT;
+ int qlen = 0;
+
+ do {
+ rd_kafka_poll(rk, tmout);
+ qlen = rd_kafka_q_len(rk->rk_rep);
+ msg_cnt = rd_kafka_curr_msgs_cnt(rk);
+ } while (qlen + msg_cnt > 0 && !rd_kafka_yield_thread &&
+ (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
+ RD_POLL_NOWAIT);
+
+ msg_cnt += qlen;
+ }
+
+ rd_atomic32_sub(&rk->rk_flushing, 1);
+
+ return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT
+ : RD_KAFKA_RESP_ERR_NO_ERROR;
+}
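+
+/*
+ * Illustrative usage sketch (not part of the library): a typical
+ * producer shutdown sequence that flushes outstanding messages before
+ * destroying the handle.
+ *
+ *   rd_kafka_resp_err_t err = rd_kafka_flush(rk, 10 * 1000);
+ *   if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
+ *           fprintf(stderr, "%d message(s) were not delivered\n",
+ *                   rd_kafka_outq_len(rk));
+ *   rd_kafka_destroy(rk);
+ */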
+
+/**
+ * @brief Purge the partition message queue (according to \p purge_flags) for
+ * all toppars.
+ *
+ * This is necessary to avoid the race condition where a purge() is
+ * scheduled in the short window after an rktp has been created but
+ * before it has been delegated to a broker handler thread.
+ *
+ * The rktp_xmit_msgq is handled by the broker-thread purge.
+ *
+ * @returns the number of messages purged.
+ *
+ * @locks_required rd_kafka_*lock()
+ * @locks_acquired rd_kafka_topic_rdlock()
+ */
+static int rd_kafka_purge_toppars(rd_kafka_t *rk, int purge_flags) {
+ rd_kafka_topic_t *rkt;
+ int cnt = 0;
+
+ TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+ rd_kafka_toppar_t *rktp;
+ int i;
+
+ rd_kafka_topic_rdlock(rkt);
+ for (i = 0; i < rkt->rkt_partition_cnt; i++)
+ cnt += rd_kafka_toppar_purge_queues(
+ rkt->rkt_p[i], purge_flags, rd_false /*!xmit*/);
+
+ RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
+ cnt += rd_kafka_toppar_purge_queues(rktp, purge_flags,
+ rd_false /*!xmit*/);
+
+ if (rkt->rkt_ua)
+ cnt += rd_kafka_toppar_purge_queues(
+ rkt->rkt_ua, purge_flags, rd_false /*!xmit*/);
+ rd_kafka_topic_rdunlock(rkt);
+ }
+
+ return cnt;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags) {
+ rd_kafka_broker_t *rkb;
+ rd_kafka_q_t *tmpq = NULL;
+ int waitcnt = 0;
+
+ if (rk->rk_type != RD_KAFKA_PRODUCER)
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+
+ /* Check that future flags are not passed */
+ if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ /* Nothing to purge */
+ if (!purge_flags)
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ /* Set up a reply queue to wait for broker thread signalling
+ * completion, unless non-blocking. */
+ if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
+ tmpq = rd_kafka_q_new(rk);
+
+ rd_kafka_rdlock(rk);
+
+ /* Purge msgq for all toppars. */
+ rd_kafka_purge_toppars(rk, purge_flags);
+
+ /* Send purge request to all broker threads */
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_kafka_broker_purge_queues(rkb, purge_flags,
+ RD_KAFKA_REPLYQ(tmpq, 0));
+ waitcnt++;
+ }
+
+ rd_kafka_rdunlock(rk);
+
+
+ if (tmpq) {
+ /* Wait for responses */
+ while (waitcnt-- > 0)
+ rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
+
+ rd_kafka_q_destroy_owner(tmpq);
+ }
+
+ /* Purge messages for the UA(-1) partitions (which are not
+ * handled by a broker thread) */
+ if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
+ rd_kafka_purge_ua_toppar_queues(rk);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
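+
+/*
+ * Illustrative usage sketch (not part of the library): dropping all
+ * queued and in-flight messages, e.g. for a fast shutdown. Purged
+ * messages fail with RD_KAFKA_RESP_ERR__PURGE_QUEUE or
+ * RD_KAFKA_RESP_ERR__PURGE_INFLIGHT and are still propagated to the
+ * delivery report callback, so a final poll is needed to reap them.
+ *
+ *   rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE |
+ *                          RD_KAFKA_PURGE_F_INFLIGHT);
+ *   rd_kafka_poll(rk, 0);
+ */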
+
+
+
+/**
+ * @returns a csv string of purge flags in thread-local storage
+ */
+const char *rd_kafka_purge_flags2str(int flags) {
+ static const char *names[] = {"queue", "inflight", "non-blocking",
+ NULL};
+ static RD_TLS char ret[64];
+
+ return rd_flags2str(ret, sizeof(ret), names, flags);
+}
+
+
+int rd_kafka_version(void) {
+ return RD_KAFKA_VERSION;
+}
+
+const char *rd_kafka_version_str(void) {
+ static RD_TLS char ret[128];
+ size_t of = 0, r;
+
+ if (*ret)
+ return ret;
+
+#ifdef LIBRDKAFKA_GIT_VERSION
+ if (*LIBRDKAFKA_GIT_VERSION) {
+ of = rd_snprintf(ret, sizeof(ret), "%s",
+ *LIBRDKAFKA_GIT_VERSION == 'v'
+ ? &LIBRDKAFKA_GIT_VERSION[1]
+ : LIBRDKAFKA_GIT_VERSION);
+ if (of > sizeof(ret))
+ of = sizeof(ret);
+ }
+#endif
+
+#define _my_sprintf(...) \
+ do { \
+ r = rd_snprintf(ret + of, sizeof(ret) - of, __VA_ARGS__); \
+ if (r > sizeof(ret) - of) \
+ r = sizeof(ret) - of; \
+ of += r; \
+ } while (0)
+
+ if (of == 0) {
+ int ver = rd_kafka_version();
+ int prel = (ver & 0xff);
+ _my_sprintf("%i.%i.%i", (ver >> 24) & 0xff, (ver >> 16) & 0xff,
+ (ver >> 8) & 0xff);
+ if (prel != 0xff) {
+ /* pre-build numbers of 200 and below are running
+ * build numbers, above 200 are RC numbers. */
+ if (prel <= 200)
+ _my_sprintf("-pre%d", prel);
+ else
+ _my_sprintf("-RC%d", prel - 200);
+ }
+ }
+
+#if ENABLE_DEVEL
+ _my_sprintf("-devel");
+#endif
+
+#if WITHOUT_OPTIMIZATION
+ _my_sprintf("-O0");
+#endif
+
+ return ret;
+}
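+
+/*
+ * Illustrative sketch (not part of the library): the integer returned
+ * by rd_kafka_version() is laid out as 0xMMmmrrpp (major, minor,
+ * revision, pre-release), which is how it is decoded above.
+ *
+ *   int ver = rd_kafka_version();
+ *   printf("librdkafka %d.%d.%d (%s)\n",
+ *          (ver >> 24) & 0xff, (ver >> 16) & 0xff, (ver >> 8) & 0xff,
+ *          rd_kafka_version_str());
+ */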
+
+
+/**
+ * Assert trampoline to print some debugging information on crash.
+ */
+void RD_NORETURN rd_kafka_crash(const char *file,
+ int line,
+ const char *function,
+ rd_kafka_t *rk,
+ const char *reason) {
+ fprintf(stderr, "*** %s:%i:%s: %s ***\n", file, line, function, reason);
+ if (rk)
+ rd_kafka_dump0(stderr, rk, 0 /*no locks*/);
+ abort();
+}
+
+
+
+struct list_groups_state {
+ rd_kafka_q_t *q;
+ rd_kafka_resp_err_t err;
+ int wait_cnt;
+ const char *desired_group;
+ struct rd_kafka_group_list *grplist;
+ int grplist_size;
+};
+
+static const char *rd_kafka_consumer_group_state_names[] = {
+ "Unknown", "PreparingRebalance", "CompletingRebalance", "Stable", "Dead",
+ "Empty"};
+
+const char *
+rd_kafka_consumer_group_state_name(rd_kafka_consumer_group_state_t state) {
+ if (state < 0 || state >= RD_KAFKA_CONSUMER_GROUP_STATE__CNT)
+ return NULL;
+ return rd_kafka_consumer_group_state_names[state];
+}
+
+rd_kafka_consumer_group_state_t
+rd_kafka_consumer_group_state_code(const char *name) {
+ size_t i;
+ for (i = 0; i < RD_KAFKA_CONSUMER_GROUP_STATE__CNT; i++) {
+ if (!rd_strcasecmp(rd_kafka_consumer_group_state_names[i],
+ name))
+ return i;
+ }
+ return RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;
+}
+
+static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct list_groups_state *state;
+ const int log_decode_errors = LOG_ERR;
+ int cnt;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* 'state' has gone out of scope due to list_groups()
+ * timing out and returning. */
+ return;
+ }
+
+ state = opaque;
+ state->wait_cnt--;
+
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_i32(reply, &cnt);
+
+ while (cnt-- > 0) {
+ int16_t ErrorCode;
+ rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
+ int MemberCnt;
+ struct rd_kafka_group_info *gi;
+
+ if (state->grplist->group_cnt == state->grplist_size) {
+ /* Grow group array */
+ state->grplist_size *= 2;
+ state->grplist->groups =
+ rd_realloc(state->grplist->groups,
+ state->grplist_size *
+ sizeof(*state->grplist->groups));
+ }
+
+ gi = &state->grplist->groups[state->grplist->group_cnt++];
+ memset(gi, 0, sizeof(*gi));
+
+ rd_kafka_buf_read_i16(reply, &ErrorCode);
+ rd_kafka_buf_read_str(reply, &Group);
+ rd_kafka_buf_read_str(reply, &GroupState);
+ rd_kafka_buf_read_str(reply, &ProtoType);
+ rd_kafka_buf_read_str(reply, &Proto);
+ rd_kafka_buf_read_i32(reply, &MemberCnt);
+
+ if (MemberCnt > 100000) {
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ goto err;
+ }
+
+ rd_kafka_broker_lock(rkb);
+ gi->broker.id = rkb->rkb_nodeid;
+ gi->broker.host = rd_strdup(rkb->rkb_origname);
+ gi->broker.port = rkb->rkb_port;
+ rd_kafka_broker_unlock(rkb);
+
+ gi->err = ErrorCode;
+ gi->group = RD_KAFKAP_STR_DUP(&Group);
+ gi->state = RD_KAFKAP_STR_DUP(&GroupState);
+ gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
+ gi->protocol = RD_KAFKAP_STR_DUP(&Proto);
+
+ if (MemberCnt > 0)
+ gi->members =
+ rd_malloc(MemberCnt * sizeof(*gi->members));
+
+ while (MemberCnt-- > 0) {
+ rd_kafkap_str_t MemberId, ClientId, ClientHost;
+ rd_kafkap_bytes_t Meta, Assignment;
+ struct rd_kafka_group_member_info *mi;
+
+ mi = &gi->members[gi->member_cnt++];
+ memset(mi, 0, sizeof(*mi));
+
+ rd_kafka_buf_read_str(reply, &MemberId);
+ rd_kafka_buf_read_str(reply, &ClientId);
+ rd_kafka_buf_read_str(reply, &ClientHost);
+ rd_kafka_buf_read_bytes(reply, &Meta);
+ rd_kafka_buf_read_bytes(reply, &Assignment);
+
+ mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
+ mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
+ mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);
+
+ if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
+ mi->member_metadata_size = 0;
+ mi->member_metadata = NULL;
+ } else {
+ mi->member_metadata_size =
+ RD_KAFKAP_BYTES_LEN(&Meta);
+ mi->member_metadata = rd_memdup(
+ Meta.data, mi->member_metadata_size);
+ }
+
+ if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
+ mi->member_assignment_size = 0;
+ mi->member_assignment = NULL;
+ } else {
+ mi->member_assignment_size =
+ RD_KAFKAP_BYTES_LEN(&Assignment);
+ mi->member_assignment =
+ rd_memdup(Assignment.data,
+ mi->member_assignment_size);
+ }
+ }
+ }
+
+err:
+ state->err = err;
+ return;
+
+err_parse:
+ state->err = reply->rkbuf_err;
+}
+
+static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct list_groups_state *state;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode;
+ char **grps = NULL;
+ int cnt, grpcnt, i = 0;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* 'state' is no longer in scope because
+ * list_groups() timed out and returned to the caller.
+ * We must not touch anything here but simply return. */
+ return;
+ }
+
+ state = opaque;
+
+ state->wait_cnt--;
+
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_i16(reply, &ErrorCode);
+ if (ErrorCode) {
+ err = ErrorCode;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i32(reply, &cnt);
+
+ if (state->desired_group)
+ grpcnt = 1;
+ else
+ grpcnt = cnt;
+
+ if (cnt == 0 || grpcnt == 0)
+ return;
+
+ grps = rd_malloc(sizeof(*grps) * grpcnt);
+
+ while (cnt-- > 0) {
+ rd_kafkap_str_t grp, proto;
+
+ rd_kafka_buf_read_str(reply, &grp);
+ rd_kafka_buf_read_str(reply, &proto);
+
+ if (state->desired_group &&
+ rd_kafkap_str_cmp_str(&grp, state->desired_group))
+ continue;
+
+ grps[i++] = RD_KAFKAP_STR_DUP(&grp);
+
+ if (i == grpcnt)
+ break;
+ }
+
+ if (i > 0) {
+ rd_kafka_error_t *error;
+
+ state->wait_cnt++;
+ error = rd_kafka_DescribeGroupsRequest(
+ rkb, 0, grps, i, RD_KAFKA_REPLYQ(state->q, 0),
+ rd_kafka_DescribeGroups_resp_cb, state);
+ if (error) {
+ rd_kafka_DescribeGroups_resp_cb(
+ rk, rkb, rd_kafka_error_code(error), reply, request,
+ opaque);
+ rd_kafka_error_destroy(error);
+ }
+
+ while (i-- > 0)
+ rd_free(grps[i]);
+ }
+
+
+ rd_free(grps);
+
+err:
+ state->err = err;
+ return;
+
+err_parse:
+ if (grps)
+ rd_free(grps);
+ state->err = reply->rkbuf_err;
+}
+
+rd_kafka_resp_err_t
+rd_kafka_list_groups(rd_kafka_t *rk,
+ const char *group,
+ const struct rd_kafka_group_list **grplistp,
+ int timeout_ms) {
+ rd_kafka_broker_t *rkb;
+ int rkb_cnt = 0;
+ struct list_groups_state state = RD_ZERO_INIT;
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+
+ /* Wait until metadata has been fetched from cluster so
+ * that we have a full broker list.
+ * This state only happens during initial client setup; after that
+ * there will always be a cached metadata copy. */
+ while (1) {
+ int state_version = rd_kafka_brokers_get_state_version(rk);
+ rd_bool_t has_metadata;
+
+ rd_kafka_rdlock(rk);
+ has_metadata = rk->rk_ts_metadata != 0;
+ rd_kafka_rdunlock(rk);
+
+ if (has_metadata)
+ break;
+
+ if (!rd_kafka_brokers_wait_state_change(
+ rk, state_version, rd_timeout_remains(ts_end)))
+ return RD_KAFKA_RESP_ERR__TIMED_OUT;
+ }
+
+
+ state.q = rd_kafka_q_new(rk);
+ state.desired_group = group;
+ state.grplist = rd_calloc(1, sizeof(*state.grplist));
+ state.grplist_size = group ? 1 : 32;
+
+ state.grplist->groups =
+ rd_malloc(state.grplist_size * sizeof(*state.grplist->groups));
+
+ /* Query each broker for its list of groups */
+ rd_kafka_rdlock(rk);
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_kafka_error_t *error;
+ rd_kafka_broker_lock(rkb);
+ if (rkb->rkb_nodeid == -1 || RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
+ rd_kafka_broker_unlock(rkb);
+ continue;
+ }
+ rd_kafka_broker_unlock(rkb);
+
+ state.wait_cnt++;
+ rkb_cnt++;
+ error = rd_kafka_ListGroupsRequest(
+ rkb, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
+ rd_kafka_ListGroups_resp_cb, &state);
+ if (error) {
+ rd_kafka_ListGroups_resp_cb(rk, rkb,
+ rd_kafka_error_code(error),
+ NULL, NULL, &state);
+ rd_kafka_error_destroy(error);
+ }
+ }
+ rd_kafka_rdunlock(rk);
+
+ if (rkb_cnt == 0) {
+ state.err = RD_KAFKA_RESP_ERR__TRANSPORT;
+
+ } else {
+ int remains;
+
+ while (state.wait_cnt > 0 &&
+ !rd_timeout_expired(
+ (remains = rd_timeout_remains(ts_end)))) {
+ rd_kafka_q_serve(state.q, remains, 0,
+ RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+ /* Ignore yields */
+ }
+ }
+
+ rd_kafka_q_destroy_owner(state.q);
+
+ if (state.wait_cnt > 0 && !state.err) {
+ if (state.grplist->group_cnt == 0)
+ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ else {
+ *grplistp = state.grplist;
+ return RD_KAFKA_RESP_ERR__PARTIAL;
+ }
+ }
+
+ if (state.err)
+ rd_kafka_group_list_destroy(state.grplist);
+ else
+ *grplistp = state.grplist;
+
+ return state.err;
+}
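+
+/*
+ * Illustrative usage sketch (not part of the library): listing all
+ * groups known to the cluster and releasing the result. On
+ * RD_KAFKA_RESP_ERR__PARTIAL a (partial) group list is still returned
+ * and must be destroyed.
+ *
+ *   const struct rd_kafka_group_list *grplist;
+ *   rd_kafka_resp_err_t err =
+ *       rd_kafka_list_groups(rk, NULL, &grplist, 10 * 1000);
+ *   if (!err || err == RD_KAFKA_RESP_ERR__PARTIAL) {
+ *           int i;
+ *           for (i = 0; i < grplist->group_cnt; i++)
+ *                   printf("Group \"%s\" in state %s\n",
+ *                          grplist->groups[i].group,
+ *                          grplist->groups[i].state);
+ *           rd_kafka_group_list_destroy(grplist);
+ *   }
+ */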
+
+
+void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist0) {
+ struct rd_kafka_group_list *grplist =
+ (struct rd_kafka_group_list *)grplist0;
+
+ while (grplist->group_cnt-- > 0) {
+ struct rd_kafka_group_info *gi;
+ gi = &grplist->groups[grplist->group_cnt];
+
+ if (gi->broker.host)
+ rd_free(gi->broker.host);
+ if (gi->group)
+ rd_free(gi->group);
+ if (gi->state)
+ rd_free(gi->state);
+ if (gi->protocol_type)
+ rd_free(gi->protocol_type);
+ if (gi->protocol)
+ rd_free(gi->protocol);
+
+ while (gi->member_cnt-- > 0) {
+ struct rd_kafka_group_member_info *mi;
+ mi = &gi->members[gi->member_cnt];
+
+ if (mi->member_id)
+ rd_free(mi->member_id);
+ if (mi->client_id)
+ rd_free(mi->client_id);
+ if (mi->client_host)
+ rd_free(mi->client_host);
+ if (mi->member_metadata)
+ rd_free(mi->member_metadata);
+ if (mi->member_assignment)
+ rd_free(mi->member_assignment);
+ }
+
+ if (gi->members)
+ rd_free(gi->members);
+ }
+
+ if (grplist->groups)
+ rd_free(grplist->groups);
+
+ rd_free(grplist);
+}
+
+
+
+const char *rd_kafka_get_debug_contexts(void) {
+ return RD_KAFKA_DEBUG_CONTEXTS;
+}
+
+
+int rd_kafka_path_is_dir(const char *path) {
+#ifdef _WIN32
+ struct _stat st;
+ return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR);
+#else
+ struct stat st;
+ return (stat(path, &st) == 0 && S_ISDIR(st.st_mode));
+#endif
+}
+
+
+/**
+ * @returns true if directory is empty or can't be accessed, else false.
+ */
+rd_bool_t rd_kafka_dir_is_empty(const char *path) {
+#if _WIN32
+ /* FIXME: Unsupported */
+ return rd_true;
+#else
+ DIR *dir;
+ struct dirent *d;
+#if defined(__sun)
+ struct stat st;
+ int ret = 0;
+#endif
+
+ dir = opendir(path);
+ if (!dir)
+ return rd_true;
+
+ while ((d = readdir(dir))) {
+
+ if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+ continue;
+
+#if defined(__sun)
+ ret = stat(d->d_name, &st);
+ if (ret != 0) {
+ closedir(dir); /* Can't be accessed */
+ return rd_true;
+ }
+ if (S_ISREG(st.st_mode) || S_ISDIR(st.st_mode) ||
+ S_ISLNK(st.st_mode)) {
+#else
+ if (d->d_type == DT_REG || d->d_type == DT_LNK ||
+ d->d_type == DT_DIR) {
+#endif
+ closedir(dir);
+ return rd_false;
+ }
+ }
+
+ closedir(dir);
+ return rd_true;
+#endif
+}
+
+
+void *rd_kafka_mem_malloc(rd_kafka_t *rk, size_t size) {
+ return rd_malloc(size);
+}
+
+void *rd_kafka_mem_calloc(rd_kafka_t *rk, size_t num, size_t size) {
+ return rd_calloc(num, size);
+}
+
+void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr) {
+ rd_free(ptr);
+}
+
+
+int rd_kafka_errno(void) {
+ return errno;
+}
+
+int rd_kafka_unittest(void) {
+ return rd_unittest();
+}