/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Broker handle declarations: the broker state enum, the broker state
 * monitor, the broker instance struct, and the prototypes, macros and small
 * inline helpers that operate on a broker.
 *
 * NOTE(review): this header uses types and macros it neither declares nor
 * includes itself (e.g. rd_kafka_broker_t, rd_kafka_q_t, TAILQ_ENTRY,
 * CIRCLEQ_HEAD, mtx_t); presumably it must be included after the project's
 * core internal headers - confirm.
 */
#ifndef _RDKAFKA_BROKER_H_
#define _RDKAFKA_BROKER_H_

#include "rdkafka_feature.h"


/* String tables defined elsewhere.
 * NOTE(review): presumably indexed by rd_kafka_broker_state_t and
 * rd_kafka_secproto_t respectively - confirm against the definitions. */
extern const char *rd_kafka_broker_state_names[];
extern const char *rd_kafka_secproto_names[];


/**
 * @enum Broker states
 *
 * @remark The enumerator order is significant: states are compared
 *         numerically (see the note preceding STATE_UP below).
 */
typedef enum {
        RD_KAFKA_BROKER_STATE_INIT,
        RD_KAFKA_BROKER_STATE_DOWN,
        RD_KAFKA_BROKER_STATE_TRY_CONNECT,
        RD_KAFKA_BROKER_STATE_CONNECT,
        RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE,
        RD_KAFKA_BROKER_STATE_AUTH_LEGACY,

        /* Any state >= STATE_UP means the Kafka protocol layer
         * is operational (to some degree). */
        RD_KAFKA_BROKER_STATE_UP,
        RD_KAFKA_BROKER_STATE_UPDATE,
        RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
        RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE,
        RD_KAFKA_BROKER_STATE_AUTH_REQ,
} rd_kafka_broker_state_t;

/**
 * @struct Broker state monitor.
 *
 * @warning The monitor object lifetime should be the same as
 *          the rd_kafka_t object, not shorter.
 */
typedef struct rd_kafka_broker_monitor_s {
        TAILQ_ENTRY(rd_kafka_broker_monitor_s) rkbmon_link; /**< Link in the
                                                             *   broker's
                                                             *   rkb_monitors
                                                             *   list. */
        struct rd_kafka_broker_s *rkbmon_rkb; /**< Broker being monitored. */
        rd_kafka_q_t *rkbmon_q;               /**< Queue to enqueue op on. */

        /** Callback triggered on the monitoree's op handler thread.
         *  Do note that the callback might be triggered even after
         *  it has been deleted due to the queueing nature of op queues. */
        void (*rkbmon_cb)(rd_kafka_broker_t *rkb);
} rd_kafka_broker_monitor_t;


/**
 * @struct Broker instance
 */
struct rd_kafka_broker_s { /* rd_kafka_broker_t */
        TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;

        int32_t rkb_nodeid; /**< Broker Node Id.
                             *   @locks rkb_lock */
        /* Node id value for a broker whose id is not assigned.
         * NOTE(review): "UA" presumably means "unassigned" - confirm. */
#define RD_KAFKA_NODEID_UA -1

        rd_sockaddr_list_t *rkb_rsal;
        rd_ts_t rkb_ts_rsal_last;
        const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */

        rd_kafka_transport_t *rkb_transport;

        uint32_t rkb_corrid;
        int rkb_connid; /* Connection id, increased by
                         * one for each connection by
                         * this broker. Used as a safe-guard
                         * to help troubleshooting buffer
                         * problems across disconnects. */

        rd_kafka_q_t *rkb_ops;

        mtx_t rkb_lock; /* Taken/released by rd_kafka_broker_lock()/
                         * rd_kafka_broker_unlock(). */

        int rkb_blocking_max_ms; /* Maximum IO poll blocking
                                  * time. */

        /* Toppars handled by this broker */
        TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
        int rkb_toppar_cnt;

        /* Active toppars that are eligible for:
         *  - (consumer) fetching due to underflow
         *  - (producer) producing
         *
         * The circleq provides round-robin scheduling for both cases.
         */
        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
        int rkb_active_toppar_cnt;
        rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar
                                                    * in fetch list.
                                                    * This is used for
                                                    * round-robin.
                                                    * Maintained by
                                                    * rd_kafka_broker_active_toppar_next(). */


