summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h607
1 files changed, 607 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h
new file mode 100644
index 000000000..1e454d4d7
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_broker.h
@@ -0,0 +1,607 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_BROKER_H_
+#define _RDKAFKA_BROKER_H_
+
+#include "rdkafka_feature.h"
+
+
+extern const char *rd_kafka_broker_state_names[];
+extern const char *rd_kafka_secproto_names[];
+
+
+/**
+ * @enum Broker states
+ */
+typedef enum {
+ RD_KAFKA_BROKER_STATE_INIT,
+ RD_KAFKA_BROKER_STATE_DOWN,
+ RD_KAFKA_BROKER_STATE_TRY_CONNECT,
+ RD_KAFKA_BROKER_STATE_CONNECT,
+ RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE,
+ RD_KAFKA_BROKER_STATE_AUTH_LEGACY,
+
+ /* Any state >= STATE_UP means the Kafka protocol layer
+ * is operational (to some degree). */
+ RD_KAFKA_BROKER_STATE_UP,
+ RD_KAFKA_BROKER_STATE_UPDATE,
+ RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
+ RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE,
+ RD_KAFKA_BROKER_STATE_AUTH_REQ,
+} rd_kafka_broker_state_t;
+
+/**
+ * @struct Broker state monitor.
+ *
+ * @warning The monitor object lifetime should be the same as
+ * the rd_kafka_t object, not shorter.
+ */
+typedef struct rd_kafka_broker_monitor_s {
+ TAILQ_ENTRY(rd_kafka_broker_monitor_s) rkbmon_link; /**< rkb_monitors*/
+ struct rd_kafka_broker_s *rkbmon_rkb; /**< Broker being monitored. */
+ rd_kafka_q_t *rkbmon_q; /**< Queue to enqueue op on. */
+
+ /**< Callback triggered on the monitoree's op handler thread.
+ * Do note that the callback might be triggered even after
+ * it has been deleted due to the queueing nature of op queues. */
+ void (*rkbmon_cb)(rd_kafka_broker_t *rkb);
+} rd_kafka_broker_monitor_t;
+
+
+/**
+ * @struct Broker instance
+ */
+struct rd_kafka_broker_s { /* rd_kafka_broker_t */
+ TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
+
+ int32_t rkb_nodeid; /**< Broker Node Id.
+ * @locks rkb_lock */
+#define RD_KAFKA_NODEID_UA -1
+
+ rd_sockaddr_list_t *rkb_rsal;
+ rd_ts_t rkb_ts_rsal_last;
+ const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */
+
+ rd_kafka_transport_t *rkb_transport;
+
+ uint32_t rkb_corrid;
+ int rkb_connid; /* Connection id, increased by
+ * one for each connection by
+ * this broker. Used as a safe-guard
+ * to help troubleshooting buffer
+ * problems across disconnects. */
+
+ rd_kafka_q_t *rkb_ops;
+
+ mtx_t rkb_lock;
+
+ int rkb_blocking_max_ms; /* Maximum IO poll blocking
+ * time. */
+
+ /* Toppars handled by this broker */
+ TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
+ int rkb_toppar_cnt;
+
+ /* Active toppars that are eligible for:
+ * - (consumer) fetching due to underflow
+ * - (producer) producing
+ *
+ * The circleq provides round-robin scheduling for both cases.
+ */
+ CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
+ int rkb_active_toppar_cnt;
+ rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar
+ * in fetch list.
+ * This is used for
+ * round-robin. */
+
+
+ rd_kafka_cgrp_t *rkb_cgrp;
+
+ rd_ts_t rkb_ts_fetch_backoff;
+ int rkb_fetching;
+
+ rd_kafka_broker_state_t rkb_state; /**< Current broker state */
+
+ rd_ts_t rkb_ts_state; /* Timestamp of last
+ * state change */
+ rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
+ * interval. */
+
+ rd_atomic32_t rkb_blocking_request_cnt; /* The number of
+ * in-flight blocking
+ * requests.
+ * A blocking request is
+ * one that is known to
+ * possibly block on the
+ * broker for longer than
+ * the typical processing
+ * time, e.g.:
+ * JoinGroup, SyncGroup */
+
+ int rkb_features; /* Protocol features supported
+ * by this broker.
+ * See RD_KAFKA_FEATURE_* in
+ * rdkafka_proto.h */
+
+ struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
+ * (MUST be sorted) */
+ size_t rkb_ApiVersions_cnt;
+ rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
+ * the fallback proto
+ * will be used after
+ * ApiVersionRequest
+ * failure. */
+
+ rd_kafka_confsource_t rkb_source;
+ struct {
+ rd_atomic64_t tx_bytes;
+ rd_atomic64_t tx; /**< Kafka requests */
+ rd_atomic64_t tx_err;
+ rd_atomic64_t tx_retries;
+ rd_atomic64_t req_timeouts; /* Accumulated value */
+
+ rd_atomic64_t rx_bytes;
+ rd_atomic64_t rx; /**< Kafka responses */
+ rd_atomic64_t rx_err;
+ rd_atomic64_t rx_corrid_err; /* CorrId misses */
+ rd_atomic64_t rx_partial; /* Partial messages received
+ * and dropped. */
+ rd_atomic64_t zbuf_grow; /* Compression/decompression buffer
+ grows needed */
+ rd_atomic64_t buf_grow; /* rkbuf grows needed */
+ rd_atomic64_t wakeups; /* Poll wakeups */
+
+ rd_atomic32_t connects; /**< Connection attempts,
+ * successful or not. */
+
+ rd_atomic32_t disconnects; /**< Disconnects.
+ * Always peer-triggered. */
+
+ rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
+ * counter */
+
+ rd_atomic64_t ts_send; /**< Timestamp of last send */
+ rd_atomic64_t ts_recv; /**< Timestamp of last receive */
+ } rkb_c;
+
+ int rkb_req_timeouts; /* Current value */
+
+ thrd_t rkb_thread;
+
+ rd_refcnt_t rkb_refcnt;
+
+ rd_kafka_t *rkb_rk;
+
+ rd_kafka_buf_t *rkb_recv_buf;
+
+ int rkb_max_inflight; /* Maximum number of in-flight
+ * requests to broker.
+ * Compared to rkb_waitresps length.*/
+ rd_kafka_bufq_t rkb_outbufs;
+ rd_kafka_bufq_t rkb_waitresps;
+ rd_kafka_bufq_t rkb_retrybufs;
+
+ rd_avg_t rkb_avg_int_latency; /* Current internal latency period*/
+ rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
+ * between buf_enq0
+ * and writing to socket
+ */
+ rd_avg_t rkb_avg_rtt; /* Current RTT period */
+ rd_avg_t rkb_avg_throttle; /* Current throttle period */
+
+ /* These are all protected by rkb_lock */
+ char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */
+ char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
+ uint16_t rkb_port; /* TCP port */
+ char *rkb_origname; /* Original
+ * host name */
+ int rkb_nodename_epoch; /**< Bumped each time
+ * the nodename is changed.
+ * Compared to
+ * rkb_connect_epoch
+ * to trigger a reconnect
+ * for logical broker
+ * when the nodename is
+ * updated. */
+ int rkb_connect_epoch; /**< The value of
+ * rkb_nodename_epoch at the
+ * last connection attempt.
+ */
+
+ /* Logging name is a copy of rkb_name, protected by its own mutex */
+ char *rkb_logname;
+ mtx_t rkb_logname_lock;
+
+ rd_socket_t rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
+ * up from IO-wait when
+ * queues have content. */
+
+ /**< Current, exponentially increased, reconnect backoff. */
+ int rkb_reconnect_backoff_ms;
+
+ /**< Absolute timestamp of next allowed reconnect. */
+ rd_ts_t rkb_ts_reconnect;
+
+ /** Absolute time of last connection attempt. */
+ rd_ts_t rkb_ts_connect;
+
+ /**< Persistent connection demand is tracked by
+ * a counter for each type of demand.
+ * The broker thread will maintain a persistent connection
+ * if any of the counters are non-zero, and revert to
+ * on-demand mode when they all reach zero.
+ * After incrementing any of the counters a broker wakeup
+ * should be signalled to expedite handling. */
+ struct {
+ /**< Producer: partitions are being produced to.
+ * Consumer: partitions are being fetched from.
+ *
+ * Counter is maintained by the broker handler thread
+ * itself, no need for atomic/locking.
+ * Is reset to 0 on each producer|consumer_serve() loop
+ * and updated according to current need, which
+ * will trigger a state transition to
+ * TRY_CONNECT if a connection is needed. */
+ int internal;
+
+ /**< Consumer: Broker is the group coordinator.
+ * Counter is maintained by cgrp logic in
+ * rdkafka main thread.
+ *
+ * Producer: Broker is the transaction coordinator.
+ * Counter is maintained by rdkafka_idempotence.c.
+ *
+ * All: A coord_req_t is waiting for this broker to come up.
+ */
+
+ rd_atomic32_t coord;
+ } rkb_persistconn;
+
+ /**< Currently registered state monitors.
+ * @locks rkb_lock */
+ TAILQ_HEAD(, rd_kafka_broker_monitor_s) rkb_monitors;
+
+ /**< Coordinator request's broker monitor.
+ * Will trigger the coord_req fsm on broker state change. */
+ rd_kafka_broker_monitor_t rkb_coord_monitor;
+
+ rd_kafka_secproto_t rkb_proto;
+
+ int rkb_down_reported; /* Down event reported */
+#if WITH_SASL_CYRUS
+ rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
+#endif
+
+
+ /*
+ * Log suppression
+ */
+ struct {
+ /**< Log: compression type not supported by broker. */
+ rd_interval_t unsupported_compression;
+
+ /**< Log: KIP-62 not supported by broker. */
+ rd_interval_t unsupported_kip62;
+
+ /**< Log: KIP-345 not supported by broker. */
+ rd_interval_t unsupported_kip345;
+
+ /**< Log & Error: identical broker_fail() errors. */
+ rd_interval_t fail_error;
+ } rkb_suppress;
+
+ /** Last error. This is used to suppress repeated logs. */
+ struct {
+ char errstr[512]; /**< Last error string */
+ rd_kafka_resp_err_t err; /**< Last error code */
+ int cnt; /**< Number of identical errors */
+ } rkb_last_err;
+};
+
+#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
+#define rd_kafka_broker_keep_fl(FUNC, LINE, RKB) \
+ rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
+#define rd_kafka_broker_lock(rkb) mtx_lock(&(rkb)->rkb_lock)
+#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
+
+
+/**
+ * @brief Locks broker, acquires the states, unlocks, and returns
+ * the state.
+ * @locks broker_lock MUST NOT be held.
+ * @locality any
+ */
+static RD_INLINE RD_UNUSED rd_kafka_broker_state_t
+rd_kafka_broker_get_state(rd_kafka_broker_t *rkb) {
+ rd_kafka_broker_state_t state;
+ rd_kafka_broker_lock(rkb);
+ state = rkb->rkb_state;
+ rd_kafka_broker_unlock(rkb);
+ return state;
+}
+
+
+
+/**
+ * @returns true if the broker state is UP or UPDATE
+ */
+#define rd_kafka_broker_state_is_up(state) \
+ ((state) == RD_KAFKA_BROKER_STATE_UP || \
+ (state) == RD_KAFKA_BROKER_STATE_UPDATE)
+
+
+/**
+ * @returns true if the broker connection is up, else false.
+ * @locks broker_lock MUST NOT be held.
+ * @locality any
+ */
+static RD_UNUSED RD_INLINE rd_bool_t
+rd_kafka_broker_is_up(rd_kafka_broker_t *rkb) {
+ rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb);
+ return rd_kafka_broker_state_is_up(state);
+}
+
+
+/**
+ * @brief Broker comparator
+ */
+static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp(const void *_a,
+ const void *_b) {
+ const rd_kafka_broker_t *a = _a, *b = _b;
+ return RD_CMP(a, b);
+}
+
+
+/**
+ * @returns true if broker supports \p features, else false.
+ */
+static RD_UNUSED int rd_kafka_broker_supports(rd_kafka_broker_t *rkb,
+ int features) {
+ const rd_bool_t do_lock = !thrd_is_current(rkb->rkb_thread);
+ int r;
+
+ if (do_lock)
+ rd_kafka_broker_lock(rkb);
+
+ r = (rkb->rkb_features & features) == features;
+
+ if (do_lock)
+ rd_kafka_broker_unlock(rkb);
+ return r;
+}
+
+int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
+ int16_t ApiKey,
+ int16_t minver,
+ int16_t maxver,
+ int *featuresp);
+
+rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func,
+ int line,
+ rd_kafka_t *rk,
+ int32_t nodeid,
+ int state,
+ rd_bool_t do_connect);
+
+#define rd_kafka_broker_find_by_nodeid0(rk, nodeid, state, do_connect) \
+ rd_kafka_broker_find_by_nodeid0_fl(__FUNCTION__, __LINE__, rk, nodeid, \
+ state, do_connect)
+#define rd_kafka_broker_find_by_nodeid(rk, nodeid) \
+ rd_kafka_broker_find_by_nodeid0(rk, nodeid, -1, rd_false)
+
+
+/**
+ * Filter out brokers that don't support Idempotent Producer.
+ */
+static RD_INLINE RD_UNUSED int
+rd_kafka_broker_filter_non_idempotent(rd_kafka_broker_t *rkb, void *opaque) {
+ return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER);
+}
+
+
+rd_kafka_broker_t *rd_kafka_broker_any(rd_kafka_t *rk,
+ int state,
+ int (*filter)(rd_kafka_broker_t *rkb,
+ void *opaque),
+ void *opaque,
+ const char *reason);
+rd_kafka_broker_t *rd_kafka_broker_any_up(rd_kafka_t *rk,
+ int *filtered_cnt,
+ int (*filter)(rd_kafka_broker_t *rkb,
+ void *opaque),
+ void *opaque,
+ const char *reason);
+rd_kafka_broker_t *rd_kafka_broker_any_usable(rd_kafka_t *rk,
+ int timeout_ms,
+ rd_dolock_t do_lock,
+ int features,
+ const char *reason);
+
+rd_kafka_broker_t *
+rd_kafka_broker_prefer(rd_kafka_t *rk, int32_t broker_id, int state);
+
+rd_kafka_broker_t *rd_kafka_broker_get_async(rd_kafka_t *rk,
+ int32_t broker_id,
+ int state,
+ rd_kafka_enq_once_t *eonce);
+
+rd_list_t *rd_kafka_brokers_get_nodeids_async(rd_kafka_t *rk,
+ rd_kafka_enq_once_t *eonce);
+
+rd_kafka_broker_t *
+rd_kafka_broker_controller(rd_kafka_t *rk, int state, rd_ts_t abs_timeout);
+rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk,
+ int state,
+ rd_kafka_enq_once_t *eonce);
+
+int rd_kafka_brokers_add0(rd_kafka_t *rk, const char *brokerlist);
+void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state);
+
+void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
+ int level,
+ rd_kafka_resp_err_t err,
+ const char *fmt,
+ ...) RD_FORMAT(printf, 4, 5);
+
+void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ const char *errstr);
+
+void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb);
+
+#define rd_kafka_broker_destroy(rkb) \
+ rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \
+ rd_kafka_broker_destroy_final(rkb))
+
+
+void rd_kafka_broker_update(rd_kafka_t *rk,
+ rd_kafka_secproto_t proto,
+ const struct rd_kafka_metadata_broker *mdb,
+ rd_kafka_broker_t **rkbp);
+rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk,
+ rd_kafka_confsource_t source,
+ rd_kafka_secproto_t proto,
+ const char *name,
+ uint16_t port,
+ int32_t nodeid);
+
+rd_kafka_broker_t *rd_kafka_broker_add_logical(rd_kafka_t *rk,
+ const char *name);
+
+/** @define returns true if broker is logical. No locking is needed. */
+#define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL)
+
+void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,
+ rd_kafka_broker_t *from_rkb);
+
+void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb);
+void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr);
+
+int rd_kafka_send(rd_kafka_broker_t *rkb);
+int rd_kafka_recv(rd_kafka_broker_t *rkb);
+
+void rd_kafka_dr_msgq(rd_kafka_topic_t *rkt,
+ rd_kafka_msgq_t *rkmq,
+ rd_kafka_resp_err_t err);
+
+void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ uint64_t last_msgid);
+
+void rd_kafka_broker_buf_enq1(rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+void rd_kafka_broker_buf_enq_replyq(rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
+void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
+
+
+rd_kafka_broker_t *rd_kafka_broker_internal(rd_kafka_t *rk);
+
+void msghdr_print(rd_kafka_t *rk,
+ const char *what,
+ const struct msghdr *msg,
+ int hexdump);
+
+int32_t rd_kafka_broker_id(rd_kafka_broker_t *rkb);
+const char *rd_kafka_broker_name(rd_kafka_broker_t *rkb);
+void rd_kafka_broker_wakeup(rd_kafka_broker_t *rkb, const char *reason);
+int rd_kafka_all_brokers_wakeup(rd_kafka_t *rk,
+ int min_state,
+ const char *reason);
+
+void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason);
+
+void rd_kafka_broker_purge_queues(rd_kafka_broker_t *rkb,
+ int purge_flags,
+ rd_kafka_replyq_t replyq);
+
+int rd_kafka_brokers_get_state_version(rd_kafka_t *rk);
+int rd_kafka_brokers_wait_state_change(rd_kafka_t *rk,
+ int stored_version,
+ int timeout_ms);
+int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk,
+ int stored_version,
+ rd_kafka_enq_once_t *eonce);
+void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk);
+
+
+
+/**
+ * Updates the current toppar active round-robin next pointer.
+ */
+static RD_INLINE RD_UNUSED void
+rd_kafka_broker_active_toppar_next(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *sugg_next) {
+ if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) ||
+ (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars))
+ rkb->rkb_active_toppar_next = NULL;
+ else if (sugg_next)
+ rkb->rkb_active_toppar_next = sugg_next;
+ else
+ rkb->rkb_active_toppar_next =
+ CIRCLEQ_FIRST(&rkb->rkb_active_toppars);
+}
+
+
+void rd_kafka_broker_active_toppar_add(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ const char *reason);
+
+void rd_kafka_broker_active_toppar_del(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ const char *reason);
+
+
+void rd_kafka_broker_schedule_connection(rd_kafka_broker_t *rkb);
+
+void rd_kafka_broker_persistent_connection_add(rd_kafka_broker_t *rkb,
+ rd_atomic32_t *acntp);
+
+void rd_kafka_broker_persistent_connection_del(rd_kafka_broker_t *rkb,
+ rd_atomic32_t *acntp);
+
+
+void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_q_t *rkq,
+ void (*callback)(rd_kafka_broker_t *rkb));
+
+void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon);
+
+int unittest_broker(void);
+
+#endif /* _RDKAFKA_BROKER_H_ */