diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c | 623 |
1 files changed, 623 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c new file mode 100644 index 00000000..9e41bab7 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_coord.c @@ -0,0 +1,623 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "rdkafka_int.h" +#include "rdkafka_request.h" +#include "rdkafka_coord.h" + + +/** + * @name Coordinator cache + * @{ + * + */ +void rd_kafka_coord_cache_entry_destroy(rd_kafka_coord_cache_t *cc, + rd_kafka_coord_cache_entry_t *cce) { + rd_assert(cc->cc_cnt > 0); + rd_free(cce->cce_coordkey); + rd_kafka_broker_destroy(cce->cce_rkb); + TAILQ_REMOVE(&cc->cc_entries, cce, cce_link); + cc->cc_cnt--; + rd_free(cce); +} + + +/** + * @brief Delete any expired cache entries + * + * @locality rdkafka main thread + */ +void rd_kafka_coord_cache_expire(rd_kafka_coord_cache_t *cc) { + rd_kafka_coord_cache_entry_t *cce, *next; + rd_ts_t expire = rd_clock() - cc->cc_expire_thres; + + next = TAILQ_LAST(&cc->cc_entries, rd_kafka_coord_cache_head_s); + while (next) { + cce = next; + + if (cce->cce_ts_used > expire) + break; + + next = TAILQ_PREV(cce, rd_kafka_coord_cache_head_s, cce_link); + rd_kafka_coord_cache_entry_destroy(cc, cce); + } +} + + +static rd_kafka_coord_cache_entry_t * +rd_kafka_coord_cache_find(rd_kafka_coord_cache_t *cc, + rd_kafka_coordtype_t coordtype, + const char *coordkey) { + rd_kafka_coord_cache_entry_t *cce; + + TAILQ_FOREACH(cce, &cc->cc_entries, cce_link) { + if (cce->cce_coordtype == coordtype && + !strcmp(cce->cce_coordkey, coordkey)) { + /* Match */ + cce->cce_ts_used = rd_clock(); + if (TAILQ_FIRST(&cc->cc_entries) != cce) { + /* Move to head of list */ + TAILQ_REMOVE(&cc->cc_entries, cce, cce_link); + TAILQ_INSERT_HEAD(&cc->cc_entries, cce, + cce_link); + } + return cce; + } + } + + return NULL; +} + + +rd_kafka_broker_t *rd_kafka_coord_cache_get(rd_kafka_coord_cache_t *cc, + rd_kafka_coordtype_t coordtype, + const char *coordkey) { + rd_kafka_coord_cache_entry_t *cce; + + cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey); + if (!cce) + return NULL; + + rd_kafka_broker_keep(cce->cce_rkb); + return cce->cce_rkb; +} + + + +static void rd_kafka_coord_cache_add(rd_kafka_coord_cache_t *cc, + rd_kafka_coordtype_t coordtype, + const char *coordkey, + rd_kafka_broker_t *rkb) { + rd_kafka_coord_cache_entry_t *cce; + + if (!(cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey))) { + if (cc->cc_cnt > 10) { + /* Not enough room in cache, remove least used entry */ + rd_kafka_coord_cache_entry_t *rem = TAILQ_LAST( + &cc->cc_entries, rd_kafka_coord_cache_head_s); + rd_kafka_coord_cache_entry_destroy(cc, rem); + } + + cce = rd_calloc(1, sizeof(*cce)); + cce->cce_coordtype = coordtype; + cce->cce_coordkey = rd_strdup(coordkey); + cce->cce_ts_used = rd_clock(); + + TAILQ_INSERT_HEAD(&cc->cc_entries, cce, cce_link); + cc->cc_cnt++; + } + + if (cce->cce_rkb != rkb) { + if (cce->cce_rkb) + rd_kafka_broker_destroy(cce->cce_rkb); + cce->cce_rkb = rkb; + rd_kafka_broker_keep(rkb); + } +} + + +/** + * @brief Evict any cache entries for broker \p rkb. + * + * Use this when a request returns ERR_NOT_COORDINATOR_FOR... + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_coord_cache_evict(rd_kafka_coord_cache_t *cc, + rd_kafka_broker_t *rkb) { + rd_kafka_coord_cache_entry_t *cce, *tmp; + + TAILQ_FOREACH_SAFE(cce, &cc->cc_entries, cce_link, tmp) { + if (cce->cce_rkb == rkb) + rd_kafka_coord_cache_entry_destroy(cc, cce); + } +} + +/** + * @brief Destroy all coord cache entries. + */ +void rd_kafka_coord_cache_destroy(rd_kafka_coord_cache_t *cc) { + rd_kafka_coord_cache_entry_t *cce; + + while ((cce = TAILQ_FIRST(&cc->cc_entries))) + rd_kafka_coord_cache_entry_destroy(cc, cce); +} + + +/** + * @brief Initialize the coord cache. + * + * Locking of the coord-cache is up to the owner. + */ +void rd_kafka_coord_cache_init(rd_kafka_coord_cache_t *cc, + int expire_thres_ms) { + TAILQ_INIT(&cc->cc_entries); + cc->cc_cnt = 0; + cc->cc_expire_thres = expire_thres_ms * 1000; +} + +/**@}*/ + + +/** + * @name Asynchronous coordinator requests + * @{ + * + */ + + + +static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq); + +/** + * @brief Timer callback for delayed coord requests. + */ +static void rd_kafka_coord_req_tmr_cb(rd_kafka_timers_t *rkts, void *arg) { + rd_kafka_coord_req_t *creq = arg; + + rd_kafka_coord_req_fsm(rkts->rkts_rk, creq); +} + + +/** + * @brief Look up coordinator for \p coordtype and \p coordkey + * (either from cache or by FindCoordinator), make sure there is + * a connection to the coordinator, and then call \p send_req_cb, + * passing the coordinator broker instance and \p rko + * to send the request. + * These steps may be performed by this function, or asynchronously + * at a later time. + * + * @param delay_ms If non-zero, delay scheduling of the coord request + * for this long. The passed \p timeout_ms is automatically + * adjusted to + \p delay_ms. + * + * Response, or error, is sent on \p replyq with callback \p rkbuf_cb. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_coord_req(rd_kafka_t *rk, + rd_kafka_coordtype_t coordtype, + const char *coordkey, + rd_kafka_send_req_cb_t *send_req_cb, + rd_kafka_op_t *rko, + int delay_ms, + int timeout_ms, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *reply_opaque) { + rd_kafka_coord_req_t *creq; + + creq = rd_calloc(1, sizeof(*creq)); + creq->creq_coordtype = coordtype; + creq->creq_coordkey = rd_strdup(coordkey); + creq->creq_ts_timeout = rd_timeout_init(delay_ms + timeout_ms); + creq->creq_send_req_cb = send_req_cb; + creq->creq_rko = rko; + creq->creq_replyq = replyq; + creq->creq_resp_cb = resp_cb; + creq->creq_reply_opaque = reply_opaque; + creq->creq_refcnt = 1; + creq->creq_done = rd_false; + rd_interval_init(&creq->creq_query_intvl); + + TAILQ_INSERT_TAIL(&rk->rk_coord_reqs, creq, creq_link); + + if (delay_ms) + rd_kafka_timer_start_oneshot(&rk->rk_timers, &creq->creq_tmr, + rd_true, (rd_ts_t)delay_ms * 1000, + rd_kafka_coord_req_tmr_cb, creq); + else + rd_kafka_coord_req_fsm(rk, creq); +} + + +/** + * @brief Decrease refcount of creq and free it if no more references. + * + * @param done Mark creq as done, having performed its duties. There may still + * be lingering references. + * + * @returns true if creq was destroyed, else false. + */ +static rd_bool_t rd_kafka_coord_req_destroy(rd_kafka_t *rk, + rd_kafka_coord_req_t *creq, + rd_bool_t done) { + + rd_assert(creq->creq_refcnt > 0); + + if (done) { + /* Request has been performed, remove from rk_coord_reqs + * list so creq won't be triggered again by state broadcasts, + * etc. */ + rd_dassert(!creq->creq_done); + TAILQ_REMOVE(&rk->rk_coord_reqs, creq, creq_link); + creq->creq_done = rd_true; + + rd_kafka_timer_stop(&rk->rk_timers, &creq->creq_tmr, + RD_DO_LOCK); + } + + if (--creq->creq_refcnt > 0) + return rd_false; + + rd_dassert(creq->creq_done); + + /* Clear out coordinator we were waiting for. */ + if (creq->creq_rkb) { + rd_kafka_broker_persistent_connection_del( + creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord); + rd_kafka_broker_destroy(creq->creq_rkb); + creq->creq_rkb = NULL; + } + + rd_kafka_replyq_destroy(&creq->creq_replyq); + rd_free(creq->creq_coordkey); + rd_free(creq); + + return rd_true; +} + +static void rd_kafka_coord_req_keep(rd_kafka_coord_req_t *creq) { + creq->creq_refcnt++; +} + +static void rd_kafka_coord_req_fail(rd_kafka_t *rk, + rd_kafka_coord_req_t *creq, + rd_kafka_resp_err_t err) { + rd_kafka_op_t *reply; + rd_kafka_buf_t *rkbuf; + + reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF); + reply->rko_rk = rk; /* Set rk since the rkbuf will not have a rkb + * to reach it. */ + reply->rko_err = err; + + /* Need a dummy rkbuf to pass state to the buf resp_cb */ + rkbuf = rd_kafka_buf_new(0, 0); + rkbuf->rkbuf_cb = creq->creq_resp_cb; + rkbuf->rkbuf_opaque = creq->creq_reply_opaque; + reply->rko_u.xbuf.rkbuf = rkbuf; + + rd_kafka_replyq_enq(&creq->creq_replyq, reply, 0); + + rd_kafka_coord_req_destroy(rk, creq, rd_true /*done*/); +} + + +static void rd_kafka_coord_req_handle_FindCoordinator(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + rd_kafka_coord_req_t *creq = opaque; + int16_t ErrorCode; + rd_kafkap_str_t Host; + int32_t NodeId, Port; + char errstr[256] = ""; + int actions; + rd_kafka_broker_t *coord; + rd_kafka_metadata_broker_t mdb = RD_ZERO_INIT; + + /* If creq has finished (possibly because of an earlier FindCoordinator + * response or a broker state broadcast we simply ignore the + * response. */ + if (creq->creq_done) + err = RD_KAFKA_RESP_ERR__DESTROY; + + if (err) + goto err; + + if (request->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + if (request->rkbuf_reqhdr.ApiVersion >= 1) { + rd_kafkap_str_t ErrorMsg; + rd_kafka_buf_read_str(rkbuf, &ErrorMsg); + if (ErrorCode) + rd_snprintf(errstr, sizeof(errstr), "%.*s", + RD_KAFKAP_STR_PR(&ErrorMsg)); + } + + if ((err = ErrorCode)) + goto err; + + rd_kafka_buf_read_i32(rkbuf, &NodeId); + rd_kafka_buf_read_str(rkbuf, &Host); + rd_kafka_buf_read_i32(rkbuf, &Port); + + mdb.id = NodeId; + RD_KAFKAP_STR_DUPA(&mdb.host, &Host); + mdb.port = Port; + + /* Find, update or add broker */ + rd_kafka_broker_update(rk, rkb->rkb_proto, &mdb, &coord); + + if (!coord) { + err = RD_KAFKA_RESP_ERR__FAIL; + rd_snprintf(errstr, sizeof(errstr), + "Failed to add broker: " + "instance is probably terminating"); + goto err; + } + + + rd_kafka_coord_cache_add(&rk->rk_coord_cache, creq->creq_coordtype, + creq->creq_coordkey, coord); + rd_kafka_broker_destroy(coord); /* refcnt from broker_update() */ + + rd_kafka_coord_req_fsm(rk, creq); + + /* Drop refcount from req_fsm() */ + rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/); + + return; + +err_parse: + err = rkbuf->rkbuf_err; +err: + actions = rd_kafka_err_action( + rkb, err, request, + + RD_KAFKA_ERR_ACTION_SPECIAL, RD_KAFKA_RESP_ERR__DESTROY, + + RD_KAFKA_ERR_ACTION_PERMANENT, + RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED, + + RD_KAFKA_ERR_ACTION_PERMANENT, + RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED, + + RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_END); + + if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { + rd_kafka_coord_req_fail(rk, creq, err); + return; + + } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + rd_kafka_buf_retry(rkb, request); + return; /* Keep refcnt from req_fsm() and retry */ + } + + /* Rely on state broadcast to trigger retry */ + + /* Drop refcount from req_fsm() */ + rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/); +} + + + +/** + * @brief State machine for async coordinator requests. + * + * @remark May destroy the \p creq. + * + * @locality any + * @locks none + */ +static void rd_kafka_coord_req_fsm(rd_kafka_t *rk, rd_kafka_coord_req_t *creq) { + rd_kafka_broker_t *rkb; + rd_kafka_resp_err_t err; + + if (creq->creq_done) + /* crqeq has already performed its actions, this is a + * lingering reference, e.g., a late FindCoordinator response. + * Just ignore. */ + return; + + if (unlikely(rd_kafka_terminating(rk))) { + rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY); + return; + } + + /* Do nothing if creq is delayed and the delay time hasn't expired yet. + * We will be called again by the timer once it expires.*/ + if (rd_kafka_timer_next(&rk->rk_timers, &creq->creq_tmr, RD_DO_LOCK) > + 0) + return; + + /* Check cache first */ + rkb = rd_kafka_coord_cache_get( + &rk->rk_coord_cache, creq->creq_coordtype, creq->creq_coordkey); + + if (rkb) { + if (rd_kafka_broker_is_up(rkb)) { + /* Cached coordinator is up, send request */ + rd_kafka_replyq_t replyq; + + /* Clear out previous coordinator we waited for. */ + if (creq->creq_rkb) { + rd_kafka_broker_persistent_connection_del( + creq->creq_rkb, + &creq->creq_rkb->rkb_persistconn.coord); + rd_kafka_broker_destroy(creq->creq_rkb); + creq->creq_rkb = NULL; + } + + rd_kafka_replyq_copy(&replyq, &creq->creq_replyq); + err = creq->creq_send_req_cb(rkb, creq->creq_rko, + replyq, creq->creq_resp_cb, + creq->creq_reply_opaque); + + if (err) { + /* Permanent error, e.g., request not + * supported by broker. */ + rd_kafka_replyq_destroy(&replyq); + rd_kafka_coord_req_fail(rk, creq, err); + } else { + rd_kafka_coord_req_destroy(rk, creq, + rd_true /*done*/); + } + + } else if (creq->creq_rkb == rkb) { + /* No change in coordinator, but it is still not up. + * Query for coordinator if at least a second has + * passed since this coord_req was created or the + * last time we queried. */ + if (rd_interval(&creq->creq_query_intvl, + 1000 * 1000 /* 1s */, 0) > 0) { + rd_rkb_dbg(rkb, BROKER, "COORD", + "Coordinator connection is " + "still down: " + "querying for new coordinator"); + rd_kafka_broker_destroy(rkb); + goto query_coord; + } + + } else { + /* No connection yet. + * Let broker thread know we need a connection. + * We'll be re-triggered on broker state broadcast. */ + + if (creq->creq_rkb) { + /* Clear previous */ + rd_kafka_broker_persistent_connection_del( + creq->creq_rkb, + &creq->creq_rkb->rkb_persistconn.coord); + rd_kafka_broker_destroy(creq->creq_rkb); + } + + rd_kafka_broker_keep(rkb); + creq->creq_rkb = rkb; + rd_kafka_broker_persistent_connection_add( + rkb, &rkb->rkb_persistconn.coord); + } + + rd_kafka_broker_destroy(rkb); + return; + + } else if (creq->creq_rkb) { + /* No coordinator information, clear out the previous + * coordinator we waited for. */ + rd_kafka_broker_persistent_connection_del( + creq->creq_rkb, &creq->creq_rkb->rkb_persistconn.coord); + rd_kafka_broker_destroy(creq->creq_rkb); + creq->creq_rkb = NULL; + } + +query_coord: + /* Get any usable broker to look up the coordinator */ + rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, RD_DO_LOCK, + RD_KAFKA_FEATURE_BROKER_GROUP_COORD, + "broker to look up coordinator"); + + if (!rkb) { + /* No available brokers yet, we'll be re-triggered on + * broker state broadcast. */ + return; + } + + + /* Send FindCoordinator request, the handler will continue + * the state machine. */ + rd_kafka_coord_req_keep(creq); + err = rd_kafka_FindCoordinatorRequest( + rkb, creq->creq_coordtype, creq->creq_coordkey, + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_coord_req_handle_FindCoordinator, creq); + + rd_kafka_broker_destroy(rkb); + + if (err) { + rd_kafka_coord_req_fail(rk, creq, err); + /* from keep() above */ + rd_kafka_coord_req_destroy(rk, creq, rd_false /*!done*/); + } +} + + + +/** + * @brief Callback called from rdkafka main thread on each + * broker state change from or to UP. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_coord_rkb_monitor_cb(rd_kafka_broker_t *rkb) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_coord_req_t *creq, *tmp; + + /* Run through all coord_req fsms */ + TAILQ_FOREACH_SAFE(creq, &rk->rk_coord_reqs, creq_link, tmp) { + rd_kafka_coord_req_fsm(rk, creq); + } +} + + + +/** + * @brief Instance is terminating: destroy all coord reqs + */ +void rd_kafka_coord_reqs_term(rd_kafka_t *rk) { + rd_kafka_coord_req_t *creq; + + while ((creq = TAILQ_FIRST(&rk->rk_coord_reqs))) + rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY); +} + + +/** + * @brief Initialize coord reqs list. + */ +void rd_kafka_coord_reqs_init(rd_kafka_t *rk) { + TAILQ_INIT(&rk->rk_coord_reqs); +} + +/**@}*/ |