        rd_kafka_cgrp_t *rkb_cgrp;

        rd_ts_t rkb_ts_fetch_backoff;
        int rkb_fetching;

        rd_kafka_broker_state_t rkb_state; /**< Current broker state */

        rd_ts_t rkb_ts_state;                 /* Timestamp of last
                                               * state change */
        rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
                                               * interval. */

        rd_atomic32_t rkb_blocking_request_cnt; /* The number of
                                                 * in-flight blocking
                                                 * requests.
                                                 * A blocking request is
                                                 * one that is known to
                                                 * possibly block on the
                                                 * broker for longer than
                                                 * the typical processing
                                                 * time, e.g.:
                                                 * JoinGroup, SyncGroup */

        int rkb_features; /* Protocol features supported
                           * by this broker.
                           * See RD_KAFKA_FEATURE_* in
                           * rdkafka_proto.h
                           * NOTE(review): this header includes
                           * rdkafka_feature.h, not rdkafka_proto.h;
                           * confirm which header defines the flags. */

        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
                                                      * (MUST be sorted) */
        size_t rkb_ApiVersions_cnt;
        rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
                                                  * the fallback proto
                                                  * will be used after
                                                  * ApiVersionRequest
                                                  * failure. */

        rd_kafka_confsource_t rkb_source; /* Compared against RD_KAFKA_LOGICAL
                                           * by RD_KAFKA_BROKER_IS_LOGICAL() */

        /* Broker counters */
        struct {
                rd_atomic64_t tx_bytes;
                rd_atomic64_t tx; /**< Kafka requests */
                rd_atomic64_t tx_err;
                rd_atomic64_t tx_retries;
                rd_atomic64_t req_timeouts; /* Accumulated value */

                rd_atomic64_t rx_bytes;
                rd_atomic64_t rx; /**< Kafka responses */
                rd_atomic64_t rx_err;
                rd_atomic64_t rx_corrid_err; /* CorrId misses */
                rd_atomic64_t rx_partial;    /* Partial messages received
                                              * and dropped. */
                rd_atomic64_t zbuf_grow; /* Compression/decompression buffer
                                          * grows needed */
                rd_atomic64_t buf_grow;  /* rkbuf grows needed */
                rd_atomic64_t wakeups;   /* Poll wakeups */

                rd_atomic32_t connects; /**< Connection attempts,
                                         *   successful or not. */

                rd_atomic32_t disconnects; /**< Disconnects.
                                            *   Always peer-triggered. */

                rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
                                                        *   counter */

                rd_atomic64_t ts_send; /**< Timestamp of last send */
                rd_atomic64_t ts_recv; /**< Timestamp of last receive */
        } rkb_c;

        int rkb_req_timeouts; /* Current value */

        thrd_t rkb_thread; /* Compared against the calling thread by
                            * rd_kafka_broker_supports() to decide whether
                            * rkb_lock needs to be taken. */

        rd_refcnt_t rkb_refcnt; /* See rd_kafka_broker_keep() and
                                 * rd_kafka_broker_destroy(). */

        rd_kafka_t *rkb_rk;

        rd_kafka_buf_t *rkb_recv_buf;

        int rkb_max_inflight; /* Maximum number of in-flight
                               * requests to broker.
                               * Compared to rkb_waitresps length. */
        rd_kafka_bufq_t rkb_outbufs;
        rd_kafka_bufq_t rkb_waitresps;
        rd_kafka_bufq_t rkb_retrybufs;

        rd_avg_t rkb_avg_int_latency;    /* Current internal latency period */
        rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
                                          *   between buf_enq0
                                          *   and writing to socket
                                          */
        rd_avg_t rkb_avg_rtt;            /* Current RTT period */
        rd_avg_t rkb_avg_throttle;       /* Current throttle period */

