summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c623
1 files changed, 623 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c
new file mode 100644
index 000000000..9e41bab72
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c
@@ -0,0 +1,623 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2019 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "rdkafka_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_coord.h"
+
+
+/**
+ * @name Coordinator cache
+ * @{
+ *
+ */
+void rd_kafka_coord_cache_entry_destroy(rd_kafka_coord_cache_t *cc,
+ rd_kafka_coord_cache_entry_t *cce) {
+ rd_assert(cc->cc_cnt > 0);
+ rd_free(cce->cce_coordkey);
+ rd_kafka_broker_destroy(cce->cce_rkb);
+ TAILQ_REMOVE(&cc->cc_entries, cce, cce_link);
+ cc->cc_cnt--;
+ rd_free(cce);
+}
+
+
+/**
+ * @brief Delete any expired cache entries
+ *
+ * @locality rdkafka main thread
+ */
+void rd_kafka_coord_cache_expire(rd_kafka_coord_cache_t *cc) {
+ rd_kafka_coord_cache_entry_t *cce, *next;
+ rd_ts_t expire = rd_clock() - cc->cc_expire_thres;
+
+ next = TAILQ_LAST(&cc->cc_entries, rd_kafka_coord_cache_head_s);
+ while (next) {
+ cce = next;
+
+ if (cce->cce_ts_used > expire)
+ break;
+
+ next = TAILQ_PREV(cce, rd_kafka_coord_cache_head_s, cce_link);
+ rd_kafka_coord_cache_entry_destroy(cc, cce);
+ }
+}
+
+
+static rd_kafka_coord_cache_entry_t *
+rd_kafka_coord_cache_find(rd_kafka_coord_cache_t *cc,
+ rd_kafka_coordtype_t coordtype,
+ const char *coordkey) {
+ rd_kafka_coord_cache_entry_t *cce;
+
+ TAILQ_FOREACH(cce, &cc->cc_entries, cce_link) {
+ if (cce->cce_coordtype == coordtype &&
+ !strcmp(cce->cce_coordkey, coordkey)) {
+ /* Match */
+ cce->cce_ts_used = rd_clock();
+ if (TAILQ_FIRST(&cc->cc_entries) != cce) {
+ /* Move to head of list */
+ TAILQ_REMOVE(&cc->cc_entries, cce, cce_link);
+ TAILQ_INSERT_HEAD(&cc->cc_entries, cce,
+ cce_link);
+ }
+ return cce;
+ }
+ }
+
+ return NULL;
+}
+
+
+rd_kafka_broker_t *rd_kafka_coord_cache_get(rd_kafka_coord_cache_t *cc,
+ rd_kafka_coordtype_t coordtype,
+ const char *coordkey) {
+ rd_kafka_coord_cache_entry_t *cce;
+
+ cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey);
+ if (!cce)
+ return NULL;
+
+ rd_kafka_broker_keep(cce->cce_rkb);
+ return cce->cce_rkb;
+}
+
+
+
+static void rd_kafka_coord_cache_add(rd_kafka_coord_cache_t *cc,
+ rd_kafka_coordtype_t coordtype,
+ const char *coordkey,
+ rd_kafka_broker_t *rkb) {
+ rd_kafka_coord_cache_entry_t *cce;
+
+ if (!(cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey))) {
+ if (cc->cc_cnt > 10) {
+ /* Not enough room in cache, remove least used entry */
+ rd_kafka_coord_cache_entry_t *rem = TAILQ_LAST(
+ &cc->cc_entries, rd_kafka_coord_cache_head_s);
+ rd_kafka_coord_cache_entry_destroy(cc, rem);
+ }
+
+ cce = rd_calloc(1, sizeof(*cce));
+ cce->cce_coordtype = coordtype;
+ cce->cce_coordkey = rd_strdup(coordkey);
+ cce->cce_ts_used = rd_clock();
+
+ TAILQ_INSERT_HEAD(&cc->cc_entries, cce, cce_link);
+ cc->cc_cnt++;
+ }
+
+ if (cce->cce_rkb != rkb) {
+ if (cce->cce_rkb)
+ rd_kafka_broker_destroy(cce->cce_rkb);
+ cce->cce_rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+ }
+}
+
+
+/**
+ * @brief Evict any cache entries for broker \p rkb.
+ *
+ * Use this when a request returns ERR_NOT_COORDINATOR_FOR...
+ *
+ * @locality rdkafka main thread
+ * @locks none
+ */
+void rd_kafka_coord_cache_evict(rd_kafka_coord_cache_t *cc,
+ rd_kafka_broker_t *rkb) {
+ rd_kafka_coord_cache_entry_t *cce, *tmp;
+
+ TAILQ_FOREACH_SAFE(cce, &cc->cc_entries, cce_link, tmp) {
+ if (cce->cce_rkb == rkb)
+ rd_kafka_coord_cache_entry_destroy(cc, cce);
+ }
+}
+
+/**
+ * @brief Destroy all coord cache entries.
+ */
+void rd_kafka_coord_cache_destroy(rd_kafka_coord_cache_t *cc) {
+ rd_kafka_coord_cache_entry_t *cce;
+
+ while ((cce = TAILQ_FIRST(&cc->cc_entries)))
+ rd_kafka_coord_cache_entry_destroy(cc, cce);
+}
+
+
+/**
+ * @brief Initialize the coord cache.
+ *
+ * Locking of the coord-cache is up to the owner.
+ */
+void rd_kafka_coord_cache_init(rd_kafka_coord_cache_t *cc,
+ int expire_thres_ms) {
+ TAILQ_INIT(&cc->cc_entries);
+ cc->cc_cnt = 0;
+ cc->cc_expire_thres = expire_thres_ms * 1000;
+}
+
+/**@}*/
+
+
+/**
+ * @name Asynchronous coordinator requests
+ * @{
+ *
+ */
+
+
+
+static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq);
+
+/**
+ * @brief Timer callback for delayed coord requests.
+ */
+static void rd_kafka_coord_req_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_coord_req_t *creq = arg;
+
+ rd_kafka_coord_req_fsm(rkts->rkts_rk, creq);
+}
+
+
+/**
+ * @brief Look up coordinator for \p coordtype and \p coordkey
+ * (either from cache or by FindCoordinator), make sure there is
+ * a connection to the coordinator, and then call \p send_req_cb,
+ * passing the coordinator broker instance and \p rko
+ * to send the request.
+ * These steps may be performed by this function, or asynchronously
+ * at a later time.
+ *
+ * @param delay_ms If non-zero, delay scheduling of the coord request
+ * for this long. The passed \p timeout_ms is automatically
+ * adjusted to + \p delay_ms.
+ *
+ * Response, or error, is sent on \p replyq with callback \p rkbuf_cb.
+ *
+ * @locality rdkafka main thread
+ * @locks none
+ */
+void rd_kafka_coord_req(rd_kafka_t *rk,
+ rd_kafka_coordtype_t coordtype,
+ const char *coordkey,
+ rd_kafka_send_req_cb_t *send_req_cb,
+ rd_kafka_op_t *rko,
+ int delay_ms,
+ int timeout_ms,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *reply_opaque) {
+ rd_kafka_coord_req_t *creq;
+
+ creq = rd_calloc(1, sizeof(*creq));
+ creq->creq_coordtype = coordtype;
+ creq->creq_coordkey = rd_strdup(coordkey);
+ creq->creq_ts_timeout = rd_timeout_init(delay_ms + timeout_ms);
+ creq->creq_send_req_cb = send_req_cb;
+ creq->creq_rko = rko;
+ creq->creq_replyq = replyq;
+ creq->creq_resp_cb = resp_cb;
+ creq->creq_reply_opaque = reply_opaque;
+ creq->creq_refcnt = 1;
+ creq->creq_done = rd_false;
+ rd_interval_init(&creq->creq_query_intvl);
+
+ TAILQ_INSERT_TAIL(&rk->rk_coord_reqs, creq, creq_link);
+
+ if (delay_ms)
+ rd_kafka_timer_start_oneshot(&rk->rk_timers, &creq->creq_tmr,
+ rd_true, (rd_ts_t)delay_ms * 1000,
+ rd_kafka_coord_req_tmr_cb, creq);
+ else
+ rd_kafka_coord_req_fsm(rk, creq);
+}
+
+
+/**
+ * @brief Decrease refcount of creq and free it if no more references.
+ *
+ * @param done Mark creq as done, having performed its duties. There may still
+ * be lingering references.
+ *
+ * @returns true if creq was destroyed, else false.
+ */
+static rd_bool_t rd_kafka_coord_req_destroy(rd_kafka_t *rk,
+ rd_kafka_coord_req_t *creq,
+ rd_bool_t done) {
+
+ rd_assert(creq->creq_refcnt > 0);
+
+ if (done) {
+ /* Request has been performed, remove from rk_coord_reqs
+ * list so creq won't be triggered again by state broadcasts,
+ * etc. */
+ rd_dassert(!creq->creq_done);
+ TAILQ_REMOVE(&rk->rk_coord_reqs, creq, creq_link);
+ creq->creq_done = rd_true;
+
+ rd_kafka_timer_stop(&rk->rk_timers, &creq->creq_tmr,
+ RD_DO_LOCK);
+ }
+
+ if (--creq->creq_refcnt > 0)
+ return rd_false;
+
+ rd_dassert(creq->creq_done);
+
+ /* Clear out coordinator we were waiting for. */
+ if (creq->creq_rkb) {
+ rd_kafka_broker_persistent_connection_del(
+ creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord);
+ rd_kafka_broker_destroy(creq->creq_rkb);
+ creq->creq_rkb = NULL;
+ }
+
+ rd_kafka_replyq_destroy(&creq->creq_replyq);
+ rd_free(creq->creq_coordkey);
+ rd_free(creq);
+
+ return rd_true;
+}
+
+static void rd_kafka_coord_req_keep(rd_kafka_coord_req_t *creq) {
+ creq->creq_refcnt++;
+}
+
+static void rd_kafka_coord_req_fail(rd_kafka_t *rk,
+ rd_kafka_coord_req_t *creq,
+ rd_kafka_resp_err_t err) {
+ rd_kafka_op_t *reply;
+ rd_kafka_buf_t *rkbuf;
+
+ reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
+ reply->rko_rk = rk; /* Set rk since the rkbuf will not have a rkb
+ * to reach it. */
+ reply->rko_err = err;
+
+ /* Need a dummy rkbuf to pass state to the buf resp_cb */
+ rkbuf = rd_kafka_buf_new(0, 0);
+ rkbuf->rkbuf_cb = creq->creq_resp_cb;
+ rkbuf->rkbuf_opaque = creq->creq_reply_opaque;
+ reply->rko_u.xbuf.rkbuf = rkbuf;
+
+ rd_kafka_replyq_enq(&creq->creq_replyq, reply, 0);
+
+ rd_kafka_coord_req_destroy(rk, creq, rd_true /*done*/);
+}
+
+
+static void rd_kafka_coord_req_handle_FindCoordinator(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ const int log_decode_errors = LOG_ERR;
+ rd_kafka_coord_req_t *creq = opaque;
+ int16_t ErrorCode;
+ rd_kafkap_str_t Host;
+ int32_t NodeId, Port;
+ char errstr[256] = "";
+ int actions;
+ rd_kafka_broker_t *coord;
+ rd_kafka_metadata_broker_t mdb = RD_ZERO_INIT;
+
+ /* If creq has finished (possibly because of an earlier FindCoordinator
+ * response or a broker state broadcast we simply ignore the
+ * response. */
+ if (creq->creq_done)
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+
+ if (err)
+ goto err;
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 1)
+ rd_kafka_buf_read_throttle_time(rkbuf);
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 1) {
+ rd_kafkap_str_t ErrorMsg;
+ rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
+ if (ErrorCode)
+ rd_snprintf(errstr, sizeof(errstr), "%.*s",
+ RD_KAFKAP_STR_PR(&ErrorMsg));
+ }
+
+ if ((err = ErrorCode))
+ goto err;
+
+ rd_kafka_buf_read_i32(rkbuf, &NodeId);
+ rd_kafka_buf_read_str(rkbuf, &Host);
+ rd_kafka_buf_read_i32(rkbuf, &Port);
+
+ mdb.id = NodeId;
+ RD_KAFKAP_STR_DUPA(&mdb.host, &Host);
+ mdb.port = Port;
+
+ /* Find, update or add broker */
+ rd_kafka_broker_update(rk, rkb->rkb_proto, &mdb, &coord);
+
+ if (!coord) {
+ err = RD_KAFKA_RESP_ERR__FAIL;
+ rd_snprintf(errstr, sizeof(errstr),
+ "Failed to add broker: "
+ "instance is probably terminating");
+ goto err;
+ }
+
+
+ rd_kafka_coord_cache_add(&rk->rk_coord_cache, creq->creq_coordtype,
+ creq->creq_coordkey, coord);
+ rd_kafka_broker_destroy(coord); /* refcnt from broker_update() */
+
+ rd_kafka_coord_req_fsm(rk, creq);
+
+ /* Drop refcount from req_fsm() */
+ rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/);
+
+ return;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+err:
+ actions = rd_kafka_err_action(
+ rkb, err, request,
+
+ RD_KAFKA_ERR_ACTION_SPECIAL, RD_KAFKA_RESP_ERR__DESTROY,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT,
+ RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR__TRANSPORT,
+
+ RD_KAFKA_ERR_ACTION_RETRY,
+ RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
+ rd_kafka_coord_req_fail(rk, creq, err);
+ return;
+
+ } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ rd_kafka_buf_retry(rkb, request);
+ return; /* Keep refcnt from req_fsm() and retry */
+ }
+
+ /* Rely on state broadcast to trigger retry */
+
+ /* Drop refcount from req_fsm() */
+ rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/);
+}
+
+
+
+/**
+ * @brief State machine for async coordinator requests.
+ *
+ * @remark May destroy the \p creq.
+ *
+ * @locality any
+ * @locks none
+ */
+static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq) {
+ rd_kafka_broker_t *rkb;
+ rd_kafka_resp_err_t err;
+
+ if (creq->creq_done)
+ /* crqeq has already performed its actions, this is a
+ * lingering reference, e.g., a late FindCoordinator response.
+ * Just ignore. */
+ return;
+
+ if (unlikely(rd_kafka_terminating(rk))) {
+ rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
+ return;
+ }
+
+ /* Do nothing if creq is delayed and the delay time hasn't expired yet.
+ * We will be called again by the timer once it expires.*/
+ if (rd_kafka_timer_next(&rk->rk_timers, &creq->creq_tmr, RD_DO_LOCK) >
+ 0)
+ return;
+
+ /* Check cache first */
+ rkb = rd_kafka_coord_cache_get(
+ &rk->rk_coord_cache, creq->creq_coordtype, creq->creq_coordkey);
+
+ if (rkb) {
+ if (rd_kafka_broker_is_up(rkb)) {
+ /* Cached coordinator is up, send request */
+ rd_kafka_replyq_t replyq;
+
+ /* Clear out previous coordinator we waited for. */
+ if (creq->creq_rkb) {
+ rd_kafka_broker_persistent_connection_del(
+ creq->creq_rkb,
+ &creq->creq_rkb->rkb_persistconn.coord);
+ rd_kafka_broker_destroy(creq->creq_rkb);
+ creq->creq_rkb = NULL;
+ }
+
+ rd_kafka_replyq_copy(&replyq, &creq->creq_replyq);
+ err = creq->creq_send_req_cb(rkb, creq->creq_rko,
+ replyq, creq->creq_resp_cb,
+ creq->creq_reply_opaque);
+
+ if (err) {
+ /* Permanent error, e.g., request not
+ * supported by broker. */
+ rd_kafka_replyq_destroy(&replyq);
+ rd_kafka_coord_req_fail(rk, creq, err);
+ } else {
+ rd_kafka_coord_req_destroy(rk, creq,
+ rd_true /*done*/);
+ }
+
+ } else if (creq->creq_rkb == rkb) {
+ /* No change in coordinator, but it is still not up.
+ * Query for coordinator if at least a second has
+ * passed since this coord_req was created or the
+ * last time we queried. */
+ if (rd_interval(&creq->creq_query_intvl,
+ 1000 * 1000 /* 1s */, 0) > 0) {
+ rd_rkb_dbg(rkb, BROKER, "COORD",
+ "Coordinator connection is "
+ "still down: "
+ "querying for new coordinator");
+ rd_kafka_broker_destroy(rkb);
+ goto query_coord;
+ }
+
+ } else {
+ /* No connection yet.
+ * Let broker thread know we need a connection.
+ * We'll be re-triggered on broker state broadcast. */
+
+ if (creq->creq_rkb) {
+ /* Clear previous */
+ rd_kafka_broker_persistent_connection_del(
+ creq->creq_rkb,
+ &creq->creq_rkb->rkb_persistconn.coord);
+ rd_kafka_broker_destroy(creq->creq_rkb);
+ }
+
+ rd_kafka_broker_keep(rkb);
+ creq->creq_rkb = rkb;
+ rd_kafka_broker_persistent_connection_add(
+ rkb, &rkb->rkb_persistconn.coord);
+ }
+
+ rd_kafka_broker_destroy(rkb);
+ return;
+
+ } else if (creq->creq_rkb) {
+ /* No coordinator information, clear out the previous
+ * coordinator we waited for. */
+ rd_kafka_broker_persistent_connection_del(
+ creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord);
+ rd_kafka_broker_destroy(creq->creq_rkb);
+ creq->creq_rkb = NULL;
+ }
+
+query_coord:
+ /* Get any usable broker to look up the coordinator */
+ rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, RD_DO_LOCK,
+ RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
+ "broker to look up coordinator");
+
+ if (!rkb) {
+ /* No available brokers yet, we'll be re-triggered on
+ * broker state broadcast. */
+ return;
+ }
+
+
+ /* Send FindCoordinator request, the handler will continue
+ * the state machine. */
+ rd_kafka_coord_req_keep(creq);
+ err = rd_kafka_FindCoordinatorRequest(
+ rkb, creq->creq_coordtype, creq->creq_coordkey,
+ RD_KAFKA_REPLYQ(rk->rk_ops, 0),
+ rd_kafka_coord_req_handle_FindCoordinator, creq);
+
+ rd_kafka_broker_destroy(rkb);
+
+ if (err) {
+ rd_kafka_coord_req_fail(rk, creq, err);
+ /* from keep() above */
+ rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/);
+ }
+}
+
+
+
+/**
+ * @brief Callback called from rdkafka main thread on each
+ * broker state change from or to UP.
+ *
+ * @locality rdkafka main thread
+ * @locks none
+ */
+void rd_kafka_coord_rkb_monitor_cb(rd_kafka_broker_t *rkb) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_coord_req_t *creq, *tmp;
+
+ /* Run through all coord_req fsms */
+ TAILQ_FOREACH_SAFE(creq, &rk->rk_coord_reqs, creq_link, tmp) {
+ rd_kafka_coord_req_fsm(rk, creq);
+ }
+}
+
+
+
+/**
+ * @brief Instance is terminating: destroy all coord reqs
+ */
+void rd_kafka_coord_reqs_term(rd_kafka_t *rk) {
+ rd_kafka_coord_req_t *creq;
+
+ while ((creq = TAILQ_FIRST(&rk->rk_coord_reqs)))
+ rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
+}
+
+
+/**
+ * @brief Initialize coord reqs list.
+ */
+void rd_kafka_coord_reqs_init(rd_kafka_t *rk) {
+ TAILQ_INIT(&rk->rk_coord_reqs);
+}
+
+/**@}*/