/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_INT_H_
#define _RDKAFKA_INT_H_

#ifndef _WIN32
#define _GNU_SOURCE /* for strndup() */
#endif

#ifdef _MSC_VER
typedef int mode_t;
#endif

#include <fcntl.h>

#include "rdsysqueue.h"

#include "rdkafka.h"
#include "rd.h"
#include "rdlog.h"
#include "rdtime.h"
#include "rdaddr.h"
#include "rdinterval.h"
#include "rdavg.h"
#include "rdlist.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif


#define rd_kafka_assert(rk, cond)                                              \
        do {                                                                   \
                if (unlikely(!(cond)))                                         \
                        rd_kafka_crash(__FILE__, __LINE__, __FUNCTION__,       \
                                       (rk), "assert: " #cond);                \
        } while (0)


void RD_NORETURN rd_kafka_crash(const char *file,
                                int line,
                                const char *function,
                                rd_kafka_t *rk,
                                const char *reason);


/* Forward declarations */
struct rd_kafka_s;
struct rd_kafka_topic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;


/**
 * Protocol level sanity
 */
#define RD_KAFKAP_BROKERS_MAX    10000
#define RD_KAFKAP_TOPICS_MAX     1000000
#define RD_KAFKAP_PARTITIONS_MAX 100000
#define RD_KAFKAP_GROUPS_MAX     100000

#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)


/**
 * @struct Represents a fetch position:
 *         an offset and a partition leader epoch (if known, else -1).
 */
typedef struct rd_kafka_fetch_pos_s {
        int64_t offset;
        int32_t leader_epoch;
        rd_bool_t validated;
} rd_kafka_fetch_pos_t;
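/*
 * Illustrative sketch only (compiled out): how RD_KAFKA_OFFSET_IS_LOGICAL()
 * separates absolute log offsets from the negative logical offsets defined
 * in rdkafka.h. The helper function below is hypothetical and not part of
 * librdkafka's API.
 */
#if 0
static const char *rd_example_offset_kind(int64_t offset) {
        if (!RD_KAFKA_OFFSET_IS_LOGICAL(offset))
                return "absolute"; /* a real log offset, >= 0 */

        switch (offset) {
        case RD_KAFKA_OFFSET_BEGINNING: /* -2 */
                return "beginning";
        case RD_KAFKA_OFFSET_END: /* -1 */
                return "end";
        case RD_KAFKA_OFFSET_STORED: /* -1000 */
                return "stored";
        default:
                return "invalid/other";
        }
}
#endif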
#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
#include "rdkafka_proto.h"
#include "rdkafka_buf.h"
#include "rdkafka_pattern.h"
#include "rdkafka_conf.h"
#include "rdkafka_transport.h"
#include "rdkafka_timer.h"
#include "rdkafka_assignor.h"
#include "rdkafka_metadata.h"
#include "rdkafka_mock.h"
#include "rdkafka_partition.h"
#include "rdkafka_assignment.h"
#include "rdkafka_coord.h"


/**
 * @enum Idempotent Producer state
 */
typedef enum {
        RD_KAFKA_IDEMP_STATE_INIT,        /**< Initial state */
        RD_KAFKA_IDEMP_STATE_TERM,        /**< Instance is terminating */
        RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */
        RD_KAFKA_IDEMP_STATE_REQ_PID,     /**< Request new PID */
        RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to
                                              *   become available. */
        RD_KAFKA_IDEMP_STATE_WAIT_PID, /**< PID requested, waiting for reply */
        RD_KAFKA_IDEMP_STATE_ASSIGNED, /**< New PID assigned */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
                                           *   ProduceRequests to finish
                                           *   before resetting and
                                           *   re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
                                          *   ProduceRequests to finish
                                          *   before bumping the current
                                          *   epoch. */
        RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT, /**< Wait for transaction abort
                                              *   to finish and trigger a
                                              *   drain and reset or bump. */
} rd_kafka_idemp_state_t;

/**
 * @returns the idemp_state_t string representation
 */
static RD_UNUSED const char *
rd_kafka_idemp_state2str(rd_kafka_idemp_state_t state) {
        static const char *names[] = {
            "Init",      "Terminate",     "FatalError", "RequestPID",
            "WaitTransport", "WaitPID",   "Assigned",   "DrainReset",
            "DrainBump", "WaitTxnAbort"};
        return names[state];
}

/**
 * @enum Transactional Producer state
 */
typedef enum {
        /**< Initial state */
        RD_KAFKA_TXN_STATE_INIT,
        /**< Awaiting PID to be acquired by rdkafka_idempotence.c */
        RD_KAFKA_TXN_STATE_WAIT_PID,
        /**< PID acquired, but application has not made a successful
         *   init_transactions() call. */
        RD_KAFKA_TXN_STATE_READY_NOT_ACKED,
        /**< PID acquired, no active transaction. */
        RD_KAFKA_TXN_STATE_READY,
        /**< begin_transaction() has been called. */
        RD_KAFKA_TXN_STATE_IN_TRANSACTION,
        /**< commit_transaction() has been called. */
        RD_KAFKA_TXN_STATE_BEGIN_COMMIT,
        /**< commit_transaction() has been called and all outstanding
         *   messages, partitions, and offsets have been sent. */
        RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION,
        /**< Transaction successfully committed but application has not made
         *   a successful commit_transaction() call yet. */
        RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED,
        /**< abort_transaction() has been called. */
        RD_KAFKA_TXN_STATE_BEGIN_ABORT,
        /**< abort_transaction() has been called and all outstanding
         *   messages, partitions, and offsets have been sent. */
        RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
        /**< Transaction successfully aborted but application has not made
         *   a successful abort_transaction() call yet. */
        RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED,
        /**< An abortable error has occurred. */
        RD_KAFKA_TXN_STATE_ABORTABLE_ERROR,
        /**< A fatal error has occurred. */
        RD_KAFKA_TXN_STATE_FATAL_ERROR
} rd_kafka_txn_state_t;
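/*
 * Illustrative sketch only (compiled out): the *_state2str() helpers
 * (rd_kafka_idemp_state2str() above, rd_kafka_txn_state2str() just below)
 * rely on the enum values being sequential from 0, so each enumerator
 * indexes directly into its names[] table. A hypothetical debug helper
 * built on top of them could look like this:
 */
#if 0
static void rd_example_log_txn_transition(rd_kafka_t *rk,
                                          rd_kafka_txn_state_t from,
                                          rd_kafka_txn_state_t to) {
        /* Logged only when the "eos" debug context is enabled. */
        rd_kafka_dbg(rk, EOS, "TXNSTATE", "transaction state %s -> %s",
                     rd_kafka_txn_state2str(from),
                     rd_kafka_txn_state2str(to));
}
#endif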
/**
 * @returns the txn_state_t string representation
 */
static RD_UNUSED const char *
rd_kafka_txn_state2str(rd_kafka_txn_state_t state) {
        static const char *names[] = {"Init",
                                      "WaitPID",
                                      "ReadyNotAcked",
                                      "Ready",
                                      "InTransaction",
                                      "BeginCommit",
                                      "CommittingTransaction",
                                      "CommitNotAcked",
                                      "BeginAbort",
                                      "AbortingTransaction",
                                      "AbortedNotAcked",
                                      "AbortableError",
                                      "FatalError"};
        return names[state];
}


/**
 * Kafka handle, internal representation of the application's rd_kafka_t.
 */
struct rd_kafka_s {
        rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
        rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */

        TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t rk_broker_by_id; /* Fast id lookups. */
        rd_atomic32_t rk_broker_cnt;

        /**< Number of brokers in state >= UP */
        rd_atomic32_t rk_broker_up_cnt;

        /**< Number of logical brokers in state >= UP, this is a sub-set
         *   of rk_broker_up_cnt. */
        rd_atomic32_t rk_logical_broker_up_cnt;

        /**< Number of brokers that are down, only includes brokers
         *   that have had at least one connection attempt. */
        rd_atomic32_t rk_broker_down_cnt;

        /**< Logical brokers currently without an address.
         *   Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t rk_broker_addrless_cnt;

        mtx_t rk_internal_rkb_lock;
        rd_kafka_broker_t *rk_internal_rkb;

        /* Broadcasting of broker state changes to wake up
         * functions waiting for a state change. */
        cnd_t rk_broker_state_change_cnd;
        mtx_t rk_broker_state_change_lock;
        int rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t *) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */

        TAILQ_HEAD(, rd_kafka_topic_s) rk_topics;
        int rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t rk_conf;
        rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
        char rk_name[128];
        rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id; /* Consumer group id */

        rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_..
                                     *   flags while the instance
                                     *   is being destroyed.
                                     *   The value set is the
                                     *   destroy flags from
                                     *   rd_kafka_destroy*() and
                                     *   the two internal flags shown
                                     *   below.
                                     *
                                     *   Order:
                                     *   1. user_flags | .._F_DESTROY_CALLED
                                     *      is set in rd_kafka_destroy*().
                                     *   2. consumer_close() is called
                                     *      for consumers.
                                     *   3. .._F_TERMINATE is set to
                                     *      signal all background threads
                                     *      to terminate. */

#define RD_KAFKA_DESTROY_F_TERMINATE                                           \
        0x1 /**< Internal flag to make sure                                    \
             *   rk_terminate is set to a non-zero                             \
             *   value even if the user passed                                 \
             *   no destroy flags. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED                                      \
        0x2 /**< Application has called                                        \
             *   ..destroy*() and we've                                        \
             *   begun the termination                                         \
             *   process.                                                      \
             *   This flag is needed to avoid                                  \
             *   rk_terminate from being                                       \
             *   0 when destroy_flags()                                        \
             *   is called with flags=0                                        \
             *   and prior to _F_TERMINATE                                     \
             *   having been set. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE                                           \
        0x4 /**< Immediate non-blocking                                        \
             *   destruction without waiting                                   \
             *   for all resources                                             \
             *   to be cleaned up.                                             \
             *   WARNING: Memory and resource                                  \
             *   leaks possible.                                               \
             *   This flag automatically sets                                  \
             *   .._NO_CONSUMER_CLOSE. */

        rwlock_t rk_lock;
        rd_kafka_type_t rk_type;
        struct timeval rk_tv_state_change;

        rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application
                                        *   consumer_poll() call
                                        *   (or equivalent).
                                        *   Used to enforce
                                        *   max.poll.interval.ms.
                                        *   Only relevant for consumer. */

        /* First fatal error. */
        struct {
                rd_atomic32_t err; /**< rd_kafka_resp_err_t */
                char *errstr;      /**< Protected by rk_lock */
                int cnt;           /**< Number of errors raised, only
                                    *   the first one is stored. */
        } rk_fatal;

        rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
                                         * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t rk_ts_metadata; /* Timestamp of most recent
                                 * metadata. */

        struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
        rd_ts_t rk_ts_full_metadata;                /* Timestamp of .. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char *rk_clusterid;      /* ClusterId from metadata */
        int32_t rk_controllerid; /* ControllerId from metadata */

        /**< Producer: Delivery report mode */
        enum {
                RD_KAFKA_DR_MODE_NONE, /**< No delivery reports */
                RD_KAFKA_DR_MODE_CB,   /**< Delivery reports through callback */
                RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through
                                         *   event API */
        } rk_drmode;

        /* Simple consumer count:
         *  >0: Running in legacy / Simple Consumer mode,
         *   0: No consumers running
         *  <0: Running in High level consumer mode */
        rd_atomic32_t rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                /*
                 * Idempotence
                 */
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     *   state */
                rd_ts_t ts_idemp_state; /**< Last state change */
                rd_kafka_pid_t pid;     /**< Current Producer ID and Epoch */
                int epoch_cnt;          /**< Number of times pid/epoch
                                         *   changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    *   toppars with inflight
                                                    *   requests. */
                rd_kafka_timer_t pid_tmr; /**< PID FSM timer */

                /*
                 * Transactions
                 *
                 * All field access is from the rdkafka main thread,
                 * unless a specific lock is mentioned in the doc string.
                 */
                rd_atomic32_t txn_may_enq; /**< Transaction state allows
                                            *   application to enqueue
                                            *   (produce) messages. */

                rd_kafkap_str_t *transactional_id; /**< transactional.id */
                rd_kafka_txn_state_t txn_state; /**< Transactional state.
                                                 *   @locks rk_lock */
                rd_ts_t ts_txn_state; /**< Last state change.
                                       *   @locks rk_lock */
                rd_kafka_broker_t *txn_coord; /**< Transaction coordinator,
                                               *   this is a logical broker. */
                rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord
                                                    *   broker.
                                                    *   This is only used to
                                                    *   check if the coord
                                                    *   changes. */
                rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for
                                                          *   coordinator to
                                                          *   take action when
                                                          *   the broker state
                                                          *   changes. */
                rd_bool_t txn_requires_epoch_bump; /**< Coordinator epoch bump
                                                    *   required to recover
                                                    *   from idempotent
                                                    *   producer fatal error. */

                /**< Blocking transactional API application call
                 *   currently being handled, its state, reply queue and how
                 *   to handle timeout.
                 *   Only one transactional API call is allowed at any time.
                 *   Protected by the rk_lock. */
                struct {
                        char name[64]; /**< API name, e.g.,
                                        *   send_offsets_to_transaction.
                                        *   This is used to make sure
                                        *   conflicting APIs are not
                                        *   called simultaneously. */
                        rd_bool_t calling; /**< API is being actively called.
                                            *   I.e., application is blocking
                                            *   on a txn API call.
                                            *   This is used to make sure
                                            *   no concurrent API calls are
                                            *   being made. */
                        rd_kafka_error_t *error; /**< Last error from
                                                  *   background processing.
                                                  *   This is only set if the
                                                  *   application's API call
                                                  *   timed out.
                                                  *   It will be returned on
                                                  *   the next call. */
                        rd_bool_t has_result; /**< Indicates whether an API
                                               *   result (possibly
                                               *   intermediate) has been
                                               *   set. */
                        cnd_t cnd;  /**< Application thread will
                                     *   block on this cnd waiting
                                     *   for a result to be set. */
                        mtx_t lock; /**< Protects all fields of
                                     *   txn_curr_api. */
                } txn_curr_api;


                int txn_req_cnt; /**< Number of transaction
                                  *   requests sent.
                                  *   This is incremented when an
                                  *   AddPartitionsToTxn or
                                  *   AddOffsetsToTxn request
                                  *   has been sent for the
                                  *   current transaction,
                                  *   to keep track of
                                  *   whether the broker is
                                  *   aware of the current
                                  *   transaction and thus
                                  *   requires an EndTxn request
                                  *   on abort or not. */

                /**< Timer to trigger registration of pending partitions */
                rd_kafka_timer_t txn_register_parts_tmr;

                /**< Lock for txn_pending_rktps and txn_waitresp_rktps */
                mtx_t txn_pending_lock;

                /**< Partitions pending being added to transaction. */
                rd_kafka_toppar_tqhead_t txn_pending_rktps;
                /**< Partitions in-flight added to transaction. */
                rd_kafka_toppar_tqhead_t txn_waitresp_rktps;
                /**< Partitions added and registered to transaction. */
                rd_kafka_toppar_tqhead_t txn_rktps;

                /**< Number of messages that failed delivery.
                 *   If this number is >0 on transaction_commit then an
                 *   abortable transaction error will be raised.
                 *   Is reset to zero on each begin_transaction(). */
                rd_atomic64_t txn_dr_fails;

                /**< Current transaction error. */
                rd_kafka_resp_err_t txn_err;

                /**< Current transaction error string, if any. */
                char *txn_errstr;

                /**< Last InitProducerIdRequest error. */
                rd_kafka_resp_err_t txn_init_err;

                /**< Waiting for transaction coordinator query response */
                rd_bool_t txn_wait_coord;

                /**< Transaction coordinator query timer */
                rd_kafka_timer_t txn_coord_tmr;
        } rk_eos;

        rd_atomic32_t rk_flushing; /**< Application is calling flush(). */

        /**
         * Consumer state
         *
         * @locality rdkafka main thread
         * @locks_required none
         */
        struct {
                /** Application consumer queue for messages, events and
                 *  errors. (typically points to rkcg_q) */
                rd_kafka_q_t *q;
                /** Current assigned partitions through assign() et.al. */
                rd_kafka_assignment_t assignment;
                /** Waiting for this number of commits to finish. */
                int wait_commit_cnt;
        } rk_consumer;

        /**<
         * Coordinator cache.
         *
         * @locks none
         * @locality rdkafka main thread
         */
        rd_kafka_coord_cache_t rk_coord_cache; /**< Coordinator cache */

        TAILQ_HEAD(, rd_kafka_coord_req_s) rk_coord_reqs; /**< Coordinator
                                                           *   requests */

        struct {
                mtx_t lock;           /* Protects access to this struct */
                cnd_t cnd;            /* For waking up blocking injectors */
                unsigned int cnt;     /* Current message count */
                size_t size;          /* Current message size sum */
                unsigned int max_cnt; /* Max limit */
                size_t max_size;      /* Max limit */
        } rk_curr_msgs;

        rd_kafka_timers_t rk_timers;

        thrd_t rk_thread;

        int rk_initialized; /**< Will be > 0 when the rd_kafka_t
                             *   instance has been fully initialized. */

        int rk_init_wait_cnt; /**< Number of background threads that
                               *   need to finish initialization. */
        cnd_t rk_init_cnd;    /**< Cond-var used to wait for main thread
                               *   to finish its initialization before
                               *   rd_kafka_new() returns. */
        mtx_t rk_init_lock;   /**< Lock for rk_init_wait_cnt and
                               *   rk_init_cnd */

        rd_ts_t rk_ts_created; /**< Timestamp (monotonic clock) of
                                *   rd_kafka_t creation. */

        /**
         * Background thread and queue,
         * enabled by setting `background_event_cb()`.
         */
        struct {
                rd_kafka_q_t *q; /**< Queue served by background thread. */
                thrd_t thread;   /**< Background thread. */
                int calling;     /**< Indicates whether the event callback
                                  *   is being called, reset back to 0
                                  *   when the callback returns.
                                  *   This can be used for troubleshooting
                                  *   purposes. */
        } rk_background;

        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /**< Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;
                /**< Sparse connections: randomly select a broker
                 *   to bring up. This interval should allow
                 *   for a previous connection to be established,
                 *   which varies between different environments:
                 *   Use 10 < reconnect.backoff.jitter.ms / 2 < 1000. */
                rd_interval_t sparse_connect_random;
                /**< Lock for sparse_connect_random */
                mtx_t sparse_connect_lock;

                /**< Broker metadata refresh interval:
                 *   this is rate-limiting the number of topic-less
                 *   broker/cluster metadata refreshes when there are no
                 *   topics to refresh.
                 *   Will be refreshed every topic.metadata.refresh.interval.ms
                 *   but no more often than every 10s.
                 *   No locks: only accessed by rdkafka main thread. */
                rd_interval_t broker_metadata_refresh;

                /**< Suppression for allow.auto.create.topics=false not being
                 *   supported by the broker. */
                rd_interval_t allow_auto_create_topics;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               *   Typically assigned in provider's .init() */
                rd_kafka_q_t *callback_q; /**< SASL callback queue, if any. */
        } rk_sasl;

        /* Test mocks */
        struct {
                rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created
                                                   *   by
                                                   *   test.mock.num.brokers */
                rd_atomic32_t cluster_cnt; /**< Total number of mock
                                            *   clusters, created either
                                            *   through
                                            *   test.mock.num.brokers
                                            *   or mock_cluster_new(). */
        } rk_mock;
};

#define rd_kafka_wrlock(rk)   rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk)   rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)


/**
 * @brief Add \p cnt messages of total size \p size bytes to the
 *        internal bookkeeping of current message counts.
 *        If the total message count or size after add would exceed the
 *        configured limits \c queue.buffering.max.messages and
 *        \c queue.buffering.max.kbytes then, depending on the value of
 *        \p block, the function either blocks until enough space is
 *        available if \p block is 1, else immediately returns
 *        RD_KAFKA_RESP_ERR__QUEUE_FULL.
 *
 * @param rdlock If non-null and \p block is set and blocking is to ensue,
 *               then unlock this mutex for the duration of the blocking
 *               and then reacquire with a read-lock.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add(rd_kafka_t *rk,
                       unsigned int cnt,
                       size_t size,
                       int block,
                       rwlock_t *rdlock) {
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        mtx_lock(&rk->rk_curr_msgs.lock);
        while (unlikely(
            (rk->rk_curr_msgs.max_cnt > 0 &&
             rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt) ||
            (unsigned long long)(rk->rk_curr_msgs.size + size) >
                (unsigned long long)rk->rk_curr_msgs.max_size)) {
                if (!block) {
                        mtx_unlock(&rk->rk_curr_msgs.lock);
                        return RD_KAFKA_RESP_ERR__QUEUE_FULL;
                }

                if (rdlock)
                        rwlock_rdunlock(rdlock);

                cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);

                if (rdlock)
                        rwlock_rdlock(rdlock);
        }

        rk->rk_curr_msgs.cnt += cnt;
        rk->rk_curr_msgs.size += size;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
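/*
 * Illustrative sketch only (compiled out): how the curr_msgs accounting is
 * intended to be used on the produce path. The call sites shown here are
 * hypothetical simplifications, not the actual produce implementation.
 */
#if 0
static rd_kafka_resp_err_t rd_example_produce_accounting(rd_kafka_t *rk,
                                                         size_t msg_size) {
        rd_kafka_resp_err_t err;

        /* Reserve room for one message; with block=0 a full queue
         * (queue.buffering.max.messages / queue.buffering.max.kbytes)
         * returns RD_KAFKA_RESP_ERR__QUEUE_FULL instead of blocking. */
        err = rd_kafka_curr_msgs_add(rk, 1, msg_size, 0, NULL);
        if (err)
                return err;

        /* ... enqueue the message on the partition queue here ... */

        /* When the message is delivered (or fails), the accounting is
         * reversed with rd_kafka_curr_msgs_sub() (defined just below),
         * broadcasting a wakeup to any blocked producers. */
        rd_kafka_curr_msgs_sub(rk, 1, msg_size);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
#endif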
/**
 * @brief Subtract \p cnt messages of total size \p size from the
 *        current bookkeeping and broadcast a wakeup on the condvar
 *        for any waiting & blocking threads.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub(rd_kafka_t *rk, unsigned int cnt, size_t size) {
        int broadcast = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        mtx_lock(&rk->rk_curr_msgs.lock);
        rd_kafka_assert(NULL, rk->rk_curr_msgs.cnt >= cnt &&
                                  rk->rk_curr_msgs.size >= size);

        /* If the subtraction would pass one of the thresholds
         * broadcast a wake-up to any waiting listeners. */
        if ((rk->rk_curr_msgs.cnt - cnt == 0) ||
            (rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
             rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
            (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
             rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
                broadcast = 1;

        rk->rk_curr_msgs.cnt -= cnt;
        rk->rk_curr_msgs.size -= size;

        if (unlikely(broadcast))
                cnd_broadcast(&rk->rk_curr_msgs.cnd);

        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get(rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
        if (rk->rk_type != RD_KAFKA_PRODUCER) {
                *cntp  = 0;
                *sizep = 0;
                return;
        }

        mtx_lock(&rk->rk_curr_msgs.lock);
        *cntp  = rk->rk_curr_msgs.cnt;
        *sizep = rk->rk_curr_msgs.size;
        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED int rd_kafka_curr_msgs_cnt(rd_kafka_t *rk) {
        int cnt;
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return 0;

        mtx_lock(&rk->rk_curr_msgs.lock);
        cnt = rk->rk_curr_msgs.cnt;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return cnt;
}

/**
 * @brief Wait until \p tspec for curr_msgs to reach 0.
 *
 * @returns rd_true if zero is reached, or rd_false on timeout.
 *          The remaining messages are returned in \p *curr_msgsp.
 */
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_curr_msgs_wait_zero(rd_kafka_t *rk,
                             int timeout_ms,
                             unsigned int *curr_msgsp) {
        unsigned int cnt;
        struct timespec tspec;

        rd_timeout_init_timespec(&tspec, timeout_ms);

        mtx_lock(&rk->rk_curr_msgs.lock);
        while ((cnt = rk->rk_curr_msgs.cnt) > 0) {
                if (cnd_timedwait_abs(&rk->rk_curr_msgs.cnd,
                                      &rk->rk_curr_msgs.lock,
                                      &tspec) == thrd_timedout)
                        break;
        }
        mtx_unlock(&rk->rk_curr_msgs.lock);

        *curr_msgsp = cnt;
        return cnt == 0;
}

void rd_kafka_destroy_final(rd_kafka_t *rk);

void rd_kafka_global_init(void);

/**
 * @returns true if \p rk handle is terminating.
 *
 * @remark If consumer_close() is called from destroy*() it will be
 *         called prior to _F_TERMINATE being set and will thus not
 *         be able to use rd_kafka_terminating() to know it is shutting
 *         down.
 *         That code should instead just check that rk_terminate is
 *         non-zero (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk)                                               \
        (rd_atomic32_get(&(rk)->rk_terminate) & RD_KAFKA_DESTROY_F_TERMINATE)

/**
 * @returns the destroy flags set matching \p flags, which might be
 *          a subset of the flags.
 */
#define rd_kafka_destroy_flags_check(rk, flags)                                \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))

/**
 * @returns true if no consumer callbacks, or standard consumer_close
 *          behaviour, should be triggered.
 */
#define rd_kafka_destroy_flags_no_consumer_close(rk)                           \
        rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

#define rd_kafka_is_simple_consumer(rk)                                        \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
int rd_kafka_simple_consumer_add(rd_kafka_t *rk);


/**
 * @returns true if idempotency is enabled (producer only).
 */
#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)
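/*
 * Illustrative sketch only (compiled out): a flush()-style wait built on
 * rd_kafka_curr_msgs_wait_zero() above, aborting early if the instance is
 * terminating. The helper function is hypothetical.
 */
#if 0
static rd_kafka_resp_err_t rd_example_wait_drained(rd_kafka_t *rk,
                                                   int timeout_ms) {
        unsigned int remains;

        if (rd_kafka_terminating(rk))
                return RD_KAFKA_RESP_ERR__DESTROY;

        /* Block up to timeout_ms for all in-flight messages to be
         * delivered or to fail; `remains` holds the leftover count
         * on timeout. */
        if (!rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &remains))
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
#endif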
/**
 * @returns true if the producer is transactional (producer only).
 */
#define rd_kafka_is_transactional(rk)                                          \
        ((rk)->rk_conf.eos.transactional_id != NULL)


#define RD_KAFKA_PURGE_F_ABORT_TXN                                             \
        0x100 /**< Internal flag used when                                     \
               *   aborting transaction */
#define RD_KAFKA_PURGE_F_MASK 0x107
const char *rd_kafka_purge_flags2str(int flags);


#include "rdkafka_topic.h"
#include "rdkafka_partition.h"


/**
 * Debug contexts
 */
#define RD_KAFKA_DBG_GENERIC     0x1
#define RD_KAFKA_DBG_BROKER      0x2
#define RD_KAFKA_DBG_TOPIC       0x4
#define RD_KAFKA_DBG_METADATA    0x8
#define RD_KAFKA_DBG_FEATURE     0x10
#define RD_KAFKA_DBG_QUEUE       0x20
#define RD_KAFKA_DBG_MSG         0x40
#define RD_KAFKA_DBG_PROTOCOL    0x80
#define RD_KAFKA_DBG_CGRP        0x100
#define RD_KAFKA_DBG_SECURITY    0x200
#define RD_KAFKA_DBG_FETCH       0x400
#define RD_KAFKA_DBG_INTERCEPTOR 0x800
#define RD_KAFKA_DBG_PLUGIN      0x1000
#define RD_KAFKA_DBG_CONSUMER    0x2000
#define RD_KAFKA_DBG_ADMIN       0x4000
#define RD_KAFKA_DBG_EOS         0x8000
#define RD_KAFKA_DBG_MOCK        0x10000
#define RD_KAFKA_DBG_ASSIGNOR    0x20000
#define RD_KAFKA_DBG_CONF        0x40000
#define RD_KAFKA_DBG_ALL         0xfffff
#define RD_KAFKA_DBG_NONE        0x0

void rd_kafka_log0(const rd_kafka_conf_t *conf,
                   const rd_kafka_t *rk,
                   const char *extra,
                   int level,
                   int ctx,
                   const char *fac,
                   const char *fmt,
                   ...) RD_FORMAT(printf, 7, 8);

#define rd_kafka_log(rk, level, fac, ...)                                      \
        rd_kafka_log0(&rk->rk_conf, rk, NULL, level, RD_KAFKA_DBG_NONE, fac,   \
                      __VA_ARGS__)

#define rd_kafka_dbg(rk, ctx, fac, ...)                                        \
        do {                                                                   \
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_##ctx)))      \
                        rd_kafka_log0(&rk->rk_conf, rk, NULL, LOG_DEBUG,       \
                                      (RD_KAFKA_DBG_##ctx), fac,               \
                                      __VA_ARGS__);                            \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf, ctx, fac, ...)                                     \
        do {                                                                   \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_##ctx)))            \
                        rd_kafka_log0(conf, NULL, NULL, LOG_DEBUG,             \
                                      (RD_KAFKA_DBG_##ctx), fac,               \
                                      __VA_ARGS__);                            \
        } while (0)

/* NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message. */
#define rd_rkb_log0(rkb, level, ctx, fac, ...)                                 \
        do {                                                                   \
                char _logname[RD_KAFKA_NODENAME_SIZE];                         \
                mtx_lock(&(rkb)->rkb_logname_lock);                            \
                rd_strlcpy(_logname, rkb->rkb_logname, sizeof(_logname));      \
                mtx_unlock(&(rkb)->rkb_logname_lock);                          \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, (rkb)->rkb_rk,          \
                              _logname, level, ctx, fac, __VA_ARGS__);         \
        } while (0)

#define rd_rkb_log(rkb, level, fac, ...)                                       \
        rd_rkb_log0(rkb, level, RD_KAFKA_DBG_NONE, fac, __VA_ARGS__)

#define rd_rkb_dbg(rkb, ctx, fac, ...)                                         \
        do {                                                                   \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug &                    \
                             (RD_KAFKA_DBG_##ctx))) {                          \
                        rd_rkb_log0(rkb, LOG_DEBUG, (RD_KAFKA_DBG_##ctx),      \
                                    fac, __VA_ARGS__);                         \
                }                                                              \
        } while (0)


extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;

static RD_UNUSED RD_INLINE rd_kafka_resp_err_t
rd_kafka_set_last_error(rd_kafka_resp_err_t err, int errnox) {
        if (errnox) {
                /* MSVC:
                 * This is the correct way to set errno on Windows,
                 * but it is still pointless due to different errnos in
                 * different runtimes:
                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
                 * errno is thus highly deprecated, and buggy, on Windows
                 * when using librdkafka as a dynamically loaded DLL. */
                rd_set_errno(errnox);
        }
        rd_kafka_last_error_code = err;
        return err;
}
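/*
 * Illustrative sketch only (compiled out): the RD_KAFKA_DBG_* bits above
 * correspond to the `debug` configuration property, and rd_kafka_dbg()
 * token-pastes the context name so the message is only formatted when that
 * bit is enabled. The call site shown is hypothetical.
 */
#if 0
static void rd_example_debug(rd_kafka_t *rk) {
        /* Emits only when the "broker" debug context is enabled,
         * e.g. with debug=broker,protocol in the configuration. */
        rd_kafka_dbg(rk, BROKER, "EXAMPLE", "broker debug context enabled");
}
#endif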
int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
                              rd_dolock_t do_lock,
                              rd_kafka_resp_err_t err,
                              const char *fmt,
                              ...) RD_FORMAT(printf, 4, 5);
#define rd_kafka_set_fatal_error(rk, err, fmt, ...)                            \
        rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__)

rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk);

static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code(rd_kafka_t *rk) {
        /* This is an optimization to avoid an atomic read, which is costly
         * on some platforms:
         * Fatal errors are currently only raised by the idempotent producer
         * and static consumers (group.instance.id). */
        if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
            (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id))
                return rd_atomic32_get(&rk->rk_fatal.err);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


extern rd_atomic32_t rd_kafka_thread_cnt_curr;
extern char RD_TLS rd_kafka_thread_name[64];

void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2);

int rd_kafka_path_is_dir(const char *path);
rd_bool_t rd_kafka_dir_is_empty(const char *path);

rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
                                   rd_kafka_q_t *rkq,
                                   rd_kafka_op_t *rko,
                                   rd_kafka_q_cb_type_t cb_type,
                                   void *opaque);

rd_kafka_resp_err_t rd_kafka_subscribe_rkt(rd_kafka_topic_t *rkt);

/**
 * @returns the number of milliseconds the maximum poll interval
 *          was exceeded, or 0 if not exceeded.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) {
        rd_ts_t last_poll;
        int exceeded;

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return 0;

        last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);

        /* Application is blocked in librdkafka function, see
         * rd_kafka_app_poll_blocking(). */
        if (last_poll == INT64_MAX)
                return 0;

        exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
                   rk->rk_conf.max_poll_interval_ms;

        if (unlikely(exceeded > 0))
                return exceeded;

        return 0;
}

/**
 * @brief Call on entry to a blocking polling function to indicate
 *        that the application is blocked waiting for librdkafka
 *        and that max.poll.interval.ms should not be enforced.
 *
 *        Call rd_kafka_app_polled() upon return from the function calling
 *        this function to register the application's last time of poll.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void rd_kafka_app_poll_blocking(rd_kafka_t *rk) {
        if (rk->rk_type == RD_KAFKA_CONSUMER)
                rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
}

/**
 * @brief Set the last application poll time to now.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
        if (rk->rk_type == RD_KAFKA_CONSUMER)
                rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
}


void rd_kafka_term_sig_handler(int sig);

/**
 * rdkafka_background.c
 */
int rd_kafka_background_thread_main(void *arg);
rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
                                                      char *errstr,
                                                      size_t errstr_size);

#endif /* _RDKAFKA_INT_H_ */