        /* These are all protected by rkb_lock */
        char rkb_name[RD_KAFKA_NODENAME_SIZE];     /* Display name */
        char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port */
        uint16_t rkb_port;                         /* TCP port */
        char *rkb_origname;                        /* Original
                                                    * host name */
        int rkb_nodename_epoch;                    /**< Bumped each time
                                                    *   the nodename is changed.
                                                    *   Compared to
                                                    *   rkb_connect_epoch
                                                    *   to trigger a reconnect
                                                    *   for logical broker
                                                    *   when the nodename is
                                                    *   updated. */
        int rkb_connect_epoch;                     /**< The value of
                                                    *   rkb_nodename_epoch at the
                                                    *   last connection attempt.
                                                    */

        /* Logging name is a copy of rkb_name, protected by its own mutex */
        char *rkb_logname;
        mtx_t rkb_logname_lock;

        rd_socket_t rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
                                       * up from IO-wait when
                                       * queues have content. */

        /** Current, exponentially increased, reconnect backoff. */
        int rkb_reconnect_backoff_ms;

        /** Absolute timestamp of next allowed reconnect. */
        rd_ts_t rkb_ts_reconnect;

        /** Absolute time of last connection attempt. */
        rd_ts_t rkb_ts_connect;

        /** Persistent connection demand is tracked by
         *  a counter for each type of demand.
         *  The broker thread will maintain a persistent connection
         *  if any of the counters are non-zero, and revert to
         *  on-demand mode when they all reach zero.
         *  After incrementing any of the counters a broker wakeup
         *  should be signalled to expedite handling. */
        struct {
                /** Producer: partitions are being produced to.
                 *  Consumer: partitions are being fetched from.
                 *
                 *  Counter is maintained by the broker handler thread
                 *  itself, no need for atomic/locking.
                 *  Is reset to 0 on each producer|consumer_serve() loop
                 *  and updated according to current need, which
                 *  will trigger a state transition to
                 *  TRY_CONNECT if a connection is needed. */
                int internal;

                /** Consumer: Broker is the group coordinator.
                 *  Counter is maintained by cgrp logic in
                 *  rdkafka main thread.
                 *
                 *  Producer: Broker is the transaction coordinator.
                 *  Counter is maintained by rdkafka_idempotence.c.
                 *
                 *  All: A coord_req_t is waiting for this broker to come up.
                 */
                rd_atomic32_t coord;
        } rkb_persistconn;

        /** Currently registered state monitors.
         *  @locks rkb_lock */
        TAILQ_HEAD(, rd_kafka_broker_monitor_s) rkb_monitors;

        /** Coordinator request's broker monitor.
         *  Will trigger the coord_req fsm on broker state change. */
        rd_kafka_broker_monitor_t rkb_coord_monitor;

        rd_kafka_secproto_t rkb_proto;

        int rkb_down_reported; /* Down event reported */
#if WITH_SASL_CYRUS
        /* Only present when built with WITH_SASL_CYRUS: the struct layout
         * after this point therefore depends on that build option. */
        rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
#endif


        /*
         * Log suppression
         */
        struct {
                /** Log: compression type not supported by broker. */
                rd_interval_t unsupported_compression;

                /** Log: KIP-62 not supported by broker. */
                rd_interval_t unsupported_kip62;

                /** Log: KIP-345 not supported by broker. */
                rd_interval_t unsupported_kip345;

                /** Log & Error: identical broker_fail() errors. */
                rd_interval_t fail_error;
        } rkb_suppress;

        /** Last error. This is used to suppress repeated logs. */
        struct {
                char errstr[512];        /**< Last error string */
                rd_kafka_resp_err_t err; /**< Last error code */
                int cnt;                 /**< Number of identical errors */
        } rkb_last_err;
};

/* Reference counting: both macros operate on the broker's rkb_refcnt.
 * The _fl variant additionally forwards the caller-supplied function name
 * and line number to rd_refcnt_add_fl(). */
#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
#define rd_kafka_broker_keep_fl(FUNC, LINE, RKB)                               \
        rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
/* Lock/unlock the broker's rkb_lock mutex. */
#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)


