diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h | 1054 |
1 files changed, 0 insertions, 1054 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h deleted file mode 100644 index 584ff3c96..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_int.h +++ /dev/null @@ -1,1054 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2013, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _RDKAFKA_INT_H_ -#define _RDKAFKA_INT_H_ - -#ifndef _WIN32 -#define _GNU_SOURCE /* for strndup() */ -#endif - -#ifdef _MSC_VER -typedef int mode_t; -#endif - -#include <fcntl.h> - - -#include "rdsysqueue.h" - -#include "rdkafka.h" -#include "rd.h" -#include "rdlog.h" -#include "rdtime.h" -#include "rdaddr.h" -#include "rdinterval.h" -#include "rdavg.h" -#include "rdlist.h" - -#if WITH_SSL -#include <openssl/ssl.h> -#endif - - - -#define rd_kafka_assert(rk, cond) \ - do { \ - if (unlikely(!(cond))) \ - rd_kafka_crash(__FILE__, __LINE__, __FUNCTION__, (rk), \ - "assert: " #cond); \ - } while (0) - - -void RD_NORETURN rd_kafka_crash(const char *file, - int line, - const char *function, - rd_kafka_t *rk, - const char *reason); - - -/* Forward declarations */ -struct rd_kafka_s; -struct rd_kafka_topic_s; -struct rd_kafka_msg_s; -struct rd_kafka_broker_s; -struct rd_kafka_toppar_s; - -typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; - - -/** - * Protocol level sanity - */ -#define RD_KAFKAP_BROKERS_MAX 10000 -#define RD_KAFKAP_TOPICS_MAX 1000000 -#define RD_KAFKAP_PARTITIONS_MAX 100000 - - -#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) - - -/** - * @struct Represents a fetch position: - * an offset and an partition leader epoch (if known, else -1). - */ -typedef struct rd_kafka_fetch_pos_s { - int64_t offset; - int32_t leader_epoch; - rd_bool_t validated; -} rd_kafka_fetch_pos_t; - - - -#include "rdkafka_op.h" -#include "rdkafka_queue.h" -#include "rdkafka_msg.h" -#include "rdkafka_proto.h" -#include "rdkafka_buf.h" -#include "rdkafka_pattern.h" -#include "rdkafka_conf.h" -#include "rdkafka_transport.h" -#include "rdkafka_timer.h" -#include "rdkafka_assignor.h" -#include "rdkafka_metadata.h" -#include "rdkafka_mock.h" -#include "rdkafka_partition.h" -#include "rdkafka_assignment.h" -#include "rdkafka_coord.h" -#include "rdkafka_mock.h" - -/** - * Protocol level sanity - */ -#define RD_KAFKAP_BROKERS_MAX 10000 -#define RD_KAFKAP_TOPICS_MAX 1000000 -#define RD_KAFKAP_PARTITIONS_MAX 100000 -#define RD_KAFKAP_GROUPS_MAX 100000 - - -#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) - - - -/** - * @enum Idempotent Producer state - */ -typedef enum { - RD_KAFKA_IDEMP_STATE_INIT, /**< Initial state */ - RD_KAFKA_IDEMP_STATE_TERM, /**< Instance is terminating */ - RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */ - RD_KAFKA_IDEMP_STATE_REQ_PID, /**< Request new PID */ - RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to - * become available. */ - RD_KAFKA_IDEMP_STATE_WAIT_PID, /**< PID requested, waiting for reply */ - RD_KAFKA_IDEMP_STATE_ASSIGNED, /**< New PID assigned */ - RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding - * ProduceRequests to finish - * before resetting and - * re-requesting a new PID. */ - RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding - * ProduceRequests to finish - * before bumping the current - * epoch. */ - RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT, /**< Wait for transaction abort - * to finish and trigger a - * drain and reset or bump. */ -} rd_kafka_idemp_state_t; - -/** - * @returns the idemp_state_t string representation - */ -static RD_UNUSED const char * -rd_kafka_idemp_state2str(rd_kafka_idemp_state_t state) { - static const char *names[] = { - "Init", "Terminate", "FatalError", "RequestPID", "WaitTransport", - "WaitPID", "Assigned", "DrainReset", "DrainBump", "WaitTxnAbort"}; - return names[state]; -} - - - -/** - * @enum Transactional Producer state - */ -typedef enum { - /**< Initial state */ - RD_KAFKA_TXN_STATE_INIT, - /**< Awaiting PID to be acquired by rdkafka_idempotence.c */ - RD_KAFKA_TXN_STATE_WAIT_PID, - /**< PID acquired, but application has not made a successful - * init_transactions() call. */ - RD_KAFKA_TXN_STATE_READY_NOT_ACKED, - /**< PID acquired, no active transaction. */ - RD_KAFKA_TXN_STATE_READY, - /**< begin_transaction() has been called. */ - RD_KAFKA_TXN_STATE_IN_TRANSACTION, - /**< commit_transaction() has been called. */ - RD_KAFKA_TXN_STATE_BEGIN_COMMIT, - /**< commit_transaction() has been called and all outstanding - * messages, partitions, and offsets have been sent. */ - RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION, - /**< Transaction successfully committed but application has not made - * a successful commit_transaction() call yet. */ - RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED, - /**< begin_transaction() has been called. */ - RD_KAFKA_TXN_STATE_BEGIN_ABORT, - /**< abort_transaction() has been called. */ - RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION, - /**< Transaction successfully aborted but application has not made - * a successful abort_transaction() call yet. */ - RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED, - /**< An abortable error has occurred. */ - RD_KAFKA_TXN_STATE_ABORTABLE_ERROR, - /* A fatal error has occured. */ - RD_KAFKA_TXN_STATE_FATAL_ERROR -} rd_kafka_txn_state_t; - - -/** - * @returns the txn_state_t string representation - */ -static RD_UNUSED const char * -rd_kafka_txn_state2str(rd_kafka_txn_state_t state) { - static const char *names[] = {"Init", - "WaitPID", - "ReadyNotAcked", - "Ready", - "InTransaction", - "BeginCommit", - "CommittingTransaction", - "CommitNotAcked", - "BeginAbort", - "AbortingTransaction", - "AbortedNotAcked", - "AbortableError", - "FatalError"}; - return names[state]; -} - - - -/** - * Kafka handle, internal representation of the application's rd_kafka_t. - */ - -struct rd_kafka_s { - rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */ - rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */ - - TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers; - rd_list_t rk_broker_by_id; /* Fast id lookups. */ - rd_atomic32_t rk_broker_cnt; - /**< Number of brokers in state >= UP */ - rd_atomic32_t rk_broker_up_cnt; - /**< Number of logical brokers in state >= UP, this is a sub-set - * of rk_broker_up_cnt. */ - rd_atomic32_t rk_logical_broker_up_cnt; - /**< Number of brokers that are down, only includes brokers - * that have had at least one connection attempt. */ - rd_atomic32_t rk_broker_down_cnt; - /**< Logical brokers currently without an address. - * Used for calculating ERR__ALL_BROKERS_DOWN. */ - rd_atomic32_t rk_broker_addrless_cnt; - - mtx_t rk_internal_rkb_lock; - rd_kafka_broker_t *rk_internal_rkb; - - /* Broadcasting of broker state changes to wake up - * functions waiting for a state change. */ - cnd_t rk_broker_state_change_cnd; - mtx_t rk_broker_state_change_lock; - int rk_broker_state_change_version; - /* List of (rd_kafka_enq_once_t*) objects waiting for broker - * state changes. Protected by rk_broker_state_change_lock. */ - rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */ - - TAILQ_HEAD(, rd_kafka_topic_s) rk_topics; - int rk_topic_cnt; - - struct rd_kafka_cgrp_s *rk_cgrp; - - rd_kafka_conf_t rk_conf; - rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */ - char rk_name[128]; - rd_kafkap_str_t *rk_client_id; - rd_kafkap_str_t *rk_group_id; /* Consumer group id */ - - rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_.. - * flags instance - * is being destroyed. - * The value set is the - * destroy flags from - * rd_kafka_destroy*() and - * the two internal flags shown - * below. - * - * Order: - * 1. user_flags | .._F_DESTROY_CALLED - * is set in rd_kafka_destroy*(). - * 2. consumer_close() is called - * for consumers. - * 3. .._F_TERMINATE is set to - * signal all background threads - * to terminate. - */ - -#define RD_KAFKA_DESTROY_F_TERMINATE \ - 0x1 /**< Internal flag to make sure \ - * rk_terminate is set to non-zero \ - * value even if user passed \ - * no destroy flags. */ -#define RD_KAFKA_DESTROY_F_DESTROY_CALLED \ - 0x2 /**< Application has called \ - * ..destroy*() and we've \ - * begun the termination \ - * process. \ - * This flag is needed to avoid \ - * rk_terminate from being \ - * 0 when destroy_flags() \ - * is called with flags=0 \ - * and prior to _F_TERMINATE \ - * has been set. */ -#define RD_KAFKA_DESTROY_F_IMMEDIATE \ - 0x4 /**< Immediate non-blocking \ - * destruction without waiting \ - * for all resources \ - * to be cleaned up. \ - * WARNING: Memory and resource \ - * leaks possible. \ - * This flag automatically sets \ - * .._NO_CONSUMER_CLOSE. */ - - - rwlock_t rk_lock; - rd_kafka_type_t rk_type; - struct timeval rk_tv_state_change; - - rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application - * consumer_poll() call - * (or equivalent). - * Used to enforce - * max.poll.interval.ms. - * Only relevant for consumer. */ - /* First fatal error. */ - struct { - rd_atomic32_t err; /**< rd_kafka_resp_err_t */ - char *errstr; /**< Protected by rk_lock */ - int cnt; /**< Number of errors raised, only - * the first one is stored. */ - } rk_fatal; - - rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value - * from broker. */ - - /* Locks: rd_kafka_*lock() */ - rd_ts_t rk_ts_metadata; /* Timestamp of most recent - * metadata. */ - - struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */ - rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */ - struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */ - - char *rk_clusterid; /* ClusterId from metadata */ - int32_t rk_controllerid; /* ControllerId from metadata */ - - /**< Producer: Delivery report mode */ - enum { RD_KAFKA_DR_MODE_NONE, /**< No delivery reports */ - RD_KAFKA_DR_MODE_CB, /**< Delivery reports through callback */ - RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through event API*/ - } rk_drmode; - - /* Simple consumer count: - * >0: Running in legacy / Simple Consumer mode, - * 0: No consumers running - * <0: Running in High level consumer mode */ - rd_atomic32_t rk_simple_cnt; - - /** - * Exactly Once Semantics and Idempotent Producer - * - * @locks rk_lock - */ - struct { - /* - * Idempotence - */ - rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer - * state */ - rd_ts_t ts_idemp_state; /**< Last state change */ - rd_kafka_pid_t pid; /**< Current Producer ID and Epoch */ - int epoch_cnt; /**< Number of times pid/epoch changed */ - rd_atomic32_t inflight_toppar_cnt; /**< Current number of - * toppars with inflight - * requests. */ - rd_kafka_timer_t pid_tmr; /**< PID FSM timer */ - - /* - * Transactions - * - * All field access is from the rdkafka main thread, - * unless a specific lock is mentioned in the doc string. - * - */ - rd_atomic32_t txn_may_enq; /**< Transaction state allows - * application to enqueue - * (produce) messages. */ - - rd_kafkap_str_t *transactional_id; /**< transactional.id */ - rd_kafka_txn_state_t txn_state; /**< Transactional state. - * @locks rk_lock */ - rd_ts_t ts_txn_state; /**< Last state change. - * @locks rk_lock */ - rd_kafka_broker_t *txn_coord; /**< Transaction coordinator, - * this is a logical broker.*/ - rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord - * broker. - * This is only used to - * check if the coord - * changes. */ - rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for - * coordinator to - * take action when - * the broker state - * changes. */ - rd_bool_t txn_requires_epoch_bump; /**< Coordinator epoch bump - * required to recover from - * idempotent producer - * fatal error. */ - - /**< Blocking transactional API application call - * currently being handled, its state, reply queue and how - * to handle timeout. - * Only one transactional API call is allowed at any time. - * Protected by the rk_lock. */ - struct { - char name[64]; /**< API name, e.g., - * send_offsets_to_transaction. - * This is used to make sure - * conflicting APIs are not - * called simultaneously. */ - rd_bool_t calling; /**< API is being actively called. - * I.e., application is blocking - * on a txn API call. - * This is used to make sure - * no concurrent API calls are - * being made. */ - rd_kafka_error_t *error; /**< Last error from background - * processing. This is only - * set if the application's - * API call timed out. - * It will be returned on - * the next call. */ - rd_bool_t has_result; /**< Indicates whether an API - * result (possibly - * intermediate) has been set. - */ - cnd_t cnd; /**< Application thread will - * block on this cnd waiting - * for a result to be set. */ - mtx_t lock; /**< Protects all fields of - * txn_curr_api. */ - } txn_curr_api; - - - int txn_req_cnt; /**< Number of transaction - * requests sent. - * This is incremented when a - * AddPartitionsToTxn or - * AddOffsetsToTxn request - * has been sent for the - * current transaction, - * to keep track of - * whether the broker is - * aware of the current - * transaction and thus - * requires an EndTxn request - * on abort or not. */ - - /**< Timer to trigger registration of pending partitions */ - rd_kafka_timer_t txn_register_parts_tmr; - - /**< Lock for txn_pending_rktps and txn_waitresp_rktps */ - mtx_t txn_pending_lock; - - /**< Partitions pending being added to transaction. */ - rd_kafka_toppar_tqhead_t txn_pending_rktps; - - /**< Partitions in-flight added to transaction. */ - rd_kafka_toppar_tqhead_t txn_waitresp_rktps; - - /**< Partitions added and registered to transaction. */ - rd_kafka_toppar_tqhead_t txn_rktps; - - /**< Number of messages that failed delivery. - * If this number is >0 on transaction_commit then an - * abortable transaction error will be raised. - * Is reset to zero on each begin_transaction(). */ - rd_atomic64_t txn_dr_fails; - - /**< Current transaction error. */ - rd_kafka_resp_err_t txn_err; - - /**< Current transaction error string, if any. */ - char *txn_errstr; - - /**< Last InitProducerIdRequest error. */ - rd_kafka_resp_err_t txn_init_err; - - /**< Waiting for transaction coordinator query response */ - rd_bool_t txn_wait_coord; - - /**< Transaction coordinator query timer */ - rd_kafka_timer_t txn_coord_tmr; - } rk_eos; - - rd_atomic32_t rk_flushing; /**< Application is calling flush(). */ - - /** - * Consumer state - * - * @locality rdkafka main thread - * @locks_required none - */ - struct { - /** Application consumer queue for messages, events and errors. - * (typically points to rkcg_q) */ - rd_kafka_q_t *q; - /** Current assigned partitions through assign() et.al. */ - rd_kafka_assignment_t assignment; - /** Waiting for this number of commits to finish. */ - int wait_commit_cnt; - } rk_consumer; - - /**< - * Coordinator cache. - * - * @locks none - * @locality rdkafka main thread - */ - rd_kafka_coord_cache_t rk_coord_cache; /**< Coordinator cache */ - - TAILQ_HEAD(, rd_kafka_coord_req_s) - rk_coord_reqs; /**< Coordinator - * requests */ - - - struct { - mtx_t lock; /* Protects acces to this struct */ - cnd_t cnd; /* For waking up blocking injectors */ - unsigned int cnt; /* Current message count */ - size_t size; /* Current message size sum */ - unsigned int max_cnt; /* Max limit */ - size_t max_size; /* Max limit */ - } rk_curr_msgs; - - rd_kafka_timers_t rk_timers; - thrd_t rk_thread; - - int rk_initialized; /**< Will be > 0 when the rd_kafka_t - * instance has been fully initialized. */ - - int rk_init_wait_cnt; /**< Number of background threads that - * need to finish initialization. */ - cnd_t rk_init_cnd; /**< Cond-var used to wait for main thread - * to finish its initialization before - * before rd_kafka_new() returns. */ - mtx_t rk_init_lock; /**< Lock for rk_init_wait and _cmd */ - - rd_ts_t rk_ts_created; /**< Timestamp (monotonic clock) of - * rd_kafka_t creation. */ - - /** - * Background thread and queue, - * enabled by setting `background_event_cb()`. - */ - struct { - rd_kafka_q_t *q; /**< Queue served by background thread. */ - thrd_t thread; /**< Background thread. */ - int calling; /**< Indicates whether the event callback - * is being called, reset back to 0 - * when the callback returns. - * This can be used for troubleshooting - * purposes. */ - } rk_background; - - - /* - * Logs, events or actions to rate limit / suppress - */ - struct { - /**< Log: No brokers support Idempotent Producer */ - rd_interval_t no_idemp_brokers; - - /**< Sparse connections: randomly select broker - * to bring up. This interval should allow - * for a previous connection to be established, - * which varies between different environments: - * Use 10 < reconnect.backoff.jitter.ms / 2 < 1000. - */ - rd_interval_t sparse_connect_random; - /**< Lock for sparse_connect_random */ - mtx_t sparse_connect_lock; - - /**< Broker metadata refresh interval: - * this is rate-limiting the number of topic-less - * broker/cluster metadata refreshes when there are no - * topics to refresh. - * Will be refreshed every topic.metadata.refresh.interval.ms - * but no more often than every 10s. - * No locks: only accessed by rdkafka main thread. */ - rd_interval_t broker_metadata_refresh; - - /**< Suppression for allow.auto.create.topics=false not being - * supported by the broker. */ - rd_interval_t allow_auto_create_topics; - } rk_suppress; - - struct { - void *handle; /**< Provider-specific handle struct pointer. - * Typically assigned in provider's .init() */ - rd_kafka_q_t *callback_q; /**< SASL callback queue, if any. */ - } rk_sasl; - - /* Test mocks */ - struct { - rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created - * by test.mock.num.brokers - */ - rd_atomic32_t cluster_cnt; /**< Total number of mock - * clusters, created either - * through - * test.mock.num.brokers - * or mock_cluster_new(). - */ - - } rk_mock; -}; - -#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock) -#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock) -#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock) -#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock) - - -/** - * @brief Add \p cnt messages and of total size \p size bytes to the - * internal bookkeeping of current message counts. - * If the total message count or size after add would exceed the - * configured limits \c queue.buffering.max.messages and - * \c queue.buffering.max.kbytes then depending on the value of - * \p block the function either blocks until enough space is available - * if \p block is 1, else immediately returns - * RD_KAFKA_RESP_ERR__QUEUE_FULL. - * - * @param rdmtx If non-null and \p block is set and blocking is to ensue, - * then unlock this mutex for the duration of the blocking - * and then reacquire with a read-lock. - */ -static RD_INLINE RD_UNUSED rd_kafka_resp_err_t -rd_kafka_curr_msgs_add(rd_kafka_t *rk, - unsigned int cnt, - size_t size, - int block, - rwlock_t *rdlock) { - - if (rk->rk_type != RD_KAFKA_PRODUCER) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - mtx_lock(&rk->rk_curr_msgs.lock); - while ( - unlikely((rk->rk_curr_msgs.max_cnt > 0 && - rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt) || - (unsigned long long)(rk->rk_curr_msgs.size + size) > - (unsigned long long)rk->rk_curr_msgs.max_size)) { - if (!block) { - mtx_unlock(&rk->rk_curr_msgs.lock); - return RD_KAFKA_RESP_ERR__QUEUE_FULL; - } - - if (rdlock) - rwlock_rdunlock(rdlock); - - cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock); - - if (rdlock) - rwlock_rdlock(rdlock); - } - - rk->rk_curr_msgs.cnt += cnt; - rk->rk_curr_msgs.size += size; - mtx_unlock(&rk->rk_curr_msgs.lock); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Subtract \p cnt messages of total size \p size from the - * current bookkeeping and broadcast a wakeup on the condvar - * for any waiting & blocking threads. - */ -static RD_INLINE RD_UNUSED void -rd_kafka_curr_msgs_sub(rd_kafka_t *rk, unsigned int cnt, size_t size) { - int broadcast = 0; - - if (rk->rk_type != RD_KAFKA_PRODUCER) - return; - - mtx_lock(&rk->rk_curr_msgs.lock); - rd_kafka_assert(NULL, rk->rk_curr_msgs.cnt >= cnt && - rk->rk_curr_msgs.size >= size); - - /* If the subtraction would pass one of the thresholds - * broadcast a wake-up to any waiting listeners. */ - if ((rk->rk_curr_msgs.cnt - cnt == 0) || - (rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt && - rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) || - (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size && - rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size)) - broadcast = 1; - - rk->rk_curr_msgs.cnt -= cnt; - rk->rk_curr_msgs.size -= size; - - if (unlikely(broadcast)) - cnd_broadcast(&rk->rk_curr_msgs.cnd); - - mtx_unlock(&rk->rk_curr_msgs.lock); -} - -static RD_INLINE RD_UNUSED void -rd_kafka_curr_msgs_get(rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) { - if (rk->rk_type != RD_KAFKA_PRODUCER) { - *cntp = 0; - *sizep = 0; - return; - } - - mtx_lock(&rk->rk_curr_msgs.lock); - *cntp = rk->rk_curr_msgs.cnt; - *sizep = rk->rk_curr_msgs.size; - mtx_unlock(&rk->rk_curr_msgs.lock); -} - -static RD_INLINE RD_UNUSED int rd_kafka_curr_msgs_cnt(rd_kafka_t *rk) { - int cnt; - if (rk->rk_type != RD_KAFKA_PRODUCER) - return 0; - - mtx_lock(&rk->rk_curr_msgs.lock); - cnt = rk->rk_curr_msgs.cnt; - mtx_unlock(&rk->rk_curr_msgs.lock); - - return cnt; -} - -/** - * @brief Wait until \p tspec for curr_msgs to reach 0. - * - * @returns rd_true if zero is reached, or rd_false on timeout. - * The remaining messages are returned in \p *curr_msgsp - */ -static RD_INLINE RD_UNUSED rd_bool_t -rd_kafka_curr_msgs_wait_zero(rd_kafka_t *rk, - int timeout_ms, - unsigned int *curr_msgsp) { - unsigned int cnt; - struct timespec tspec; - - rd_timeout_init_timespec(&tspec, timeout_ms); - - mtx_lock(&rk->rk_curr_msgs.lock); - while ((cnt = rk->rk_curr_msgs.cnt) > 0) { - if (cnd_timedwait_abs(&rk->rk_curr_msgs.cnd, - &rk->rk_curr_msgs.lock, - &tspec) == thrd_timedout) - break; - } - mtx_unlock(&rk->rk_curr_msgs.lock); - - *curr_msgsp = cnt; - return cnt == 0; -} - -void rd_kafka_destroy_final(rd_kafka_t *rk); - -void rd_kafka_global_init(void); - -/** - * @returns true if \p rk handle is terminating. - * - * @remark If consumer_close() is called from destroy*() it will be - * called prior to _F_TERMINATE being set and will thus not - * be able to use rd_kafka_terminating() to know it is shutting down. - * That code should instead just check that rk_terminate is non-zero - * (the _F_DESTROY_CALLED flag will be set). - */ -#define rd_kafka_terminating(rk) \ - (rd_atomic32_get(&(rk)->rk_terminate) & RD_KAFKA_DESTROY_F_TERMINATE) - -/** - * @returns the destroy flags set matching \p flags, which might be - * a subset of the flags. - */ -#define rd_kafka_destroy_flags_check(rk, flags) \ - (rd_atomic32_get(&(rk)->rk_terminate) & (flags)) - -/** - * @returns true if no consumer callbacks, or standard consumer_close - * behaviour, should be triggered. */ -#define rd_kafka_destroy_flags_no_consumer_close(rk) \ - rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE) - -#define rd_kafka_is_simple_consumer(rk) \ - (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0) -int rd_kafka_simple_consumer_add(rd_kafka_t *rk); - - -/** - * @returns true if idempotency is enabled (producer only). - */ -#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence) - -/** - * @returns true if the producer is transactional (producer only). - */ -#define rd_kafka_is_transactional(rk) \ - ((rk)->rk_conf.eos.transactional_id != NULL) - - -#define RD_KAFKA_PURGE_F_ABORT_TXN \ - 0x100 /**< Internal flag used when \ - * aborting transaction */ -#define RD_KAFKA_PURGE_F_MASK 0x107 -const char *rd_kafka_purge_flags2str(int flags); - - -#include "rdkafka_topic.h" -#include "rdkafka_partition.h" - - - -/** - * Debug contexts - */ -#define RD_KAFKA_DBG_GENERIC 0x1 -#define RD_KAFKA_DBG_BROKER 0x2 -#define RD_KAFKA_DBG_TOPIC 0x4 -#define RD_KAFKA_DBG_METADATA 0x8 -#define RD_KAFKA_DBG_FEATURE 0x10 -#define RD_KAFKA_DBG_QUEUE 0x20 -#define RD_KAFKA_DBG_MSG 0x40 -#define RD_KAFKA_DBG_PROTOCOL 0x80 -#define RD_KAFKA_DBG_CGRP 0x100 -#define RD_KAFKA_DBG_SECURITY 0x200 -#define RD_KAFKA_DBG_FETCH 0x400 -#define RD_KAFKA_DBG_INTERCEPTOR 0x800 -#define RD_KAFKA_DBG_PLUGIN 0x1000 -#define RD_KAFKA_DBG_CONSUMER 0x2000 -#define RD_KAFKA_DBG_ADMIN 0x4000 -#define RD_KAFKA_DBG_EOS 0x8000 -#define RD_KAFKA_DBG_MOCK 0x10000 -#define RD_KAFKA_DBG_ASSIGNOR 0x20000 -#define RD_KAFKA_DBG_CONF 0x40000 -#define RD_KAFKA_DBG_ALL 0xfffff -#define RD_KAFKA_DBG_NONE 0x0 - - -void rd_kafka_log0(const rd_kafka_conf_t *conf, - const rd_kafka_t *rk, - const char *extra, - int level, - int ctx, - const char *fac, - const char *fmt, - ...) RD_FORMAT(printf, 7, 8); - -#define rd_kafka_log(rk, level, fac, ...) \ - rd_kafka_log0(&rk->rk_conf, rk, NULL, level, RD_KAFKA_DBG_NONE, fac, \ - __VA_ARGS__) - -#define rd_kafka_dbg(rk, ctx, fac, ...) \ - do { \ - if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_##ctx))) \ - rd_kafka_log0(&rk->rk_conf, rk, NULL, LOG_DEBUG, \ - (RD_KAFKA_DBG_##ctx), fac, __VA_ARGS__); \ - } while (0) - -/* dbg() not requiring an rk, just the conf object, for early logging */ -#define rd_kafka_dbg0(conf, ctx, fac, ...) \ - do { \ - if (unlikely((conf)->debug & (RD_KAFKA_DBG_##ctx))) \ - rd_kafka_log0(conf, NULL, NULL, LOG_DEBUG, \ - (RD_KAFKA_DBG_##ctx), fac, __VA_ARGS__); \ - } while (0) - -/* NOTE: The local copy of _logname is needed due rkb_logname_lock lock-ordering - * when logging another broker's name in the message. */ -#define rd_rkb_log0(rkb, level, ctx, fac, ...) \ - do { \ - char _logname[RD_KAFKA_NODENAME_SIZE]; \ - mtx_lock(&(rkb)->rkb_logname_lock); \ - rd_strlcpy(_logname, rkb->rkb_logname, sizeof(_logname)); \ - mtx_unlock(&(rkb)->rkb_logname_lock); \ - rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, (rkb)->rkb_rk, \ - _logname, level, ctx, fac, __VA_ARGS__); \ - } while (0) - -#define rd_rkb_log(rkb, level, fac, ...) \ - rd_rkb_log0(rkb, level, RD_KAFKA_DBG_NONE, fac, __VA_ARGS__) - -#define rd_rkb_dbg(rkb, ctx, fac, ...) \ - do { \ - if (unlikely((rkb)->rkb_rk->rk_conf.debug & \ - (RD_KAFKA_DBG_##ctx))) { \ - rd_rkb_log0(rkb, LOG_DEBUG, (RD_KAFKA_DBG_##ctx), fac, \ - __VA_ARGS__); \ - } \ - } while (0) - - - -extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code; - -static RD_UNUSED RD_INLINE rd_kafka_resp_err_t -rd_kafka_set_last_error(rd_kafka_resp_err_t err, int errnox) { - if (errnox) { - /* MSVC: - * This is the correct way to set errno on Windows, - * but it is still pointless due to different errnos in - * in different runtimes: - * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/ - * errno is thus highly deprecated, and buggy, on Windows - * when using librdkafka as a dynamically loaded DLL. */ - rd_set_errno(errnox); - } - rd_kafka_last_error_code = err; - return err; -} - - -int rd_kafka_set_fatal_error0(rd_kafka_t *rk, - rd_dolock_t do_lock, - rd_kafka_resp_err_t err, - const char *fmt, - ...) RD_FORMAT(printf, 4, 5); -#define rd_kafka_set_fatal_error(rk, err, fmt, ...) \ - rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__) - -rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk); - -static RD_INLINE RD_UNUSED rd_kafka_resp_err_t -rd_kafka_fatal_error_code(rd_kafka_t *rk) { - /* This is an optimization to avoid an atomic read which are costly - * on some platforms: - * Fatal errors are currently only raised by the idempotent producer - * and static consumers (group.instance.id). */ - if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) || - (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id)) - return rd_atomic32_get(&rk->rk_fatal.err); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -extern rd_atomic32_t rd_kafka_thread_cnt_curr; -extern char RD_TLS rd_kafka_thread_name[64]; - -void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2); -void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2); - -int rd_kafka_path_is_dir(const char *path); -rd_bool_t rd_kafka_dir_is_empty(const char *path); - -rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, - rd_kafka_q_t *rkq, - rd_kafka_op_t *rko, - rd_kafka_q_cb_type_t cb_type, - void *opaque); - -rd_kafka_resp_err_t rd_kafka_subscribe_rkt(rd_kafka_topic_t *rkt); - - -/** - * @returns the number of milliseconds the maximum poll interval - * was exceeded, or 0 if not exceeded. - * - * @remark Only relevant for high-level consumer. - * - * @locality any - * @locks none - */ -static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) { - rd_ts_t last_poll; - int exceeded; - - if (rk->rk_type != RD_KAFKA_CONSUMER) - return 0; - - last_poll = rd_atomic64_get(&rk->rk_ts_last_poll); - - /* Application is blocked in librdkafka function, see - * rd_kafka_app_poll_blocking(). */ - if (last_poll == INT64_MAX) - return 0; - - exceeded = (int)((rd_clock() - last_poll) / 1000ll) - - rk->rk_conf.max_poll_interval_ms; - - if (unlikely(exceeded > 0)) - return exceeded; - - return 0; -} - -/** - * @brief Call on entry to blocking polling function to indicate - * that the application is blocked waiting for librdkafka - * and that max.poll.interval.ms should not be enforced. - * - * Call app_polled() Upon return from the function calling - * this function to register the application's last time of poll. - * - * @remark Only relevant for high-level consumer. - * - * @locality any - * @locks none - */ -static RD_INLINE RD_UNUSED void rd_kafka_app_poll_blocking(rd_kafka_t *rk) { - if (rk->rk_type == RD_KAFKA_CONSUMER) - rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX); -} - -/** - * @brief Set the last application poll time to now. - * - * @remark Only relevant for high-level consumer. - * - * @locality any - * @locks none - */ -static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) { - if (rk->rk_type == RD_KAFKA_CONSUMER) - rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock()); -} - - - -void rd_kafka_term_sig_handler(int sig); - -/** - * rdkafka_background.c - */ -int rd_kafka_background_thread_main(void *arg); -rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk, - char *errstr, - size_t errstr_size); - - -#endif /* _RDKAFKA_INT_H_ */ |