summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h1054
1 files changed, 0 insertions, 1054 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h
deleted file mode 100644
index 584ff3c96..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h
+++ /dev/null
@@ -1,1054 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef _RDKAFKA_INT_H_
-#define _RDKAFKA_INT_H_
-
-#ifndef _WIN32
-#define _GNU_SOURCE /* for strndup() */
-#endif
-
-#ifdef _MSC_VER
-typedef int mode_t;
-#endif
-
-#include <fcntl.h>
-
-
-#include "rdsysqueue.h"
-
-#include "rdkafka.h"
-#include "rd.h"
-#include "rdlog.h"
-#include "rdtime.h"
-#include "rdaddr.h"
-#include "rdinterval.h"
-#include "rdavg.h"
-#include "rdlist.h"
-
-#if WITH_SSL
-#include <openssl/ssl.h>
-#endif
-
-
-
-#define rd_kafka_assert(rk, cond) \
- do { \
- if (unlikely(!(cond))) \
- rd_kafka_crash(__FILE__, __LINE__, __FUNCTION__, (rk), \
- "assert: " #cond); \
- } while (0)
-
-
-void RD_NORETURN rd_kafka_crash(const char *file,
- int line,
- const char *function,
- rd_kafka_t *rk,
- const char *reason);
-
-
-/* Forward declarations */
-struct rd_kafka_s;
-struct rd_kafka_topic_s;
-struct rd_kafka_msg_s;
-struct rd_kafka_broker_s;
-struct rd_kafka_toppar_s;
-
-typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;
-
-
-/**
- * Protocol level sanity
- */
-#define RD_KAFKAP_BROKERS_MAX 10000
-#define RD_KAFKAP_TOPICS_MAX 1000000
-#define RD_KAFKAP_PARTITIONS_MAX 100000
-
-
-#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
-
-
-/**
- * @struct Represents a fetch position:
- * an offset and an partition leader epoch (if known, else -1).
- */
-typedef struct rd_kafka_fetch_pos_s {
- int64_t offset;
- int32_t leader_epoch;
- rd_bool_t validated;
-} rd_kafka_fetch_pos_t;
-
-
-
-#include "rdkafka_op.h"
-#include "rdkafka_queue.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_proto.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_pattern.h"
-#include "rdkafka_conf.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_timer.h"
-#include "rdkafka_assignor.h"
-#include "rdkafka_metadata.h"
-#include "rdkafka_mock.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_assignment.h"
-#include "rdkafka_coord.h"
-#include "rdkafka_mock.h"
-
-/**
- * Protocol level sanity
- */
-#define RD_KAFKAP_BROKERS_MAX 10000
-#define RD_KAFKAP_TOPICS_MAX 1000000
-#define RD_KAFKAP_PARTITIONS_MAX 100000
-#define RD_KAFKAP_GROUPS_MAX 100000
-
-
-#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
-
-
-
-/**
- * @enum Idempotent Producer state
- */
-typedef enum {
- RD_KAFKA_IDEMP_STATE_INIT, /**< Initial state */
- RD_KAFKA_IDEMP_STATE_TERM, /**< Instance is terminating */
- RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */
- RD_KAFKA_IDEMP_STATE_REQ_PID, /**< Request new PID */
- RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to
- * become available. */
- RD_KAFKA_IDEMP_STATE_WAIT_PID, /**< PID requested, waiting for reply */
- RD_KAFKA_IDEMP_STATE_ASSIGNED, /**< New PID assigned */
- RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
- * ProduceRequests to finish
- * before resetting and
- * re-requesting a new PID. */
- RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
- * ProduceRequests to finish
- * before bumping the current
- * epoch. */
- RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT, /**< Wait for transaction abort
- * to finish and trigger a
- * drain and reset or bump. */
-} rd_kafka_idemp_state_t;
-
-/**
- * @returns the idemp_state_t string representation
- */
-static RD_UNUSED const char *
-rd_kafka_idemp_state2str(rd_kafka_idemp_state_t state) {
- static const char *names[] = {
- "Init", "Terminate", "FatalError", "RequestPID", "WaitTransport",
- "WaitPID", "Assigned", "DrainReset", "DrainBump", "WaitTxnAbort"};
- return names[state];
-}
-
-
-
-/**
- * @enum Transactional Producer state
- */
-typedef enum {
- /**< Initial state */
- RD_KAFKA_TXN_STATE_INIT,
- /**< Awaiting PID to be acquired by rdkafka_idempotence.c */
- RD_KAFKA_TXN_STATE_WAIT_PID,
- /**< PID acquired, but application has not made a successful
- * init_transactions() call. */
- RD_KAFKA_TXN_STATE_READY_NOT_ACKED,
- /**< PID acquired, no active transaction. */
- RD_KAFKA_TXN_STATE_READY,
- /**< begin_transaction() has been called. */
- RD_KAFKA_TXN_STATE_IN_TRANSACTION,
- /**< commit_transaction() has been called. */
- RD_KAFKA_TXN_STATE_BEGIN_COMMIT,
- /**< commit_transaction() has been called and all outstanding
- * messages, partitions, and offsets have been sent. */
- RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION,
- /**< Transaction successfully committed but application has not made
- * a successful commit_transaction() call yet. */
- RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED,
- /**< begin_transaction() has been called. */
- RD_KAFKA_TXN_STATE_BEGIN_ABORT,
- /**< abort_transaction() has been called. */
- RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
- /**< Transaction successfully aborted but application has not made
- * a successful abort_transaction() call yet. */
- RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED,
- /**< An abortable error has occurred. */
- RD_KAFKA_TXN_STATE_ABORTABLE_ERROR,
- /* A fatal error has occured. */
- RD_KAFKA_TXN_STATE_FATAL_ERROR
-} rd_kafka_txn_state_t;
-
-
-/**
- * @returns the txn_state_t string representation
- */
-static RD_UNUSED const char *
-rd_kafka_txn_state2str(rd_kafka_txn_state_t state) {
- static const char *names[] = {"Init",
- "WaitPID",
- "ReadyNotAcked",
- "Ready",
- "InTransaction",
- "BeginCommit",
- "CommittingTransaction",
- "CommitNotAcked",
- "BeginAbort",
- "AbortingTransaction",
- "AbortedNotAcked",
- "AbortableError",
- "FatalError"};
- return names[state];
-}
-
-
-
-/**
- * Kafka handle, internal representation of the application's rd_kafka_t.
- */
-
-struct rd_kafka_s {
- rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
- rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */
-
- TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
- rd_list_t rk_broker_by_id; /* Fast id lookups. */
- rd_atomic32_t rk_broker_cnt;
- /**< Number of brokers in state >= UP */
- rd_atomic32_t rk_broker_up_cnt;
- /**< Number of logical brokers in state >= UP, this is a sub-set
- * of rk_broker_up_cnt. */
- rd_atomic32_t rk_logical_broker_up_cnt;
- /**< Number of brokers that are down, only includes brokers
- * that have had at least one connection attempt. */
- rd_atomic32_t rk_broker_down_cnt;
- /**< Logical brokers currently without an address.
- * Used for calculating ERR__ALL_BROKERS_DOWN. */
- rd_atomic32_t rk_broker_addrless_cnt;
-
- mtx_t rk_internal_rkb_lock;
- rd_kafka_broker_t *rk_internal_rkb;
-
- /* Broadcasting of broker state changes to wake up
- * functions waiting for a state change. */
- cnd_t rk_broker_state_change_cnd;
- mtx_t rk_broker_state_change_lock;
- int rk_broker_state_change_version;
- /* List of (rd_kafka_enq_once_t*) objects waiting for broker
- * state changes. Protected by rk_broker_state_change_lock. */
- rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */
-
- TAILQ_HEAD(, rd_kafka_topic_s) rk_topics;
- int rk_topic_cnt;
-
- struct rd_kafka_cgrp_s *rk_cgrp;
-
- rd_kafka_conf_t rk_conf;
- rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
- char rk_name[128];
- rd_kafkap_str_t *rk_client_id;
- rd_kafkap_str_t *rk_group_id; /* Consumer group id */
-
- rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_..
- * flags instance
- * is being destroyed.
- * The value set is the
- * destroy flags from
- * rd_kafka_destroy*() and
- * the two internal flags shown
- * below.
- *
- * Order:
- * 1. user_flags | .._F_DESTROY_CALLED
- * is set in rd_kafka_destroy*().
- * 2. consumer_close() is called
- * for consumers.
- * 3. .._F_TERMINATE is set to
- * signal all background threads
- * to terminate.
- */
-
-#define RD_KAFKA_DESTROY_F_TERMINATE \
- 0x1 /**< Internal flag to make sure \
- * rk_terminate is set to non-zero \
- * value even if user passed \
- * no destroy flags. */
-#define RD_KAFKA_DESTROY_F_DESTROY_CALLED \
- 0x2 /**< Application has called \
- * ..destroy*() and we've \
- * begun the termination \
- * process. \
- * This flag is needed to avoid \
- * rk_terminate from being \
- * 0 when destroy_flags() \
- * is called with flags=0 \
- * and prior to _F_TERMINATE \
- * has been set. */
-#define RD_KAFKA_DESTROY_F_IMMEDIATE \
- 0x4 /**< Immediate non-blocking \
- * destruction without waiting \
- * for all resources \
- * to be cleaned up. \
- * WARNING: Memory and resource \
- * leaks possible. \
- * This flag automatically sets \
- * .._NO_CONSUMER_CLOSE. */
-
-
- rwlock_t rk_lock;
- rd_kafka_type_t rk_type;
- struct timeval rk_tv_state_change;
-
- rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application
- * consumer_poll() call
- * (or equivalent).
- * Used to enforce
- * max.poll.interval.ms.
- * Only relevant for consumer. */
- /* First fatal error. */
- struct {
- rd_atomic32_t err; /**< rd_kafka_resp_err_t */
- char *errstr; /**< Protected by rk_lock */
- int cnt; /**< Number of errors raised, only
- * the first one is stored. */
- } rk_fatal;
-
- rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
- * from broker. */
-
- /* Locks: rd_kafka_*lock() */
- rd_ts_t rk_ts_metadata; /* Timestamp of most recent
- * metadata. */
-
- struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
- rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
- struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
-
- char *rk_clusterid; /* ClusterId from metadata */
- int32_t rk_controllerid; /* ControllerId from metadata */
-
- /**< Producer: Delivery report mode */
- enum { RD_KAFKA_DR_MODE_NONE, /**< No delivery reports */
- RD_KAFKA_DR_MODE_CB, /**< Delivery reports through callback */
- RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through event API*/
- } rk_drmode;
-
- /* Simple consumer count:
- * >0: Running in legacy / Simple Consumer mode,
- * 0: No consumers running
- * <0: Running in High level consumer mode */
- rd_atomic32_t rk_simple_cnt;
-
- /**
- * Exactly Once Semantics and Idempotent Producer
- *
- * @locks rk_lock
- */
- struct {
- /*
- * Idempotence
- */
- rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
- * state */
- rd_ts_t ts_idemp_state; /**< Last state change */
- rd_kafka_pid_t pid; /**< Current Producer ID and Epoch */
- int epoch_cnt; /**< Number of times pid/epoch changed */
- rd_atomic32_t inflight_toppar_cnt; /**< Current number of
- * toppars with inflight
- * requests. */
- rd_kafka_timer_t pid_tmr; /**< PID FSM timer */
-
- /*
- * Transactions
- *
- * All field access is from the rdkafka main thread,
- * unless a specific lock is mentioned in the doc string.
- *
- */
- rd_atomic32_t txn_may_enq; /**< Transaction state allows
- * application to enqueue
- * (produce) messages. */
-
- rd_kafkap_str_t *transactional_id; /**< transactional.id */
- rd_kafka_txn_state_t txn_state; /**< Transactional state.
- * @locks rk_lock */
- rd_ts_t ts_txn_state; /**< Last state change.
- * @locks rk_lock */
- rd_kafka_broker_t *txn_coord; /**< Transaction coordinator,
- * this is a logical broker.*/
- rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord
- * broker.
- * This is only used to
- * check if the coord
- * changes. */
- rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for
- * coordinator to
- * take action when
- * the broker state
- * changes. */
- rd_bool_t txn_requires_epoch_bump; /**< Coordinator epoch bump
- * required to recover from
- * idempotent producer
- * fatal error. */
-
- /**< Blocking transactional API application call
- * currently being handled, its state, reply queue and how
- * to handle timeout.
- * Only one transactional API call is allowed at any time.
- * Protected by the rk_lock. */
- struct {
- char name[64]; /**< API name, e.g.,
- * send_offsets_to_transaction.
- * This is used to make sure
- * conflicting APIs are not
- * called simultaneously. */
- rd_bool_t calling; /**< API is being actively called.
- * I.e., application is blocking
- * on a txn API call.
- * This is used to make sure
- * no concurrent API calls are
- * being made. */
- rd_kafka_error_t *error; /**< Last error from background
- * processing. This is only
- * set if the application's
- * API call timed out.
- * It will be returned on
- * the next call. */
- rd_bool_t has_result; /**< Indicates whether an API
- * result (possibly
- * intermediate) has been set.
- */
- cnd_t cnd; /**< Application thread will
- * block on this cnd waiting
- * for a result to be set. */
- mtx_t lock; /**< Protects all fields of
- * txn_curr_api. */
- } txn_curr_api;
-
-
- int txn_req_cnt; /**< Number of transaction
- * requests sent.
- * This is incremented when a
- * AddPartitionsToTxn or
- * AddOffsetsToTxn request
- * has been sent for the
- * current transaction,
- * to keep track of
- * whether the broker is
- * aware of the current
- * transaction and thus
- * requires an EndTxn request
- * on abort or not. */
-
- /**< Timer to trigger registration of pending partitions */
- rd_kafka_timer_t txn_register_parts_tmr;
-
- /**< Lock for txn_pending_rktps and txn_waitresp_rktps */
- mtx_t txn_pending_lock;
-
- /**< Partitions pending being added to transaction. */
- rd_kafka_toppar_tqhead_t txn_pending_rktps;
-
- /**< Partitions in-flight added to transaction. */
- rd_kafka_toppar_tqhead_t txn_waitresp_rktps;
-
- /**< Partitions added and registered to transaction. */
- rd_kafka_toppar_tqhead_t txn_rktps;
-
- /**< Number of messages that failed delivery.
- * If this number is >0 on transaction_commit then an
- * abortable transaction error will be raised.
- * Is reset to zero on each begin_transaction(). */
- rd_atomic64_t txn_dr_fails;
-
- /**< Current transaction error. */
- rd_kafka_resp_err_t txn_err;
-
- /**< Current transaction error string, if any. */
- char *txn_errstr;
-
- /**< Last InitProducerIdRequest error. */
- rd_kafka_resp_err_t txn_init_err;
-
- /**< Waiting for transaction coordinator query response */
- rd_bool_t txn_wait_coord;
-
- /**< Transaction coordinator query timer */
- rd_kafka_timer_t txn_coord_tmr;
- } rk_eos;
-
- rd_atomic32_t rk_flushing; /**< Application is calling flush(). */
-
- /**
- * Consumer state
- *
- * @locality rdkafka main thread
- * @locks_required none
- */
- struct {
- /** Application consumer queue for messages, events and errors.
- * (typically points to rkcg_q) */
- rd_kafka_q_t *q;
- /** Current assigned partitions through assign() et.al. */
- rd_kafka_assignment_t assignment;
- /** Waiting for this number of commits to finish. */
- int wait_commit_cnt;
- } rk_consumer;
-
- /**<
- * Coordinator cache.
- *
- * @locks none
- * @locality rdkafka main thread
- */
- rd_kafka_coord_cache_t rk_coord_cache; /**< Coordinator cache */
-
- TAILQ_HEAD(, rd_kafka_coord_req_s)
- rk_coord_reqs; /**< Coordinator
- * requests */
-
-
- struct {
- mtx_t lock; /* Protects acces to this struct */
- cnd_t cnd; /* For waking up blocking injectors */
- unsigned int cnt; /* Current message count */
- size_t size; /* Current message size sum */
- unsigned int max_cnt; /* Max limit */
- size_t max_size; /* Max limit */
- } rk_curr_msgs;
-
- rd_kafka_timers_t rk_timers;
- thrd_t rk_thread;
-
- int rk_initialized; /**< Will be > 0 when the rd_kafka_t
- * instance has been fully initialized. */
-
- int rk_init_wait_cnt; /**< Number of background threads that
- * need to finish initialization. */
- cnd_t rk_init_cnd; /**< Cond-var used to wait for main thread
- * to finish its initialization before
- * before rd_kafka_new() returns. */
- mtx_t rk_init_lock; /**< Lock for rk_init_wait and _cmd */
-
- rd_ts_t rk_ts_created; /**< Timestamp (monotonic clock) of
- * rd_kafka_t creation. */
-
- /**
- * Background thread and queue,
- * enabled by setting `background_event_cb()`.
- */
- struct {
- rd_kafka_q_t *q; /**< Queue served by background thread. */
- thrd_t thread; /**< Background thread. */
- int calling; /**< Indicates whether the event callback
- * is being called, reset back to 0
- * when the callback returns.
- * This can be used for troubleshooting
- * purposes. */
- } rk_background;
-
-
- /*
- * Logs, events or actions to rate limit / suppress
- */
- struct {
- /**< Log: No brokers support Idempotent Producer */
- rd_interval_t no_idemp_brokers;
-
- /**< Sparse connections: randomly select broker
- * to bring up. This interval should allow
- * for a previous connection to be established,
- * which varies between different environments:
- * Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
- */
- rd_interval_t sparse_connect_random;
- /**< Lock for sparse_connect_random */
- mtx_t sparse_connect_lock;
-
- /**< Broker metadata refresh interval:
- * this is rate-limiting the number of topic-less
- * broker/cluster metadata refreshes when there are no
- * topics to refresh.
- * Will be refreshed every topic.metadata.refresh.interval.ms
- * but no more often than every 10s.
- * No locks: only accessed by rdkafka main thread. */
- rd_interval_t broker_metadata_refresh;
-
- /**< Suppression for allow.auto.create.topics=false not being
- * supported by the broker. */
- rd_interval_t allow_auto_create_topics;
- } rk_suppress;
-
- struct {
- void *handle; /**< Provider-specific handle struct pointer.
- * Typically assigned in provider's .init() */
- rd_kafka_q_t *callback_q; /**< SASL callback queue, if any. */
- } rk_sasl;
-
- /* Test mocks */
- struct {
- rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created
- * by test.mock.num.brokers
- */
- rd_atomic32_t cluster_cnt; /**< Total number of mock
- * clusters, created either
- * through
- * test.mock.num.brokers
- * or mock_cluster_new().
- */
-
- } rk_mock;
-};
-
-#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock)
-#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock)
-#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
-#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
-
-
-/**
- * @brief Add \p cnt messages and of total size \p size bytes to the
- * internal bookkeeping of current message counts.
- * If the total message count or size after add would exceed the
- * configured limits \c queue.buffering.max.messages and
- * \c queue.buffering.max.kbytes then depending on the value of
- * \p block the function either blocks until enough space is available
- * if \p block is 1, else immediately returns
- * RD_KAFKA_RESP_ERR__QUEUE_FULL.
- *
- * @param rdmtx If non-null and \p block is set and blocking is to ensue,
- * then unlock this mutex for the duration of the blocking
- * and then reacquire with a read-lock.
- */
-static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
-rd_kafka_curr_msgs_add(rd_kafka_t *rk,
- unsigned int cnt,
- size_t size,
- int block,
- rwlock_t *rdlock) {
-
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- while (
- unlikely((rk->rk_curr_msgs.max_cnt > 0 &&
- rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt) ||
- (unsigned long long)(rk->rk_curr_msgs.size + size) >
- (unsigned long long)rk->rk_curr_msgs.max_size)) {
- if (!block) {
- mtx_unlock(&rk->rk_curr_msgs.lock);
- return RD_KAFKA_RESP_ERR__QUEUE_FULL;
- }
-
- if (rdlock)
- rwlock_rdunlock(rdlock);
-
- cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
-
- if (rdlock)
- rwlock_rdlock(rdlock);
- }
-
- rk->rk_curr_msgs.cnt += cnt;
- rk->rk_curr_msgs.size += size;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Subtract \p cnt messages of total size \p size from the
- * current bookkeeping and broadcast a wakeup on the condvar
- * for any waiting & blocking threads.
- */
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_sub(rd_kafka_t *rk, unsigned int cnt, size_t size) {
- int broadcast = 0;
-
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- rd_kafka_assert(NULL, rk->rk_curr_msgs.cnt >= cnt &&
- rk->rk_curr_msgs.size >= size);
-
- /* If the subtraction would pass one of the thresholds
- * broadcast a wake-up to any waiting listeners. */
- if ((rk->rk_curr_msgs.cnt - cnt == 0) ||
- (rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
- rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
- (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
- rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
- broadcast = 1;
-
- rk->rk_curr_msgs.cnt -= cnt;
- rk->rk_curr_msgs.size -= size;
-
- if (unlikely(broadcast))
- cnd_broadcast(&rk->rk_curr_msgs.cnd);
-
- mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_get(rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
- if (rk->rk_type != RD_KAFKA_PRODUCER) {
- *cntp = 0;
- *sizep = 0;
- return;
- }
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- *cntp = rk->rk_curr_msgs.cnt;
- *sizep = rk->rk_curr_msgs.size;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED int rd_kafka_curr_msgs_cnt(rd_kafka_t *rk) {
- int cnt;
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return 0;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- cnt = rk->rk_curr_msgs.cnt;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-
- return cnt;
-}
-
-/**
- * @brief Wait until \p tspec for curr_msgs to reach 0.
- *
- * @returns rd_true if zero is reached, or rd_false on timeout.
- * The remaining messages are returned in \p *curr_msgsp
- */
-static RD_INLINE RD_UNUSED rd_bool_t
-rd_kafka_curr_msgs_wait_zero(rd_kafka_t *rk,
- int timeout_ms,
- unsigned int *curr_msgsp) {
- unsigned int cnt;
- struct timespec tspec;
-
- rd_timeout_init_timespec(&tspec, timeout_ms);
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- while ((cnt = rk->rk_curr_msgs.cnt) > 0) {
- if (cnd_timedwait_abs(&rk->rk_curr_msgs.cnd,
- &rk->rk_curr_msgs.lock,
- &tspec) == thrd_timedout)
- break;
- }
- mtx_unlock(&rk->rk_curr_msgs.lock);
-
- *curr_msgsp = cnt;
- return cnt == 0;
-}
-
-void rd_kafka_destroy_final(rd_kafka_t *rk);
-
-void rd_kafka_global_init(void);
-
-/**
- * @returns true if \p rk handle is terminating.
- *
- * @remark If consumer_close() is called from destroy*() it will be
- * called prior to _F_TERMINATE being set and will thus not
- * be able to use rd_kafka_terminating() to know it is shutting down.
- * That code should instead just check that rk_terminate is non-zero
- * (the _F_DESTROY_CALLED flag will be set).
- */
-#define rd_kafka_terminating(rk) \
- (rd_atomic32_get(&(rk)->rk_terminate) & RD_KAFKA_DESTROY_F_TERMINATE)
-
-/**
- * @returns the destroy flags set matching \p flags, which might be
- * a subset of the flags.
- */
-#define rd_kafka_destroy_flags_check(rk, flags) \
- (rd_atomic32_get(&(rk)->rk_terminate) & (flags))
-
-/**
- * @returns true if no consumer callbacks, or standard consumer_close
- * behaviour, should be triggered. */
-#define rd_kafka_destroy_flags_no_consumer_close(rk) \
- rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)
-
-#define rd_kafka_is_simple_consumer(rk) \
- (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
-int rd_kafka_simple_consumer_add(rd_kafka_t *rk);
-
-
-/**
- * @returns true if idempotency is enabled (producer only).
- */
-#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)
-
-/**
- * @returns true if the producer is transactional (producer only).
- */
-#define rd_kafka_is_transactional(rk) \
- ((rk)->rk_conf.eos.transactional_id != NULL)
-
-
-#define RD_KAFKA_PURGE_F_ABORT_TXN \
- 0x100 /**< Internal flag used when \
- * aborting transaction */
-#define RD_KAFKA_PURGE_F_MASK 0x107
-const char *rd_kafka_purge_flags2str(int flags);
-
-
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-
-
-
-/**
- * Debug contexts
- */
-#define RD_KAFKA_DBG_GENERIC 0x1
-#define RD_KAFKA_DBG_BROKER 0x2
-#define RD_KAFKA_DBG_TOPIC 0x4
-#define RD_KAFKA_DBG_METADATA 0x8
-#define RD_KAFKA_DBG_FEATURE 0x10
-#define RD_KAFKA_DBG_QUEUE 0x20
-#define RD_KAFKA_DBG_MSG 0x40
-#define RD_KAFKA_DBG_PROTOCOL 0x80
-#define RD_KAFKA_DBG_CGRP 0x100
-#define RD_KAFKA_DBG_SECURITY 0x200
-#define RD_KAFKA_DBG_FETCH 0x400
-#define RD_KAFKA_DBG_INTERCEPTOR 0x800
-#define RD_KAFKA_DBG_PLUGIN 0x1000
-#define RD_KAFKA_DBG_CONSUMER 0x2000
-#define RD_KAFKA_DBG_ADMIN 0x4000
-#define RD_KAFKA_DBG_EOS 0x8000
-#define RD_KAFKA_DBG_MOCK 0x10000
-#define RD_KAFKA_DBG_ASSIGNOR 0x20000
-#define RD_KAFKA_DBG_CONF 0x40000
-#define RD_KAFKA_DBG_ALL 0xfffff
-#define RD_KAFKA_DBG_NONE 0x0
-
-
-void rd_kafka_log0(const rd_kafka_conf_t *conf,
- const rd_kafka_t *rk,
- const char *extra,
- int level,
- int ctx,
- const char *fac,
- const char *fmt,
- ...) RD_FORMAT(printf, 7, 8);
-
-#define rd_kafka_log(rk, level, fac, ...) \
- rd_kafka_log0(&rk->rk_conf, rk, NULL, level, RD_KAFKA_DBG_NONE, fac, \
- __VA_ARGS__)
-
-#define rd_kafka_dbg(rk, ctx, fac, ...) \
- do { \
- if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_##ctx))) \
- rd_kafka_log0(&rk->rk_conf, rk, NULL, LOG_DEBUG, \
- (RD_KAFKA_DBG_##ctx), fac, __VA_ARGS__); \
- } while (0)
-
-/* dbg() not requiring an rk, just the conf object, for early logging */
-#define rd_kafka_dbg0(conf, ctx, fac, ...) \
- do { \
- if (unlikely((conf)->debug & (RD_KAFKA_DBG_##ctx))) \
- rd_kafka_log0(conf, NULL, NULL, LOG_DEBUG, \
- (RD_KAFKA_DBG_##ctx), fac, __VA_ARGS__); \
- } while (0)
-
-/* NOTE: The local copy of _logname is needed due rkb_logname_lock lock-ordering
- * when logging another broker's name in the message. */
-#define rd_rkb_log0(rkb, level, ctx, fac, ...) \
- do { \
- char _logname[RD_KAFKA_NODENAME_SIZE]; \
- mtx_lock(&(rkb)->rkb_logname_lock); \
- rd_strlcpy(_logname, rkb->rkb_logname, sizeof(_logname)); \
- mtx_unlock(&(rkb)->rkb_logname_lock); \
- rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, (rkb)->rkb_rk, \
- _logname, level, ctx, fac, __VA_ARGS__); \
- } while (0)
-
-#define rd_rkb_log(rkb, level, fac, ...) \
- rd_rkb_log0(rkb, level, RD_KAFKA_DBG_NONE, fac, __VA_ARGS__)
-
-#define rd_rkb_dbg(rkb, ctx, fac, ...) \
- do { \
- if (unlikely((rkb)->rkb_rk->rk_conf.debug & \
- (RD_KAFKA_DBG_##ctx))) { \
- rd_rkb_log0(rkb, LOG_DEBUG, (RD_KAFKA_DBG_##ctx), fac, \
- __VA_ARGS__); \
- } \
- } while (0)
-
-
-
-extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
-
-static RD_UNUSED RD_INLINE rd_kafka_resp_err_t
-rd_kafka_set_last_error(rd_kafka_resp_err_t err, int errnox) {
- if (errnox) {
- /* MSVC:
- * This is the correct way to set errno on Windows,
- * but it is still pointless due to different errnos in
- * in different runtimes:
- * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
- * errno is thus highly deprecated, and buggy, on Windows
- * when using librdkafka as a dynamically loaded DLL. */
- rd_set_errno(errnox);
- }
- rd_kafka_last_error_code = err;
- return err;
-}
-
-
-int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
- rd_dolock_t do_lock,
- rd_kafka_resp_err_t err,
- const char *fmt,
- ...) RD_FORMAT(printf, 4, 5);
-#define rd_kafka_set_fatal_error(rk, err, fmt, ...) \
- rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__)
-
-rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk);
-
-static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
-rd_kafka_fatal_error_code(rd_kafka_t *rk) {
- /* This is an optimization to avoid an atomic read which are costly
- * on some platforms:
- * Fatal errors are currently only raised by the idempotent producer
- * and static consumers (group.instance.id). */
- if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
- (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id))
- return rd_atomic32_get(&rk->rk_fatal.err);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-extern rd_atomic32_t rd_kafka_thread_cnt_curr;
-extern char RD_TLS rd_kafka_thread_name[64];
-
-void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
-void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
-
-int rd_kafka_path_is_dir(const char *path);
-rd_bool_t rd_kafka_dir_is_empty(const char *path);
-
-rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
- rd_kafka_q_t *rkq,
- rd_kafka_op_t *rko,
- rd_kafka_q_cb_type_t cb_type,
- void *opaque);
-
-rd_kafka_resp_err_t rd_kafka_subscribe_rkt(rd_kafka_topic_t *rkt);
-
-
-/**
- * @returns the number of milliseconds the maximum poll interval
- * was exceeded, or 0 if not exceeded.
- *
- * @remark Only relevant for high-level consumer.
- *
- * @locality any
- * @locks none
- */
-static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) {
- rd_ts_t last_poll;
- int exceeded;
-
- if (rk->rk_type != RD_KAFKA_CONSUMER)
- return 0;
-
- last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);
-
- /* Application is blocked in librdkafka function, see
- * rd_kafka_app_poll_blocking(). */
- if (last_poll == INT64_MAX)
- return 0;
-
- exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
- rk->rk_conf.max_poll_interval_ms;
-
- if (unlikely(exceeded > 0))
- return exceeded;
-
- return 0;
-}
-
-/**
- * @brief Call on entry to blocking polling function to indicate
- * that the application is blocked waiting for librdkafka
- * and that max.poll.interval.ms should not be enforced.
- *
- * Call app_polled() Upon return from the function calling
- * this function to register the application's last time of poll.
- *
- * @remark Only relevant for high-level consumer.
- *
- * @locality any
- * @locks none
- */
-static RD_INLINE RD_UNUSED void rd_kafka_app_poll_blocking(rd_kafka_t *rk) {
- if (rk->rk_type == RD_KAFKA_CONSUMER)
- rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
-}
-
-/**
- * @brief Set the last application poll time to now.
- *
- * @remark Only relevant for high-level consumer.
- *
- * @locality any
- * @locks none
- */
-static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
- if (rk->rk_type == RD_KAFKA_CONSUMER)
- rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
-}
-
-
-
-void rd_kafka_term_sig_handler(int sig);
-
-/**
- * rdkafka_background.c
- */
-int rd_kafka_background_thread_main(void *arg);
-rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
- char *errstr,
- size_t errstr_size);
-
-
-#endif /* _RDKAFKA_INT_H_ */