/**
 * @brief Locks the broker, reads the current state, unlocks, and returns
 *        the state.
 *
 * @param rkb Broker whose rkb_state is read.
 *
 * @returns a snapshot of rkb->rkb_state taken while holding rkb_lock;
 *          the lock is released before returning.
 *
 * @locks broker_lock MUST NOT be held.
 * @locality any
 */
static RD_INLINE RD_UNUSED rd_kafka_broker_state_t
rd_kafka_broker_get_state(rd_kafka_broker_t *rkb) {
        rd_kafka_broker_state_t state;
        rd_kafka_broker_lock(rkb);
        state = rkb->rkb_state;
        rd_kafka_broker_unlock(rkb);
        return state;
}



/**
 * @returns true if the broker state is UP or UPDATE
 *
 * @warning Function-like macro: \p state is evaluated twice when it is not
 *          RD_KAFKA_BROKER_STATE_UP, so pass an expression without
 *          side effects.
 */
#define rd_kafka_broker_state_is_up(state)                                     \
        ((state) == RD_KAFKA_BROKER_STATE_UP ||                                \
         (state) == RD_KAFKA_BROKER_STATE_UPDATE)


/**
 * @returns true if the broker connection is up, else false.
 *
 * @remark "up" here means state UP or UPDATE
 *         (see rd_kafka_broker_state_is_up()).
 *
 * @locks broker_lock MUST NOT be held.
 * @locality any
 */
static RD_UNUSED RD_INLINE rd_bool_t
rd_kafka_broker_is_up(rd_kafka_broker_t *rkb) {
        /* Read the state once into a local: the macro below may evaluate
         * its argument twice, and each rd_kafka_broker_get_state() call
         * takes rkb_lock. */
        rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb);
        return rd_kafka_broker_state_is_up(state);
}


/**
 * @brief Broker comparator
 *
 * Compares the two broker pointers themselves (object identity) using
 * RD_CMP(); no broker field is inspected.
 *
 * @param _a Pointer to a rd_kafka_broker_t.
 * @param _b Pointer to a rd_kafka_broker_t.
 *
 * @returns the result of RD_CMP() on the two pointers.
 */
static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp(const void *_a,
                                                   const void *_b) {
        const rd_kafka_broker_t *a = _a, *b = _b;
        return RD_CMP(a, b);
}


/**
 * @returns true if broker supports \p features, else false.
 *
 * All bits set in \p features must also be set in rkb->rkb_features for
 * the result to be true.
 *
 * @locks rkb_lock is taken by this function unless it is called on the
 *        broker's own thread (rkb->rkb_thread), in which case
 *        rkb_features is read without locking.
 */
static RD_UNUSED int rd_kafka_broker_supports(rd_kafka_broker_t *rkb,
                                              int features) {
        const rd_bool_t do_lock = !thrd_is_current(rkb->rkb_thread);
        int r;

        if (do_lock)
                rd_kafka_broker_lock(rkb);

        r = (rkb->rkb_features & features) == features;

        if (do_lock)
                rd_kafka_broker_unlock(rkb);
        return r;
}

int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
                                             int16_t ApiKey,
                                             int16_t minver,
                                             int16_t maxver,
                                             int *featuresp);

rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func,
                                                      int line,
                                                      rd_kafka_t *rk,
                                                      int32_t nodeid,
                                                      int state,
                                                      rd_bool_t do_connect);

/* Convenience wrappers around rd_kafka_broker_find_by_nodeid0_fl():
 *  - find_by_nodeid0() supplies the call site's __FUNCTION__ and __LINE__.
 *  - find_by_nodeid() additionally passes state=-1 and do_connect=rd_false.
 * NOTE(review): state -1 presumably means "any state" - confirm in the
 * implementation. */
#define rd_kafka_broker_find_by_nodeid0(rk, nodeid, state, do_connect)         \
        rd_kafka_broker_find_by_nodeid0_fl(__FUNCTION__, __LINE__, rk, nodeid, \
                                           state, do_connect)
#define rd_kafka_broker_find_by_nodeid(rk, nodeid)                             \
        rd_kafka_broker_find_by_nodeid0(rk, nodeid, -1, rd_false)


/**
 * Filter out brokers that don't support Idempotent Producer.
 *
 * Has the signature of the \c filter callback accepted by
 * rd_kafka_broker_any() and rd_kafka_broker_any_up().
 *
 * @param rkb    Broker to check.
 * @param opaque Unused.
 *
 * @returns 1 if none of the RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER bits are
 *          set in rkb->rkb_features (broker is to be filtered out), else 0.
 *
 * @remark rkb_features is read without taking rkb_lock here.
 */
static RD_INLINE RD_UNUSED int
rd_kafka_broker_filter_non_idempotent(rd_kafka_broker_t *rkb, void *opaque) {
        return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER);
}


rd_kafka_broker_t *rd_kafka_broker_any(rd_kafka_t *rk,
                                       int state,
                                       int (*filter)(rd_kafka_broker_t *rkb,
                                                     void *opaque),
                                       void *opaque,
                                       const char *reason);
rd_kafka_broker_t *rd_kafka_broker_any_up(rd_kafka_t *rk,
                                          int *filtered_cnt,
                                          int (*filter)(rd_kafka_broker_t *rkb,
                                                        void *opaque),
                                          void *opaque,
                                          const char *reason);

rd_kafka_broker_t *rd_kafka_broker_any_usable(rd_kafka_t *rk,
                                              int timeout_ms,
                                              rd_dolock_t do_lock,
                                              int features,
                                              const char *reason);

rd_kafka_broker_t *
rd_kafka_broker_prefer(rd_kafka_t *rk, int32_t broker_id, int state);

rd_kafka_broker_t *rd_kafka_broker_get_async(rd_kafka_t *rk,
                                             int32_t broker_id,
                                             int state,
                                             rd_kafka_enq_once_t *eonce);

rd_list_t *rd_kafka_brokers_get_nodeids_async(rd_kafka_t *rk,
                                              rd_kafka_enq_once_t *eonce);

rd_kafka_broker_t *
rd_kafka_broker_controller(rd_kafka_t *rk, int state, rd_ts_t abs_timeout);
rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk,
                                                    int state,
                                                    rd_kafka_enq_once_t *eonce);

int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist);
void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state);

/* printf-style: \p fmt is argument 4 and the variadic arguments start at
 * argument 5, as declared to the compiler through RD_FORMAT(). */
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
                          int level,
                          rd_kafka_resp_err_t err,
                          const char *fmt,
                          ...) RD_FORMAT(printf, 4, 5);

void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
                                 rd_kafka_resp_err_t err,
                                 const char *errstr);

void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb);

/* Hands rkb_refcnt and the rd_kafka_broker_destroy_final(rkb) expression to
 * rd_refcnt_destroywrapper().
 * NOTE(review): presumably drops one reference and runs the final destroy
 * only when the count reaches zero - confirm in rd_refcnt_destroywrapper().
 * \p rkb appears twice in the expansion, so pass an expression without
 * side effects. */
#define rd_kafka_broker_destroy(rkb)                                           \
        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                           \
                                 rd_kafka_broker_destroy_final(rkb))


void rd_kafka_broker_update(rd_kafka_t *rk,
                            rd_kafka_secproto_t proto,
                            const struct rd_kafka_metadata_broker *mdb,
                            rd_kafka_broker_t **rkbp);
rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk,
                                       rd_kafka_confsource_t source,
                                       rd_kafka_secproto_t proto,
                                       const char *name,
                                       uint16_t port,
                                       int32_t nodeid);

rd_kafka_broker_t *rd_kafka_broker_add_logical(rd_kafka_t *rk,
                                               const char *name);

/** @define returns true if broker is logical, i.e. its rkb_source is
 *          RD_KAFKA_LOGICAL. No locking is needed. */
#define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL)

void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,
                                  rd_kafka_broker_t *from_rkb);

void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb);
void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr);

int rd_kafka_send(rd_kafka_broker_t *rkb);
int rd_kafka_recv(rd_kafka_broker_t *rkb);

void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
                      rd_kafka_msgq_t *rkmq,
                      rd_kafka_resp_err_t err);

void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb,
                              rd_kafka_toppar_t *rktp,
                              uint64_t last_msgid);

void rd_kafka_broker_buf_enq1(rd_kafka_broker_t *rkb,
                              rd_kafka_buf_t *rkbuf,
                              rd_kafka_resp_cb_t *resp_cb,
                              void *opaque);

void rd_kafka_broker_buf_enq_replyq(rd_kafka_broker_t *rkb,
                                    rd_kafka_buf_t *rkbuf,
                                    rd_kafka_replyq_t replyq,
                                    rd_kafka_resp_cb_t *resp_cb,
                                    void *opaque);

void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);


rd_kafka_broker_t *rd_kafka_broker_internal(rd_kafka_t *rk);

void msghdr_print(rd_kafka_t *rk,
                  const char *what,
                  const struct msghdr *msg,
                  int hexdump);

int32_t rd_kafka_broker_id(rd_kafka_broker_t *rkb);
const char *rd_kafka_broker_name(rd_kafka_broker_t *rkb);
void rd_kafka_broker_wakeup(rd_kafka_broker_t *rkb, const char *reason);
int rd_kafka_all_brokers_wakeup(rd_kafka_t *rk,
                                int min_state,
                                const char *reason);

void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason);

void rd_kafka_broker_purge_queues(rd_kafka_broker_t *rkb,
                                  int purge_flags,
                                  rd_kafka_replyq_t replyq);

int rd_kafka_brokers_get_state_version(rd_kafka_t *rk);
int rd_kafka_brokers_wait_state_change(rd_kafka_t *rk,
                                       int stored_version,
                                       int timeout_ms);
int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk,
                                             int stored_version,
                                             rd_kafka_enq_once_t *eonce);
void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk);



/**
 * Updates the current toppar active round-robin next pointer
 * (rkb->rkb_active_toppar_next).
 *
 * @param rkb       Broker whose round-robin pointer is updated.
 * @param sugg_next Suggested next toppar; may be NULL or the circleq end
 *                  marker.
 *
 * The new value is:
 *  - NULL if the active toppar circleq is empty, or if \p sugg_next is the
 *    circleq end marker (CIRCLEQ_ENDC),
 *  - \p sugg_next if it is non-NULL,
 *  - otherwise the first toppar in the circleq.
 *
 * @remark No locking is performed by this function.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_broker_active_toppar_next(rd_kafka_broker_t *rkb,
                                   rd_kafka_toppar_t *sugg_next) {
        if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) ||
            (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars))
                rkb->rkb_active_toppar_next = NULL;
        else if (sugg_next)
                rkb->rkb_active_toppar_next = sugg_next;
        else
                rkb->rkb_active_toppar_next =
                    CIRCLEQ_FIRST(&rkb->rkb_active_toppars);
}


void rd_kafka_broker_active_toppar_add(rd_kafka_broker_t *rkb,
                                       rd_kafka_toppar_t *rktp,
                                       const char *reason);

void rd_kafka_broker_active_toppar_del(rd_kafka_broker_t *rkb,
                                       rd_kafka_toppar_t *rktp,
                                       const char *reason);


void rd_kafka_broker_schedule_connection(rd_kafka_broker_t *rkb);

/* \p acntp is a pointer to one of the atomic persistent-connection demand
 * counters.
 * NOTE(review): presumably e.g. &rkb->rkb_persistconn.coord - confirm
 * against callers. */
void rd_kafka_broker_persistent_connection_add(rd_kafka_broker_t *rkb,
                                               rd_atomic32_t *acntp);

void rd_kafka_broker_persistent_connection_del(rd_kafka_broker_t *rkb,
                                               rd_atomic32_t *acntp);


/* Broker state monitors; see rd_kafka_broker_monitor_t for the lifetime
 * requirement and the note on callbacks firing after deletion. */
void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon,
                                 rd_kafka_broker_t *rkb,
                                 rd_kafka_q_t *rkq,
                                 void (*callback)(rd_kafka_broker_t *rkb));

void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon);

int unittest_broker(void);

#endif /* _RDKAFKA_BROKER_H_ */