author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000
---|---|---
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c |
parent | Initial commit. (diff) |
Adding upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c | 6668 |
1 file changed, 6668 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c new file mode 100644 index 00000000..6aaec636 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_admin.c @@ -0,0 +1,6668 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2018 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_admin.h" +#include "rdkafka_request.h" +#include "rdkafka_aux.h" + +#include <stdarg.h> + + + +/** @brief Descriptive strings for rko_u.admin_request.state */ +static const char *rd_kafka_admin_state_desc[] = { + "initializing", + "waiting for broker", + "waiting for controller", + "waiting for fanouts", + "constructing request", + "waiting for response from broker", + "waiting for a valid list of brokers to be available"}; + + + +/** + * @brief Admin API implementation. + * + * The public Admin API in librdkafka exposes a completely asynchronous + * interface where the initial request API (e.g., ..CreateTopics()) + * is non-blocking and returns immediately, and the application polls + * a ..queue_t for the result. + * + * The underlying handling of the request is also completely asynchronous + * inside librdkafka, for two reasons: + * - everything is async in librdkafka so adding something new that isn't + * would mean that existing functionality will need to be changed if + * it should be able to work simultaneously (such as statistics, timers, + * etc). There is no functional value to making the admin API + * synchronous internally, even if it would simplify its implementation. + * So making it async allows the Admin API to be used with existing + * client types in existing applications without breakage. + * - the async approach allows multiple outstanding Admin API requests + * simultaneously. + * + * The internal async implementation relies on the following concepts: + * - it uses a single rko (rd_kafka_op_t) to maintain state. + * - the rko has a callback attached - called the worker callback. + * - the worker callback is a small state machine that triggers + * async operations (be it controller lookups, timeout timers, + * protocol transmits, etc). 
+ * - the worker callback is only called on the rdkafka main thread. + * - the callback is triggered by different events and sources by enqueuing + * the rko on the rdkafka main ops queue. + * + * + * Let's illustrate this with a DeleteTopics example. This might look + * daunting, but it boils down to an asynchronous state machine being + * triggered by enqueuing the rko op. + * + * 1. [app thread] The user constructs the input arguments, + * including a response rkqu queue and then calls DeleteTopics(). + * + * 2. [app thread] DeleteTopics() creates a new internal op (rko) of type + * RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the + * input arguments (which allows the caller to free the originals + * whenever she likes). The rko op worker callback is set to the + * generic admin worker callback rd_kafka_admin_worker() + * + * 3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops + * queue that is served by the rdkafka main thread in rd_kafka_thread_main() + * + * 4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and + * the rd_kafka_poll_cb() is called. + * + * 5. [rdkafka main thread] The rko_type switch case identifies the rko + * as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback + * set in step 2. + * + * 6. [rdkafka main thread] The worker callback is called. + * After some initial checking of err==ERR__DESTROY events + * (which is used to clean up outstanding ops (etc) on termination), + * the code hits a state machine using rko_u.admin_request.state. + * + * 7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT + * where the worker validates the user input. + * An enqueue once (eonce) object is created - the use of this object + * allows having multiple outstanding async functions referencing the + * same underlying rko object, but only allowing the first one + * to trigger an event. + * A timeout timer is set up to trigger the eonce object when the + * full options.request_timeout has elapsed. + * + * 8. [rdkafka main thread] After initialization the state is updated + * to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to + * looking up a specific broker or the controller broker and waiting for + * an active connection. + * Both the lookup and the waiting for an active connection are + * fully asynchronous, and the same eonce used for the timer is passed + * to the rd_kafka_broker_controller_async() or broker_async() functions + * which will trigger the eonce when a broker state change occurs. + * If the controller is already known (from metadata) and the connection + * is up a rkb broker object is returned and the eonce is not used, + * skip to step 11. + * + * 9. [rdkafka main thread] Upon metadata retrieval (which is triggered + * automatically by other parts of the code) the controller_id may be + * updated in which case the eonce is triggered. + * The eonce triggering enqueues the original rko on the rdkafka main + * ops queue again and we go to step 8 which will check if the controller + * connection is up. + * + * 10. [broker thread] If the controller_id is now known we wait for + * the corresponding broker's connection to come up. This signaling + * is performed from the broker thread upon broker state changes + * and uses the same eonce. The eonce triggering enqueues the original + * rko on the rdkafka main ops queue again we go to back to step 8 + * to check if broker is now available. + * + * 11. 
[rdkafka main thread] Back in the worker callback we now have an + * rkb broker pointer (with reference count increased) for the controller + * with the connection up (it might go down while we're referencing it, + * but that does not stop us from enqueuing a protocol request). + * + * 12. [rdkafka main thread] A DeleteTopics protocol request buffer is + * constructed using the input parameters saved on the rko and the + * buffer is enqueued on the broker's transmit queue. + * The buffer is set up to provide the reply buffer on the rdkafka main + * ops queue (the same queue we are operating from) with a handler + * callback of rd_kafka_admin_handle_response(). + * The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE. + * + * 13. [broker thread] If the request times out, a response with error code + * (ERR__TIMED_OUT) is enqueued. Go to 16. + * + * 14. [broker thread] If a response is received, the response buffer + * is enqueued. Go to 16. + * + * 15. [rdkafka main thread] The buffer callback (..handle_response()) + * is called, which attempts to extract the original rko from the eonce, + * but if the eonce has already been triggered by some other source + * (the timeout timer) the buffer callback simply returns and does nothing + * since the admin request is over and a result (probably a timeout) + * has been enqueued for the application. + * If the rko was still intact we temporarily set the reply buffer + * in the rko struct and call the worker callback. Go to 17. + * + * 16. [rdkafka main thread] The worker callback is called in state + * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error. + * An error result op is created and enqueued on the application's + * provided response rkqu queue. + * + * 17. [rdkafka main thread] The worker callback is called in state + * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no + * error set. + * The worker calls the response `parse()` callback to parse the response + * buffer and populates a result op (rko_result) with the response + * information (such as per-topic error codes, etc). + * The result op is returned to the worker. + * + * 18. [rdkafka main thread] The worker enqueues the result op (rko_result) + * on the application's provided response rkqu queue. + * + * 19. [app thread] The application calls rd_kafka_queue_poll() to + * receive the result of the operation. The result may have been + * enqueued in step 18 thanks to succesful completion, or in any + * of the earlier stages when an error was encountered. + * + * 20. [app thread] The application uses rd_kafka_event_DeleteTopics_result() + * to retrieve the request-specific result type. + * + * 21. Done. + * + * + * + * + * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests + * ------------------------------------------ + * + * Certain Admin APIs may have requests that need to be sent to different + * brokers, for instance DeleteRecords which needs to be sent to the leader + * for each given partition. + * + * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the + * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each + * of the per-broker requests. These sub-ops have the proper op type for + * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS) + * but their replyq does not point back to the application replyq but + * rk_ops which is handled by the librdkafka main thread and with the op + * callback set to rd_kafka_admin_fanout_worker(). 
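 *
 * An application-side sketch of the plain (non-fanout) DeleteTopics
 * flow above, using only the public API; error handling is omitted
 * and "mytopic" is a placeholder:
 *
 * @code
 * rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
 * rd_kafka_DeleteTopic_t *dt = rd_kafka_DeleteTopic_new("mytopic");
 *
 * rd_kafka_DeleteTopics(rk, &dt, 1, NULL, rkqu);           // steps 1-3
 *
 * rd_kafka_event_t *rkev = rd_kafka_queue_poll(rkqu, -1);  // step 19
 * const rd_kafka_DeleteTopics_result_t *res =
 *         rd_kafka_event_DeleteTopics_result(rkev);        // step 20
 * size_t cnt;
 * const rd_kafka_topic_result_t **tres =
 *         rd_kafka_DeleteTopics_result_topics(res, &cnt);
 *
 * rd_kafka_event_destroy(rkev);
 * rd_kafka_DeleteTopic_destroy(dt);
 * rd_kafka_queue_destroy(rkqu);
 * @endcode
 *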
This worker aggregates + * the results of each fanned out sub-op and merges the result into a + * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the + * application's replyq. + * + * We rely on the timeouts on the fanned out sub-ops rather than the parent + * fanout op. + * + * The parent fanout op must not be destroyed until all fanned out sub-ops + * are done (either by success, failure or timeout) and destroyed, and this + * is tracked by the rko_u.admin_request.fanout.outstanding counter. + * + */ + + +/** + * @enum Admin request target broker. Must be negative values since the field + * used is broker_id. + */ +enum { RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */ + RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */ + RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and + * and has no target broker */ + RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */ +}; + +/** + * @brief Admin op callback types + */ +typedef rd_kafka_resp_err_t(rd_kafka_admin_Request_cb_t)( + rd_kafka_broker_t *rkb, + const rd_list_t *configs /*(ConfigResource_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) RD_WARN_UNUSED_RESULT; + +typedef rd_kafka_resp_err_t(rd_kafka_admin_Response_parse_cb_t)( + rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) RD_WARN_UNUSED_RESULT; + +typedef void(rd_kafka_admin_fanout_PartialResponse_cb_t)( + rd_kafka_op_t *rko_req, + const rd_kafka_op_t *rko_partial); + +typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t; + +typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyArg_cb_t; + +/** + * @struct Request-specific worker callbacks. + */ +struct rd_kafka_admin_worker_cbs { + /**< Protocol request callback which is called + * to construct and send the request. */ + rd_kafka_admin_Request_cb_t *request; + + /**< Protocol response parser callback which is called + * to translate the response to a rko_result op. */ + rd_kafka_admin_Response_parse_cb_t *parse; +}; + +/** + * @struct Fanout request callbacks. + */ +struct rd_kafka_admin_fanout_worker_cbs { + /** Merge results from a fanned out request into the user response. */ + rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response; + + /** Copy an accumulated result for storing into the rko_result. */ + rd_kafka_admin_fanout_CopyResult_cb_t *copy_result; + + /** Copy the original arguments, used by target ALL. 
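 * Each per-broker sub-op created by the fanout distribution
 * receives its own copy of the request arguments through this
 * callback.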
*/ + rd_kafka_admin_fanout_CopyArg_cb_t *copy_arg; +}; + +/* Forward declarations */ +static void rd_kafka_admin_common_worker_destroy(rd_kafka_t *rk, + rd_kafka_op_t *rko, + rd_bool_t do_destroy); +static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, + rd_kafka_AdminOptions_t *options); + +static void rd_kafka_AdminOptions_copy_to(rd_kafka_AdminOptions_t *dst, + const rd_kafka_AdminOptions_t *src); + +static rd_kafka_op_res_t +rd_kafka_admin_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko); +static rd_kafka_ConfigEntry_t * +rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t *src); +static void rd_kafka_ConfigEntry_free(void *ptr); +static void *rd_kafka_ConfigEntry_list_copy(const void *src, void *opaque); + +static void rd_kafka_admin_handle_response(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *reply, + rd_kafka_buf_t *request, + void *opaque); + +static rd_kafka_op_res_t +rd_kafka_admin_fanout_worker(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko_fanout); + + +/** + * @name Common admin request code + * @{ + * + * + */ + +/** + * @brief Create a new admin_result op based on the request op \p rko_req. + * + * @remark This moves the rko_req's admin_request.args list from \p rko_req + * to the returned rko. The \p rko_req args will be emptied. + */ +static rd_kafka_op_t *rd_kafka_admin_result_new(rd_kafka_op_t *rko_req) { + rd_kafka_op_t *rko_result; + rd_kafka_op_t *rko_fanout; + + if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) { + /* If this is a fanned out request the rko_result needs to be + * handled by the fanout worker rather than the application. */ + rko_result = rd_kafka_op_new_cb(rko_req->rko_rk, + RD_KAFKA_OP_ADMIN_RESULT, + rd_kafka_admin_fanout_worker); + /* Transfer fanout pointer to result */ + rko_result->rko_u.admin_result.fanout_parent = rko_fanout; + rko_req->rko_u.admin_request.fanout_parent = NULL; + /* Set event type based on original fanout ops reqtype, + * e.g., ..OP_DELETERECORDS */ + rko_result->rko_u.admin_result.reqtype = + rko_fanout->rko_u.admin_request.fanout.reqtype; + + } else { + rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT); + + /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT + * to fanned out requests) we need to use the original + * application request type. */ + if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT) + rko_result->rko_u.admin_result.reqtype = + rko_req->rko_u.admin_request.fanout.reqtype; + else + rko_result->rko_u.admin_result.reqtype = + rko_req->rko_type; + } + + rko_result->rko_rk = rko_req->rko_rk; + + rko_result->rko_u.admin_result.opaque = rd_kafka_confval_get_ptr( + &rko_req->rko_u.admin_request.options.opaque); + + /* Move request arguments (list) from request to result. + * This is mainly so that partial_response() knows what arguments + * were provided to the response's request it is merging. */ + rd_list_move(&rko_result->rko_u.admin_result.args, + &rko_req->rko_u.admin_request.args); + + rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type; + + return rko_result; +} + + +/** + * @brief Set error code and error string on admin_result op \p rko. 
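 *
 * Illustrative usage via the printf-style wrapper
 * rd_kafka_admin_result_set_err() defined below (the error code,
 * format string and the \c state variable here are examples only):
 *
 * @code
 * rd_kafka_admin_result_set_err(rko_result, RD_KAFKA_RESP_ERR__TIMED_OUT,
 *                               "Timed out %s",
 *                               rd_kafka_admin_state_desc[state]);
 * @endcode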
+ */ +static void rd_kafka_admin_result_set_err0(rd_kafka_op_t *rko, + rd_kafka_resp_err_t err, + const char *fmt, + va_list ap) { + char buf[512]; + + rd_vsnprintf(buf, sizeof(buf), fmt, ap); + + rko->rko_err = err; + + if (rko->rko_u.admin_result.errstr) + rd_free(rko->rko_u.admin_result.errstr); + rko->rko_u.admin_result.errstr = rd_strdup(buf); + + rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL", + "Admin %s result error: %s", + rd_kafka_op2str(rko->rko_u.admin_result.reqtype), + rko->rko_u.admin_result.errstr); +} + +/** + * @sa rd_kafka_admin_result_set_err0 + */ +static RD_UNUSED RD_FORMAT(printf, 3, 4) void rd_kafka_admin_result_set_err( + rd_kafka_op_t *rko, + rd_kafka_resp_err_t err, + const char *fmt, + ...) { + va_list ap; + + va_start(ap, fmt); + rd_kafka_admin_result_set_err0(rko, err, fmt, ap); + va_end(ap); +} + +/** + * @brief Enqueue admin_result on application's queue. + */ +static RD_INLINE void rd_kafka_admin_result_enq(rd_kafka_op_t *rko_req, + rd_kafka_op_t *rko_result) { + rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq, rko_result, + rko_req->rko_u.admin_request.replyq.version); +} + +/** + * @brief Set request-level error code and string in reply op. + * + * @remark This function will NOT destroy the \p rko_req, so don't forget to + * call rd_kafka_admin_common_worker_destroy() when done with the rko. + */ +static RD_FORMAT(printf, + 3, + 4) void rd_kafka_admin_result_fail(rd_kafka_op_t *rko_req, + rd_kafka_resp_err_t err, + const char *fmt, + ...) { + va_list ap; + rd_kafka_op_t *rko_result; + + if (!rko_req->rko_u.admin_request.replyq.q) + return; + + rko_result = rd_kafka_admin_result_new(rko_req); + + va_start(ap, fmt); + rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap); + va_end(ap); + + rd_kafka_admin_result_enq(rko_req, rko_result); +} + + +/** + * @brief Send the admin request contained in \p rko upon receiving + * a FindCoordinator response. + * + * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce) + * (i.e. created by \c rd_kafka_admin_request_op_new ) + * + * @remark To be used as a callback for \c rd_kafka_coord_req + */ +static rd_kafka_resp_err_t +rd_kafka_admin_coord_request(rd_kafka_broker_t *rkb, + rd_kafka_op_t *rko_ignore, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_enq_once_t *eonce = opaque; + rd_kafka_op_t *rko; + char errstr[512]; + rd_kafka_resp_err_t err; + + + rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request"); + if (!rko) + /* Admin request has timed out and been destroyed */ + return RD_KAFKA_RESP_ERR__DESTROY; + + rd_kafka_enq_once_add_source(eonce, "coordinator response"); + + err = rko->rko_u.admin_request.cbs->request( + rkb, &rko->rko_u.admin_request.args, + &rko->rko_u.admin_request.options, errstr, sizeof(errstr), replyq, + rd_kafka_admin_handle_response, eonce); + if (err) { + rd_kafka_enq_once_del_source(eonce, "coordinator response"); + rd_kafka_admin_result_fail( + rko, err, "%s worker failed to send request: %s", + rd_kafka_op2str(rko->rko_type), errstr); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + } + return err; +} + + +/** + * @brief Return the topics list from a topic-related result object. 
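 *
 * The public per-API result accessors are thin wrappers around this
 * helper; a sketch of such a wrapper (assuming the documented public
 * signature):
 *
 * @code
 * const rd_kafka_topic_result_t **rd_kafka_CreateTopics_result_topics(
 *         const rd_kafka_CreateTopics_result_t *result, size_t *cntp) {
 *         return rd_kafka_admin_result_ret_topics(
 *             (const rd_kafka_op_t *)result, cntp);
 * }
 * @endcode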
+ */ +static const rd_kafka_topic_result_t ** +rd_kafka_admin_result_ret_topics(const rd_kafka_op_t *rko, size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS || + reqtype == RD_KAFKA_OP_DELETETOPICS || + reqtype == RD_KAFKA_OP_CREATEPARTITIONS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_topic_result_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Return the ConfigResource list from a config-related result object. + */ +static const rd_kafka_ConfigResource_t ** +rd_kafka_admin_result_ret_resources(const rd_kafka_op_t *rko, size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS || + reqtype == RD_KAFKA_OP_DESCRIBECONFIGS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_ConfigResource_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Return the acl result list from an acl-related result object. + */ +static const rd_kafka_acl_result_t ** +rd_kafka_admin_result_ret_acl_results(const rd_kafka_op_t *rko, size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_CREATEACLS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_acl_result_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Return the acl binding list from an acl-related result object. + */ +static const rd_kafka_AclBinding_t ** +rd_kafka_admin_result_ret_acl_bindings(const rd_kafka_op_t *rko, size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_DESCRIBEACLS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_AclBinding_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Return the groups list from a group-related result object. + */ +static const rd_kafka_group_result_t ** +rd_kafka_admin_result_ret_groups(const rd_kafka_op_t *rko, size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS || + reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS || + reqtype == RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS || + reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_group_result_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Return the DeleteAcls response list from an acl-related result object. + */ +static const rd_kafka_DeleteAcls_result_response_t ** +rd_kafka_admin_result_ret_delete_acl_result_responses(const rd_kafka_op_t *rko, + size_t *cntp) { + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_DELETEACLS); + + *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); + return (const rd_kafka_DeleteAcls_result_response_t **) + rko->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Create a new admin_request op of type \p optype and set up the + * generic (type independent) fields. + * + * The caller shall then populate the admin_request.args list + * and enqueue the op on rk_ops for further processing. + * + * @param cbs Callbacks, must reside in .data segment. 
+ * @param options Optional options, may be NULL to use defaults. + * + * @locks none + * @locality application thread + */ +static rd_kafka_op_t * +rd_kafka_admin_request_op_new(rd_kafka_t *rk, + rd_kafka_op_type_t optype, + rd_kafka_event_type_t reply_event_type, + const struct rd_kafka_admin_worker_cbs *cbs, + const rd_kafka_AdminOptions_t *options, + rd_kafka_q_t *rkq) { + rd_kafka_op_t *rko; + + rd_assert(rk); + rd_assert(rkq); + rd_assert(cbs); + + rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker); + + rko->rko_u.admin_request.reply_event_type = reply_event_type; + + rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs; + + /* Make a copy of the options */ + if (options) + rd_kafka_AdminOptions_copy_to(&rko->rko_u.admin_request.options, + options); + else + rd_kafka_AdminOptions_init(rk, + &rko->rko_u.admin_request.options); + + /* Default to controller */ + rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; + + /* Calculate absolute timeout */ + rko->rko_u.admin_request.abs_timeout = + rd_timeout_init(rd_kafka_confval_get_int( + &rko->rko_u.admin_request.options.request_timeout)); + + /* Setup enq-op-once, which is triggered by either timer code + * or future wait-controller code. */ + rko->rko_u.admin_request.eonce = + rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); + + /* The timer itself must be started from the rdkafka main thread, + * not here. */ + + /* Set up replyq */ + rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); + + rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT; + return rko; +} + + +/** + * @returns the remaining request timeout in milliseconds. + */ +static RD_INLINE int rd_kafka_admin_timeout_remains(rd_kafka_op_t *rko) { + return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout); +} + +/** + * @returns the remaining request timeout in microseconds. + */ +static RD_INLINE rd_ts_t rd_kafka_admin_timeout_remains_us(rd_kafka_op_t *rko) { + return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout); +} + + +/** + * @brief Timer timeout callback for the admin rko's eonce object. + */ +static void rd_kafka_admin_eonce_timeout_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_enq_once_t *eonce = arg; + + rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT, + "timeout timer"); +} + + + +/** + * @brief Common worker destroy to be called in destroy: label + * in worker. + */ +static void rd_kafka_admin_common_worker_destroy(rd_kafka_t *rk, + rd_kafka_op_t *rko, + rd_bool_t do_destroy) { + int timer_was_stopped; + + /* Free resources for this op. */ + timer_was_stopped = rd_kafka_timer_stop( + &rk->rk_timers, &rko->rko_u.admin_request.tmr, rd_true); + + + if (rko->rko_u.admin_request.eonce) { + /* Remove the stopped timer's eonce reference since its + * callback will not have fired if we stopped the timer. */ + if (timer_was_stopped) + rd_kafka_enq_once_del_source( + rko->rko_u.admin_request.eonce, "timeout timer"); + + /* This is thread-safe to do even if there are outstanding + * timers or wait-controller references to the eonce + * since they only hold direct reference to the eonce, + * not the rko (the eonce holds a reference to the rko but + * it is cleared here). */ + rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce); + rko->rko_u.admin_request.eonce = NULL; + } + + if (do_destroy) + rd_kafka_op_destroy(rko); +} + + + +/** + * @brief Asynchronously look up a broker. 
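 *
 * Typical call site, as in the WAIT_BROKER state of the worker below
 * (a sketch of the wait-retry pattern):
 *
 * @code
 * if (!(rkb = rd_kafka_admin_common_get_broker(
 *               rk, rko, rko->rko_u.admin_request.broker_id)))
 *         return RD_KAFKA_OP_RES_KEEP;  // wait for eonce re-trigger
 * @endcode
 *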
+ * To be called repeatedly from each invocation of the worker + * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until + * a valid rkb is returned. + * + * @returns the broker rkb with refcount increased, or NULL if not yet + * available. + */ +static rd_kafka_broker_t *rd_kafka_admin_common_get_broker(rd_kafka_t *rk, + rd_kafka_op_t *rko, + int32_t broker_id) { + rd_kafka_broker_t *rkb; + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %" PRId32, + rd_kafka_op2str(rko->rko_type), broker_id); + + /* Since we're iterating over this broker_async() call + * (asynchronously) until a broker is availabe (or timeout) + * we need to re-enable the eonce to be triggered again (which + * is not necessary the first time we get here, but there + * is no harm doing it then either). */ + rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, + RD_KAFKA_REPLYQ(rk->rk_ops, 0)); + + /* Look up the broker asynchronously, if the broker + * is not available the eonce is registered for broker + * state changes which will cause our function to be called + * again as soon as (any) broker state changes. + * When we are called again we perform the broker lookup + * again and hopefully get an rkb back, otherwise defer a new + * async wait. Repeat until success or timeout. */ + if (!(rkb = rd_kafka_broker_get_async( + rk, broker_id, RD_KAFKA_BROKER_STATE_UP, + rko->rko_u.admin_request.eonce))) { + /* Broker not available, wait asynchronously + * for broker metadata code to trigger eonce. */ + return NULL; + } + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %" PRId32 " is %s", + rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name); + + return rkb; +} + + +/** + * @brief Asynchronously look up the controller. + * To be called repeatedly from each invocation of the worker + * when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until + * a valid rkb is returned. + * + * @returns the controller rkb with refcount increased, or NULL if not yet + * available. + */ +static rd_kafka_broker_t * +rd_kafka_admin_common_get_controller(rd_kafka_t *rk, rd_kafka_op_t *rko) { + rd_kafka_broker_t *rkb; + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller", + rd_kafka_op2str(rko->rko_type)); + + /* Since we're iterating over this controller_async() call + * (asynchronously) until a controller is availabe (or timeout) + * we need to re-enable the eonce to be triggered again (which + * is not necessary the first time we get here, but there + * is no harm doing it then either). */ + rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, + RD_KAFKA_REPLYQ(rk->rk_ops, 0)); + + /* Look up the controller asynchronously, if the controller + * is not available the eonce is registered for broker + * state changes which will cause our function to be called + * again as soon as (any) broker state changes. + * When we are called again we perform the controller lookup + * again and hopefully get an rkb back, otherwise defer a new + * async wait. Repeat until success or timeout. */ + if (!(rkb = rd_kafka_broker_controller_async( + rk, RD_KAFKA_BROKER_STATE_UP, + rko->rko_u.admin_request.eonce))) { + /* Controller not available, wait asynchronously + * for controller code to trigger eonce. */ + return NULL; + } + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s", + rd_kafka_op2str(rko->rko_type), rkb->rkb_name); + + return rkb; +} + + +/** + * @brief Asynchronously look up current list of broker ids until available. + * Bootstrap and logical brokers are excluded from the list. 
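 *
 * Typical call site, as in the WAIT_BROKER_LIST state of the worker
 * below (sketch):
 *
 * @code
 * if (!(nodeids = rd_kafka_admin_common_brokers_get_nodeids(rk, rko)))
 *         return RD_KAFKA_OP_RES_KEEP;  // wait for eonce re-trigger
 * rd_kafka_admin_fanout_op_distribute(rk, rko, nodeids);
 * rd_list_destroy(nodeids);
 * @endcode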
+ * + * To be called repeatedly from each invocation of the worker + * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST until + * a not-NULL rd_list_t * is returned. + * + * @param rk Client instance. + * @param rko Op containing the admin request eonce to use for the + * async callback. + * @return List of int32_t with broker nodeids when ready, NULL when + * the eonce callback will be called. + */ +static rd_list_t * +rd_kafka_admin_common_brokers_get_nodeids(rd_kafka_t *rk, rd_kafka_op_t *rko) { + rd_list_t *broker_ids; + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up brokers", + rd_kafka_op2str(rko->rko_type)); + + /* Since we're iterating over this rd_kafka_brokers_get_nodeids_async() + * call (asynchronously) until a nodeids list is available (or timeout), + * we need to re-enable the eonce to be triggered again (which + * is not necessary the first time we get here, but there + * is no harm doing it then either). */ + rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, + RD_KAFKA_REPLYQ(rk->rk_ops, 0)); + + /* Look up the nodeids list asynchronously, if it's + * not available the eonce is registered for broker + * state changes which will cause our function to be called + * again as soon as (any) broker state changes. + * When we are called again we perform the same lookup + * again and hopefully get a list of nodeids again, + * otherwise defer a new async wait. + * Repeat until success or timeout. */ + if (!(broker_ids = rd_kafka_brokers_get_nodeids_async( + rk, rko->rko_u.admin_request.eonce))) { + /* nodeids list not available, wait asynchronously + * for the eonce to be triggered. */ + return NULL; + } + + rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: %d broker(s)", + rd_kafka_op2str(rko->rko_type), rd_list_cnt(broker_ids)); + + return broker_ids; +} + + + +/** + * @brief Handle response from broker by triggering worker callback. + * + * @param opaque is the eonce from the worker protocol request call. + */ +static void rd_kafka_admin_handle_response(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *reply, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_enq_once_t *eonce = opaque; + rd_kafka_op_t *rko; + + /* From ...add_source("send") */ + rko = rd_kafka_enq_once_disable(eonce); + + if (!rko) { + /* The operation timed out and the worker was + * dismantled while we were waiting for broker response, + * do nothing - everything has been cleaned up. */ + rd_kafka_dbg( + rk, ADMIN, "ADMIN", + "Dropping outdated %sResponse with return code %s", + request ? rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey) + : "???", + rd_kafka_err2str(err)); + return; + } + + /* Attach reply buffer to rko for parsing in the worker. */ + rd_assert(!rko->rko_u.admin_request.reply_buf); + rko->rko_u.admin_request.reply_buf = reply; + rko->rko_err = err; + + if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED) + rd_kafka_op_destroy(rko); +} + +/** + * @brief Generic handler for protocol responses, calls the admin ops' + * Response_parse_cb and enqueues the result to the caller's queue. + */ +static void rd_kafka_admin_response_parse(rd_kafka_op_t *rko) { + rd_kafka_resp_err_t err; + rd_kafka_op_t *rko_result = NULL; + char errstr[512]; + + if (rko->rko_err) { + rd_kafka_admin_result_fail(rko, rko->rko_err, + "%s worker request failed: %s", + rd_kafka_op2str(rko->rko_type), + rd_kafka_err2str(rko->rko_err)); + return; + } + + /* Response received. 
+ * Let callback parse response and provide result in rko_result + * which is then enqueued on the reply queue. */ + err = rko->rko_u.admin_request.cbs->parse( + rko, &rko_result, rko->rko_u.admin_request.reply_buf, errstr, + sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail( + rko, err, "%s worker failed to parse response: %s", + rd_kafka_op2str(rko->rko_type), errstr); + return; + } + + rd_assert(rko_result); + + /* Enqueue result on application queue, we're done. */ + rd_kafka_admin_result_enq(rko, rko_result); +} + +/** + * @brief Generic handler for coord_req() responses. + */ +static void rd_kafka_admin_coord_response_parse(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_op_t *rko_result; + rd_kafka_enq_once_t *eonce = opaque; + rd_kafka_op_t *rko; + char errstr[512]; + + rko = + rd_kafka_enq_once_del_source_return(eonce, "coordinator response"); + if (!rko) + /* Admin request has timed out and been destroyed */ + return; + + if (err) { + rd_kafka_admin_result_fail( + rko, err, "%s worker coordinator request failed: %s", + rd_kafka_op2str(rko->rko_type), rd_kafka_err2str(err)); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + err = rko->rko_u.admin_request.cbs->parse(rko, &rko_result, rkbuf, + errstr, sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail( + rko, err, + "%s worker failed to parse coordinator %sResponse: %s", + rd_kafka_op2str(rko->rko_type), + rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), errstr); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + rd_assert(rko_result); + + /* Enqueue result on application queue, we're done. */ + rd_kafka_admin_result_enq(rko, rko_result); +} + +static void rd_kafka_admin_fanout_op_distribute(rd_kafka_t *rk, + rd_kafka_op_t *rko, + rd_list_t *nodeids); + + +/** + * @brief Common worker state machine handling regardless of request type. + * + * Tasks: + * - Sets up timeout on first call. + * - Checks for timeout. + * - Checks for and fails on errors. + * - Async Controller and broker lookups + * - Calls the Request callback + * - Calls the parse callback + * - Result reply + * - Destruction of rko + * + * rko->rko_err may be one of: + * RD_KAFKA_RESP_ERR_NO_ERROR, or + * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or + * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out, + * or any other error code triggered by other parts of the code. + * + * @returns a hint to the op code whether the rko should be destroyed or not. 
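 *
 * State progression, summarized from the switch statement below:
 *
 *   INIT -> WAIT_CONTROLLER | WAIT_BROKER | WAIT_BROKER_LIST
 *        -> CONSTRUCT_REQUEST -> WAIT_RESPONSE -> result enqueued
 *
 * Coordinator targets go from INIT directly to WAIT_RESPONSE (via
 * rd_kafka_coord_req()), and TARGET_ALL requests go from
 * WAIT_BROKER_LIST to WAIT_FANOUTS after distributing per-broker ops.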
+ */ +static rd_kafka_op_res_t +rd_kafka_admin_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { + const char *name = rd_kafka_op2str(rko->rko_type); + rd_ts_t timeout_in; + rd_kafka_broker_t *rkb = NULL; + rd_kafka_resp_err_t err; + rd_list_t *nodeids = NULL; + char errstr[512]; + + /* ADMIN_FANOUT handled by fanout_worker() */ + rd_assert((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) != + RD_KAFKA_OP_ADMIN_FANOUT); + + if (rd_kafka_terminating(rk)) { + rd_kafka_dbg( + rk, ADMIN, name, + "%s worker called in state %s: " + "handle is terminating: %s", + name, + rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], + rd_kafka_err2str(rko->rko_err)); + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY, + "Handle is terminating: %s", + rd_kafka_err2str(rko->rko_err)); + goto destroy; + } + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) { + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY, + "Destroyed"); + goto destroy; /* rko being destroyed (silent) */ + } + + rd_kafka_dbg(rk, ADMIN, name, "%s worker called in state %s: %s", name, + rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], + rd_kafka_err2str(rko->rko_err)); + + rd_assert(thrd_is_current(rko->rko_rk->rk_thread)); + + /* Check for errors raised asynchronously (e.g., by timer) */ + if (rko->rko_err) { + rd_kafka_admin_result_fail( + rko, rko->rko_err, "Failed while %s: %s", + rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], + rd_kafka_err2str(rko->rko_err)); + goto destroy; + } + + /* Check for timeout */ + timeout_in = rd_kafka_admin_timeout_remains_us(rko); + if (timeout_in <= 0) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__TIMED_OUT, "Timed out %s", + rd_kafka_admin_state_desc[rko->rko_u.admin_request.state]); + goto destroy; + } + +redo: + switch (rko->rko_u.admin_request.state) { + case RD_KAFKA_ADMIN_STATE_INIT: { + int32_t broker_id; + + /* First call. */ + + /* Set up timeout timer. */ + rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce, + "timeout timer"); + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rko->rko_u.admin_request.tmr, rd_true, + timeout_in, rd_kafka_admin_eonce_timeout_cb, + rko->rko_u.admin_request.eonce); + + /* Use explicitly specified broker_id, if available. 
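 * The option is set with rd_kafka_AdminOptions_set_broker() and
 * defaults to -1 (use the controller), see
 * rd_kafka_AdminOptions_init() below.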
*/ + broker_id = (int32_t)rd_kafka_confval_get_int( + &rko->rko_u.admin_request.options.broker); + + if (broker_id != -1) { + rd_kafka_dbg(rk, ADMIN, name, + "%s using explicitly " + "set broker id %" PRId32 + " rather than %" PRId32, + name, broker_id, + rko->rko_u.admin_request.broker_id); + rko->rko_u.admin_request.broker_id = broker_id; + } else { + /* Default to controller */ + broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; + } + + /* Resolve target broker(s) */ + switch (rko->rko_u.admin_request.broker_id) { + case RD_KAFKA_ADMIN_TARGET_CONTROLLER: + /* Controller */ + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER; + goto redo; /* Trigger next state immediately */ + + case RD_KAFKA_ADMIN_TARGET_COORDINATOR: + /* Group (or other) coordinator */ + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE; + rd_kafka_enq_once_add_source( + rko->rko_u.admin_request.eonce, + "coordinator request"); + rd_kafka_coord_req( + rk, rko->rko_u.admin_request.coordtype, + rko->rko_u.admin_request.coordkey, + rd_kafka_admin_coord_request, NULL, 0 /* no delay*/, + rd_kafka_admin_timeout_remains(rko), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_admin_coord_response_parse, + rko->rko_u.admin_request.eonce); + /* Wait asynchronously for broker response, which will + * trigger the eonce and worker to be called again. */ + return RD_KAFKA_OP_RES_KEEP; + case RD_KAFKA_ADMIN_TARGET_ALL: + /* All brokers */ + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST; + goto redo; /* Trigger next state immediately */ + + case RD_KAFKA_ADMIN_TARGET_FANOUT: + /* Shouldn't come here, fanouts are handled by + * fanout_worker() */ + RD_NOTREACHED(); + return RD_KAFKA_OP_RES_KEEP; + + default: + /* Specific broker */ + rd_assert(rko->rko_u.admin_request.broker_id >= 0); + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_BROKER; + goto redo; /* Trigger next state immediately */ + } + } + + + case RD_KAFKA_ADMIN_STATE_WAIT_BROKER: + /* Broker lookup */ + if (!(rkb = rd_kafka_admin_common_get_broker( + rk, rko, rko->rko_u.admin_request.broker_id))) { + /* Still waiting for broker to become available */ + return RD_KAFKA_OP_RES_KEEP; + } + + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST; + goto redo; + + case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER: + if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) { + /* Still waiting for controller to become available. */ + return RD_KAFKA_OP_RES_KEEP; + } + + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST; + goto redo; + + case RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST: + /* Wait for a valid list of brokers to be available. */ + if (!(nodeids = + rd_kafka_admin_common_brokers_get_nodeids(rk, rko))) { + /* Still waiting for brokers to become available. */ + return RD_KAFKA_OP_RES_KEEP; + } + + rd_kafka_admin_fanout_op_distribute(rk, rko, nodeids); + rd_list_destroy(nodeids); + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS; + goto redo; + + case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS: + /* This op can be destroyed, as a new fanout op has been + * sent, and the response will be enqueued there. */ + goto destroy; + + case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST: + /* Got broker, send protocol request. */ + + /* Make sure we're called from a 'goto redo' where + * the rkb was set. 
*/ + rd_assert(rkb); + + /* Still need to use the eonce since this worker may + * time out while waiting for response from broker, in which + * case the broker response will hit an empty eonce (ok). */ + rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce, + "send"); + + /* Send request (async) */ + err = rko->rko_u.admin_request.cbs->request( + rkb, &rko->rko_u.admin_request.args, + &rko->rko_u.admin_request.options, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_admin_handle_response, + rko->rko_u.admin_request.eonce); + + /* Loose broker refcount from get_broker(), get_controller() */ + rd_kafka_broker_destroy(rkb); + + if (err) { + rd_kafka_enq_once_del_source( + rko->rko_u.admin_request.eonce, "send"); + rd_kafka_admin_result_fail(rko, err, "%s", errstr); + goto destroy; + } + + rko->rko_u.admin_request.state = + RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE; + + /* Wait asynchronously for broker response, which will + * trigger the eonce and worker to be called again. */ + return RD_KAFKA_OP_RES_KEEP; + + + case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE: + rd_kafka_admin_response_parse(rko); + goto destroy; + } + + return RD_KAFKA_OP_RES_KEEP; + +destroy: + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_false /*don't destroy*/); + return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */ +} + +/** + * @brief Create a new admin_fanout op of type \p req_type and sets up the + * generic (type independent files). + * + * The caller shall then populate the \c admin_fanout.requests list, + * initialize the \c admin_fanout.responses list, + * set the initial \c admin_fanout.outstanding value, + * and enqueue the op on rk_ops for further processing work. + * + * @param cbs Callbacks, must reside in .data segment. + * @param options Optional options, may be NULL to use defaults. + * @param rkq is the application reply queue. + * + * @locks none + * @locality application thread + */ +static rd_kafka_op_t * +rd_kafka_admin_fanout_op_new(rd_kafka_t *rk, + rd_kafka_op_type_t req_type, + rd_kafka_event_type_t reply_event_type, + const struct rd_kafka_admin_fanout_worker_cbs *cbs, + const rd_kafka_AdminOptions_t *options, + rd_kafka_q_t *rkq) { + rd_kafka_op_t *rko; + + rd_assert(rk); + rd_assert(rkq); + rd_assert(cbs); + + rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT); + rko->rko_rk = rk; + + rko->rko_u.admin_request.reply_event_type = reply_event_type; + + rko->rko_u.admin_request.fanout.cbs = + (struct rd_kafka_admin_fanout_worker_cbs *)cbs; + + /* Make a copy of the options */ + if (options) + rd_kafka_AdminOptions_copy_to(&rko->rko_u.admin_request.options, + options); + else + rd_kafka_AdminOptions_init(rk, + &rko->rko_u.admin_request.options); + + rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT; + + /* Calculate absolute timeout */ + rko->rko_u.admin_request.abs_timeout = + rd_timeout_init(rd_kafka_confval_get_int( + &rko->rko_u.admin_request.options.request_timeout)); + + /* Set up replyq */ + rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); + + rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS; + + rko->rko_u.admin_request.fanout.reqtype = req_type; + + return rko; +} + +/** + * @brief Duplicate the fanout operation for each nodeid passed and + * enqueue each new operation. Use the same fanout_parent as + * the passed \p rko. + * + * @param rk Client instance. + * @param rko Operation to distribute to each broker. + * @param nodeids List of int32_t with the broker nodeids. 
+ * @param rkq + * @return rd_kafka_op_t* + */ +static void rd_kafka_admin_fanout_op_distribute(rd_kafka_t *rk, + rd_kafka_op_t *rko, + rd_list_t *nodeids) { + int i, nodeids_cnt, timeout_remains; + rd_kafka_op_t *rko_fanout; + rd_kafka_AdminOptions_t *options = &rko->rko_u.admin_request.options; + timeout_remains = rd_kafka_admin_timeout_remains(rko); + rd_kafka_AdminOptions_set_request_timeout(options, timeout_remains, + NULL, 0); + + nodeids_cnt = rd_list_cnt(nodeids); + rko_fanout = rko->rko_u.admin_request.fanout_parent; + rko_fanout->rko_u.admin_request.fanout.outstanding = (int)nodeids_cnt; + rko->rko_u.admin_request.fanout_parent = NULL; + + /* Create individual request ops for each node */ + for (i = 0; i < nodeids_cnt; i++) { + rd_kafka_op_t *rko_dup = rd_kafka_admin_request_op_new( + rk, rko->rko_type, + rko->rko_u.admin_request.reply_event_type, + rko->rko_u.admin_request.cbs, options, rk->rk_ops); + + rko_dup->rko_u.admin_request.fanout_parent = rko_fanout; + rko_dup->rko_u.admin_request.broker_id = + rd_list_get_int32(nodeids, i); + + rd_list_init_copy(&rko_dup->rko_u.admin_request.args, + &rko->rko_u.admin_request.args); + rd_list_copy_to( + &rko_dup->rko_u.admin_request.args, + &rko->rko_u.admin_request.args, + rko_fanout->rko_u.admin_request.fanout.cbs->copy_arg, NULL); + + rd_kafka_q_enq(rk->rk_ops, rko_dup); + } +} + + +/** + * @brief Common fanout worker state machine handling regardless of request type + * + * @param rko Result of a fanned out operation, e.g., DELETERECORDS result. + * + * Tasks: + * - Checks for and responds to client termination + * - Polls for fanned out responses + * - Calls the partial response callback + * - Calls the merge responses callback upon receipt of all partial responses + * - Destruction of rko + * + * rko->rko_err may be one of: + * RD_KAFKA_RESP_ERR_NO_ERROR, or + * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup. + * + * @returns a hint to the op code whether the rko should be destroyed or not. + */ +static rd_kafka_op_res_t rd_kafka_admin_fanout_worker(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent; + const char *name = + rd_kafka_op2str(rko_fanout->rko_u.admin_request.fanout.reqtype); + rd_kafka_op_t *rko_result; + + RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT); + RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT); + + rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0); + rko_fanout->rko_u.admin_request.fanout.outstanding--; + + rko->rko_u.admin_result.fanout_parent = NULL; + + if (rd_kafka_terminating(rk)) { + rd_kafka_dbg(rk, ADMIN, name, + "%s fanout worker called for fanned out op %s: " + "handle is terminating: %s", + name, rd_kafka_op2str(rko->rko_type), + rd_kafka_err2str(rko_fanout->rko_err)); + if (!rko->rko_err) + rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY; + } + + rd_kafka_dbg(rk, ADMIN, name, + "%s fanout worker called for %s with %d request(s) " + "outstanding: %s", + name, rd_kafka_op2str(rko->rko_type), + rko_fanout->rko_u.admin_request.fanout.outstanding, + rd_kafka_err2str(rko_fanout->rko_err)); + + /* Add partial response to rko_fanout's result list. 
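 * Once the outstanding counter reaches zero the accumulated
 * fanout.results are copied into a single ADMIN_RESULT op and
 * enqueued on the application's reply queue below.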
*/ + rko_fanout->rko_u.admin_request.fanout.cbs->partial_response(rko_fanout, + rko); + + if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0) + /* Wait for outstanding requests to finish */ + return RD_KAFKA_OP_RES_HANDLED; + + rko_result = rd_kafka_admin_result_new(rko_fanout); + rd_list_init_copy(&rko_result->rko_u.admin_result.results, + &rko_fanout->rko_u.admin_request.fanout.results); + rd_list_copy_to(&rko_result->rko_u.admin_result.results, + &rko_fanout->rko_u.admin_request.fanout.results, + rko_fanout->rko_u.admin_request.fanout.cbs->copy_result, + NULL); + + /* Enqueue result on application queue, we're done. */ + rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result, + rko_fanout->rko_u.admin_request.replyq.version); + + /* FALLTHRU */ + if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0) + rd_kafka_op_destroy(rko_fanout); + + return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */ +} + +/** + * @brief Create a new operation that targets all the brokers. + * The operation consists of a fanout parent that is reused and + * fanout operation that is duplicated for each broker found. + * + * @param rk Client instance- + * @param optype Operation type. + * @param reply_event_type Reply event type. + * @param cbs Fanned out op callbacks. + * @param fanout_cbs Fanout parent out op callbacks. + * @param result_free Callback for freeing the result list. + * @param options Operation options. + * @param rkq Result queue. + * @return The newly created op targeting all the brokers. + * + * @sa Use rd_kafka_op_destroy() to release it. + */ +static rd_kafka_op_t *rd_kafka_admin_request_op_target_all_new( + rd_kafka_t *rk, + rd_kafka_op_type_t optype, + rd_kafka_event_type_t reply_event_type, + const struct rd_kafka_admin_worker_cbs *cbs, + const struct rd_kafka_admin_fanout_worker_cbs *fanout_cbs, + void (*result_free)(void *), + const rd_kafka_AdminOptions_t *options, + rd_kafka_q_t *rkq) { + rd_kafka_op_t *rko, *rko_fanout; + + rko_fanout = rd_kafka_admin_fanout_op_new(rk, optype, reply_event_type, + fanout_cbs, options, rkq); + + rko = rd_kafka_admin_request_op_new(rk, optype, reply_event_type, cbs, + options, rk->rk_ops); + + rko_fanout->rko_u.admin_request.fanout.outstanding = 1; + rko->rko_u.admin_request.fanout_parent = rko_fanout; + rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_ALL; + + rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, (int)1, + result_free); + + return rko; +} + +/**@}*/ + + +/** + * @name Generic AdminOptions + * @{ + * + * + */ + +rd_kafka_resp_err_t +rd_kafka_AdminOptions_set_request_timeout(rd_kafka_AdminOptions_t *options, + int timeout_ms, + char *errstr, + size_t errstr_size) { + return rd_kafka_confval_set_type(&options->request_timeout, + RD_KAFKA_CONFVAL_INT, &timeout_ms, + errstr, errstr_size); +} + + +rd_kafka_resp_err_t +rd_kafka_AdminOptions_set_operation_timeout(rd_kafka_AdminOptions_t *options, + int timeout_ms, + char *errstr, + size_t errstr_size) { + return rd_kafka_confval_set_type(&options->operation_timeout, + RD_KAFKA_CONFVAL_INT, &timeout_ms, + errstr, errstr_size); +} + + +rd_kafka_resp_err_t +rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t *options, + int true_or_false, + char *errstr, + size_t errstr_size) { + return rd_kafka_confval_set_type(&options->validate_only, + RD_KAFKA_CONFVAL_INT, &true_or_false, + errstr, errstr_size); +} + +rd_kafka_resp_err_t +rd_kafka_AdminOptions_set_incremental(rd_kafka_AdminOptions_t *options, + int true_or_false, + char 
*errstr, + size_t errstr_size) { + rd_snprintf(errstr, errstr_size, + "Incremental updates currently not supported, see KIP-248"); + return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; + + return rd_kafka_confval_set_type(&options->incremental, + RD_KAFKA_CONFVAL_INT, &true_or_false, + errstr, errstr_size); +} + +rd_kafka_resp_err_t +rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t *options, + int32_t broker_id, + char *errstr, + size_t errstr_size) { + int ibroker_id = (int)broker_id; + + return rd_kafka_confval_set_type(&options->broker, RD_KAFKA_CONFVAL_INT, + &ibroker_id, errstr, errstr_size); +} + +rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets( + rd_kafka_AdminOptions_t *options, + int true_or_false) { + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_confval_set_type( + &options->require_stable_offsets, RD_KAFKA_CONFVAL_INT, + &true_or_false, errstr, sizeof(errstr)); + return !err ? NULL : rd_kafka_error_new(err, "%s", errstr); +} + +rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states( + rd_kafka_AdminOptions_t *options, + const rd_kafka_consumer_group_state_t *consumer_group_states, + size_t consumer_group_states_cnt) { + size_t i; + char errstr[512]; + rd_kafka_resp_err_t err; + rd_list_t *states_list = rd_list_new(0, NULL); + rd_list_init_int32(states_list, consumer_group_states_cnt); + uint64_t states_bitmask = 0; + + if (RD_KAFKA_CONSUMER_GROUP_STATE__CNT >= 64) { + rd_assert("BUG: cannot handle states with a bitmask anymore"); + } + + for (i = 0; i < consumer_group_states_cnt; i++) { + uint64_t state_bit; + rd_kafka_consumer_group_state_t state = + consumer_group_states[i]; + + if (state < 0 || state >= RD_KAFKA_CONSUMER_GROUP_STATE__CNT) { + rd_list_destroy(states_list); + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Invalid group state value"); + } + + state_bit = 1 << state; + if (states_bitmask & state_bit) { + rd_list_destroy(states_list); + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate states not allowed"); + } else { + states_bitmask = states_bitmask | state_bit; + rd_list_set_int32(states_list, (int32_t)i, state); + } + } + err = rd_kafka_confval_set_type(&options->match_consumer_group_states, + RD_KAFKA_CONFVAL_PTR, states_list, + errstr, sizeof(errstr)); + if (err) { + rd_list_destroy(states_list); + } + return !err ? 
NULL : rd_kafka_error_new(err, "%s", errstr); +} + +void rd_kafka_AdminOptions_set_opaque(rd_kafka_AdminOptions_t *options, + void *opaque) { + rd_kafka_confval_set_type(&options->opaque, RD_KAFKA_CONFVAL_PTR, + opaque, NULL, 0); +} + + +/** + * @brief Initialize and set up defaults for AdminOptions + */ +static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, + rd_kafka_AdminOptions_t *options) { + rd_kafka_confval_init_int(&options->request_timeout, "request_timeout", + 0, 3600 * 1000, + rk->rk_conf.admin.request_timeout_ms); + + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || + options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS || + options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || + options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS) + rd_kafka_confval_init_int(&options->operation_timeout, + "operation_timeout", -1, 3600 * 1000, + rk->rk_conf.admin.request_timeout_ms); + else + rd_kafka_confval_disable(&options->operation_timeout, + "operation_timeout"); + + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || + options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || + options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) + rd_kafka_confval_init_int(&options->validate_only, + "validate_only", 0, 1, 0); + else + rd_kafka_confval_disable(&options->validate_only, + "validate_only"); + + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) + rd_kafka_confval_init_int(&options->incremental, "incremental", + 0, 1, 0); + else + rd_kafka_confval_disable(&options->incremental, "incremental"); + + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS) + rd_kafka_confval_init_int(&options->require_stable_offsets, + "require_stable_offsets", 0, 1, 0); + else + rd_kafka_confval_disable(&options->require_stable_offsets, + "require_stable_offsets"); + + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS) + rd_kafka_confval_init_ptr(&options->match_consumer_group_states, + "match_consumer_group_states"); + else + rd_kafka_confval_disable(&options->match_consumer_group_states, + "match_consumer_group_states"); + + rd_kafka_confval_init_int(&options->broker, "broker", 0, INT32_MAX, -1); + rd_kafka_confval_init_ptr(&options->opaque, "opaque"); +} + +/** + * @brief Copy contents of \p src to \p dst. + * Deep copy every pointer confval. + * + * @param dst The destination AdminOptions. + * @param src The source AdminOptions. 
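 *
 * @remark The struct assignment performs a shallow copy of every
 * confval; the match_consumer_group_states list is then deep-copied
 * so that \p dst and \p src do not share (and later double-free)
 * the same rd_list_t.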
+ */ +static void rd_kafka_AdminOptions_copy_to(rd_kafka_AdminOptions_t *dst, + const rd_kafka_AdminOptions_t *src) { + *dst = *src; + if (src->match_consumer_group_states.u.PTR) { + char errstr[512]; + rd_list_t *states_list_copy = rd_list_copy_preallocated( + src->match_consumer_group_states.u.PTR, NULL); + + rd_kafka_resp_err_t err = rd_kafka_confval_set_type( + &dst->match_consumer_group_states, RD_KAFKA_CONFVAL_PTR, + states_list_copy, errstr, sizeof(errstr)); + rd_assert(!err); + } +} + + +rd_kafka_AdminOptions_t * +rd_kafka_AdminOptions_new(rd_kafka_t *rk, rd_kafka_admin_op_t for_api) { + rd_kafka_AdminOptions_t *options; + + if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT) + return NULL; + + options = rd_calloc(1, sizeof(*options)); + + options->for_api = for_api; + + rd_kafka_AdminOptions_init(rk, options); + + return options; +} + +void rd_kafka_AdminOptions_destroy(rd_kafka_AdminOptions_t *options) { + if (options->match_consumer_group_states.u.PTR) { + rd_list_destroy(options->match_consumer_group_states.u.PTR); + } + rd_free(options); +} + +/**@}*/ + + + +/** + * @name CreateTopics + * @{ + * + * + * + */ + + + +rd_kafka_NewTopic_t *rd_kafka_NewTopic_new(const char *topic, + int num_partitions, + int replication_factor, + char *errstr, + size_t errstr_size) { + rd_kafka_NewTopic_t *new_topic; + + if (!topic) { + rd_snprintf(errstr, errstr_size, "Invalid topic name"); + return NULL; + } + + if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) { + rd_snprintf(errstr, errstr_size, + "num_partitions out of " + "expected range %d..%d or -1 for broker default", + 1, RD_KAFKAP_PARTITIONS_MAX); + return NULL; + } + + if (replication_factor < -1 || + replication_factor > RD_KAFKAP_BROKERS_MAX) { + rd_snprintf(errstr, errstr_size, + "replication_factor out of expected range %d..%d", + -1, RD_KAFKAP_BROKERS_MAX); + return NULL; + } + + new_topic = rd_calloc(1, sizeof(*new_topic)); + new_topic->topic = rd_strdup(topic); + new_topic->num_partitions = num_partitions; + new_topic->replication_factor = replication_factor; + + /* List of int32 lists */ + rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free); + rd_list_prealloc_elems(&new_topic->replicas, 0, + num_partitions == -1 ? 
0 : num_partitions, + 0 /*nozero*/); + + /* List of ConfigEntrys */ + rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free); + + return new_topic; +} + + +/** + * @brief Topic name comparator for NewTopic_t + */ +static int rd_kafka_NewTopic_cmp(const void *_a, const void *_b) { + const rd_kafka_NewTopic_t *a = _a, *b = _b; + return strcmp(a->topic, b->topic); +} + + + +/** + * @brief Allocate a new NewTopic and make a copy of \p src + */ +static rd_kafka_NewTopic_t * +rd_kafka_NewTopic_copy(const rd_kafka_NewTopic_t *src) { + rd_kafka_NewTopic_t *dst; + + dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions, + src->replication_factor, NULL, 0); + rd_assert(dst); + + rd_list_destroy(&dst->replicas); /* created in .._new() */ + rd_list_init_copy(&dst->replicas, &src->replicas); + rd_list_copy_to(&dst->replicas, &src->replicas, + rd_list_copy_preallocated, NULL); + + rd_list_init_copy(&dst->config, &src->config); + rd_list_copy_to(&dst->config, &src->config, + rd_kafka_ConfigEntry_list_copy, NULL); + + return dst; +} + +void rd_kafka_NewTopic_destroy(rd_kafka_NewTopic_t *new_topic) { + rd_list_destroy(&new_topic->replicas); + rd_list_destroy(&new_topic->config); + rd_free(new_topic->topic); + rd_free(new_topic); +} + +static void rd_kafka_NewTopic_free(void *ptr) { + rd_kafka_NewTopic_destroy(ptr); +} + +void rd_kafka_NewTopic_destroy_array(rd_kafka_NewTopic_t **new_topics, + size_t new_topic_cnt) { + size_t i; + for (i = 0; i < new_topic_cnt; i++) + rd_kafka_NewTopic_destroy(new_topics[i]); +} + + +rd_kafka_resp_err_t +rd_kafka_NewTopic_set_replica_assignment(rd_kafka_NewTopic_t *new_topic, + int32_t partition, + int32_t *broker_ids, + size_t broker_id_cnt, + char *errstr, + size_t errstr_size) { + rd_list_t *rl; + int i; + + if (new_topic->replication_factor != -1) { + rd_snprintf(errstr, errstr_size, + "Specifying a replication factor and " + "a replica assignment are mutually exclusive"); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } else if (new_topic->num_partitions == -1) { + rd_snprintf(errstr, errstr_size, + "Specifying a default partition count and a " + "replica assignment are mutually exclusive"); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + /* Replica partitions must be added consecutively starting from 0. 
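+         * For example, for a topic created with num_partitions=3 the
+         * application must call this function for partition 0, then 1,
+         * then 2, in exactly that order.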
*/
+        if (partition != rd_list_cnt(&new_topic->replicas)) {
+                rd_snprintf(errstr, errstr_size,
+                            "Partitions must be added in order, "
+                            "starting at 0: expecting partition %d, "
+                            "not %" PRId32,
+                            rd_list_cnt(&new_topic->replicas), partition);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
+                rd_snprintf(errstr, errstr_size,
+                            "Too many brokers specified "
+                            "(RD_KAFKAP_BROKERS_MAX=%d)",
+                            RD_KAFKAP_BROKERS_MAX);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+
+        rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
+
+        for (i = 0; i < (int)broker_id_cnt; i++)
+                rd_list_set_int32(rl, i, broker_ids[i]);
+
+        rd_list_add(&new_topic->replicas, rl);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Generic constructor of ConfigEntry which is also added to \p rl
+ */
+static rd_kafka_resp_err_t
+rd_kafka_admin_add_config0(rd_list_t *rl,
+                           const char *name,
+                           const char *value,
+                           rd_kafka_AlterOperation_t operation) {
+        rd_kafka_ConfigEntry_t *entry;
+
+        if (!name)
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+        entry = rd_calloc(1, sizeof(*entry));
+        entry->kv = rd_strtup_new(name, value);
+        entry->a.operation = operation;
+
+        rd_list_add(rl, entry);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t *new_topic,
+                                                 const char *name,
+                                                 const char *value) {
+        return rd_kafka_admin_add_config0(&new_topic->config, name, value,
+                                          RD_KAFKA_ALTER_OP_ADD);
+}
+
+
+
+/**
+ * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req,
+                                    rd_kafka_op_t **rko_resultp,
+                                    rd_kafka_buf_t *reply,
+                                    char *errstr,
+                                    size_t errstr_size) {
+        const int log_decode_errors = LOG_ERR;
+        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        rd_kafka_op_t *rko_result = NULL;
+        int32_t topic_cnt;
+        int i;
+
+        if (rd_kafka_buf_ApiVersion(reply) >= 2) {
+                int32_t Throttle_Time;
+                rd_kafka_buf_read_i32(reply, &Throttle_Time);
+                rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
+        }
+
+        /* #topics */
+        rd_kafka_buf_read_i32(reply, &topic_cnt);
+
+        if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
+                rd_kafka_buf_parse_fail(
+                    reply,
+                    "Received %" PRId32
+                    " topics in response "
+                    "when only %d were requested",
+                    topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args));
+
+
+        rko_result = rd_kafka_admin_result_new(rko_req);
+
+        rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
+                     rd_kafka_topic_result_free);
+
+        for (i = 0; i < (int)topic_cnt; i++) {
+                rd_kafkap_str_t ktopic;
+                int16_t error_code;
+                rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER;
+                char *this_errstr = NULL;
+                rd_kafka_topic_result_t *terr;
+                rd_kafka_NewTopic_t skel;
+                int orig_pos;
+
+                rd_kafka_buf_read_str(reply, &ktopic);
+                rd_kafka_buf_read_i16(reply, &error_code);
+
+                if (rd_kafka_buf_ApiVersion(reply) >= 1)
+                        rd_kafka_buf_read_str(reply, &error_msg);
+
+                /* For non-blocking CreateTopicsRequests the broker
+                 * will return REQUEST_TIMED_OUT for topics
+                 * that were triggered for creation -
+                 * we hide this error code from the application
+                 * since the topic creation is in fact in progress.
*/ + if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && + rd_kafka_confval_get_int(&rko_req->rko_u.admin_request + .options.operation_timeout) <= + 0) { + error_code = RD_KAFKA_RESP_ERR_NO_ERROR; + this_errstr = NULL; + } + + if (error_code) { + if (RD_KAFKAP_STR_IS_NULL(&error_msg) || + RD_KAFKAP_STR_LEN(&error_msg) == 0) + this_errstr = + (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); + } + + terr = rd_kafka_topic_result_new(ktopic.str, + RD_KAFKAP_STR_LEN(&ktopic), + error_code, this_errstr); + + /* As a convenience to the application we insert topic result + * in the same order as they were requested. The broker + * does not maintain ordering unfortunately. */ + skel.topic = terr->topic; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_NewTopic_cmp); + if (orig_pos == -1) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, + "Broker returned topic %.*s that was not " + "included in the original request", + RD_KAFKAP_STR_PR(&ktopic)); + } + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, "Broker returned topic %.*s multiple times", + RD_KAFKAP_STR_PR(&ktopic)); + } + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + terr); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "CreateTopics response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + +void rd_kafka_CreateTopics(rd_kafka_t *rk, + rd_kafka_NewTopic_t **new_topics, + size_t new_topic_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_CreateTopicsRequest, + rd_kafka_CreateTopicsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_CREATETOPICS, + RD_KAFKA_EVENT_CREATETOPICS_RESULT, + &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)new_topic_cnt, + rd_kafka_NewTopic_free); + + for (i = 0; i < new_topic_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_NewTopic_copy(new_topics[i])); + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + +/** + * @brief Get an array of topic results from a CreateTopics result. + * + * The returned \p topics life-time is the same as the \p result object. + * @param cntp is updated to the number of elements in the array. 
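+ *
+ * @par Example
+ * A minimal polling sketch (assumes a configured \c rk handle and a
+ * populated \c new_topics array; error handling omitted for brevity):
+ * @code
+ * rd_kafka_queue_t *q = rd_kafka_queue_new(rk);
+ * rd_kafka_CreateTopics(rk, new_topics, new_topic_cnt, NULL, q);
+ *
+ * rd_kafka_event_t *ev = rd_kafka_queue_poll(q, 10 * 1000);
+ * if (ev && rd_kafka_event_type(ev) ==
+ *               RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
+ *         size_t cnt;
+ *         const rd_kafka_topic_result_t **topics =
+ *             rd_kafka_CreateTopics_result_topics(
+ *                 rd_kafka_event_CreateTopics_result(ev), &cnt);
+ *         for (size_t ti = 0; ti < cnt; ti++)
+ *                 printf("%s: %s\n",
+ *                        rd_kafka_topic_result_name(topics[ti]),
+ *                        rd_kafka_err2str(
+ *                            rd_kafka_topic_result_error(topics[ti])));
+ * }
+ * if (ev)
+ *         rd_kafka_event_destroy(ev);
+ * rd_kafka_queue_destroy(q);
+ * @endcode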
+ */
+const rd_kafka_topic_result_t **rd_kafka_CreateTopics_result_topics(
+    const rd_kafka_CreateTopics_result_t *result,
+    size_t *cntp) {
+        return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
+                                                cntp);
+}
+
+/**@}*/
+
+
+
+/**
+ * @name Delete topics
+ * @{
+ *
+ *
+ *
+ *
+ */
+
+rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new(const char *topic) {
+        size_t tsize = strlen(topic) + 1;
+        rd_kafka_DeleteTopic_t *del_topic;
+
+        /* Single allocation */
+        del_topic = rd_malloc(sizeof(*del_topic) + tsize);
+        del_topic->topic = del_topic->data;
+        memcpy(del_topic->topic, topic, tsize);
+
+        return del_topic;
+}
+
+void rd_kafka_DeleteTopic_destroy(rd_kafka_DeleteTopic_t *del_topic) {
+        rd_free(del_topic);
+}
+
+static void rd_kafka_DeleteTopic_free(void *ptr) {
+        rd_kafka_DeleteTopic_destroy(ptr);
+}
+
+
+void rd_kafka_DeleteTopic_destroy_array(rd_kafka_DeleteTopic_t **del_topics,
+                                        size_t del_topic_cnt) {
+        size_t i;
+        for (i = 0; i < del_topic_cnt; i++)
+                rd_kafka_DeleteTopic_destroy(del_topics[i]);
+}
+
+
+/**
+ * @brief Topic name comparator for DeleteTopic_t
+ */
+static int rd_kafka_DeleteTopic_cmp(const void *_a, const void *_b) {
+        const rd_kafka_DeleteTopic_t *a = _a, *b = _b;
+        return strcmp(a->topic, b->topic);
+}
+
+/**
+ * @brief Allocate a new DeleteTopic and make a copy of \p src
+ */
+static rd_kafka_DeleteTopic_t *
+rd_kafka_DeleteTopic_copy(const rd_kafka_DeleteTopic_t *src) {
+        return rd_kafka_DeleteTopic_new(src->topic);
+}
+
+
+
+/**
+ * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_DeleteTopicsResponse_parse(rd_kafka_op_t *rko_req,
+                                    rd_kafka_op_t **rko_resultp,
+                                    rd_kafka_buf_t *reply,
+                                    char *errstr,
+                                    size_t errstr_size) {
+        const int log_decode_errors = LOG_ERR;
+        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        rd_kafka_op_t *rko_result = NULL;
+        int32_t topic_cnt;
+        int i;
+
+        if (rd_kafka_buf_ApiVersion(reply) >= 1) {
+                int32_t Throttle_Time;
+                rd_kafka_buf_read_i32(reply, &Throttle_Time);
+                rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
+        }
+
+        /* #topics */
+        rd_kafka_buf_read_i32(reply, &topic_cnt);
+
+        if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
+                rd_kafka_buf_parse_fail(
+                    reply,
+                    "Received %" PRId32
+                    " topics in response "
+                    "when only %d were requested",
+                    topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args));
+
+        rko_result = rd_kafka_admin_result_new(rko_req);
+
+        rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
+                     rd_kafka_topic_result_free);
+
+        for (i = 0; i < (int)topic_cnt; i++) {
+                rd_kafkap_str_t ktopic;
+                int16_t error_code;
+                rd_kafka_topic_result_t *terr;
+                rd_kafka_DeleteTopic_t skel;
+                int orig_pos;
+
+                rd_kafka_buf_read_str(reply, &ktopic);
+                rd_kafka_buf_read_i16(reply, &error_code);
+
+                /* For non-blocking DeleteTopicsRequests the broker
+                 * will return REQUEST_TIMED_OUT for topics
+                 * that were triggered for deletion -
+                 * we hide this error code from the application
+                 * since the topic deletion is in fact in progress. */
+                if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
+                    rd_kafka_confval_get_int(&rko_req->rko_u.admin_request
+                                                  .options.operation_timeout) <=
+                        0) {
+                        error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+                }
+
+                terr = rd_kafka_topic_result_new(
+                    ktopic.str, RD_KAFKAP_STR_LEN(&ktopic), error_code,
+                    error_code ? rd_kafka_err2str(error_code) : NULL);
+
+                /* As a convenience to the application we insert topic results
+                 * in the same order as they were requested.
The broker + * does not maintain ordering unfortunately. */ + skel.topic = terr->topic; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_DeleteTopic_cmp); + if (orig_pos == -1) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, + "Broker returned topic %.*s that was not " + "included in the original request", + RD_KAFKAP_STR_PR(&ktopic)); + } + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, "Broker returned topic %.*s multiple times", + RD_KAFKAP_STR_PR(&ktopic)); + } + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + terr); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "DeleteTopics response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + + +void rd_kafka_DeleteTopics(rd_kafka_t *rk, + rd_kafka_DeleteTopic_t **del_topics, + size_t del_topic_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DeleteTopicsRequest, + rd_kafka_DeleteTopicsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DELETETOPICS, + RD_KAFKA_EVENT_DELETETOPICS_RESULT, + &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)del_topic_cnt, + rd_kafka_DeleteTopic_free); + + for (i = 0; i < del_topic_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_DeleteTopic_copy(del_topics[i])); + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + +/** + * @brief Get an array of topic results from a DeleteTopics result. + * + * The returned \p topics life-time is the same as the \p result object. + * @param cntp is updated to the number of elements in the array. 
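+ *
+ * @par Example
+ * Usage mirrors the CreateTopics sketch above: poll the result queue for
+ * a RD_KAFKA_EVENT_DELETETOPICS_RESULT event, obtain the result object
+ * with rd_kafka_event_DeleteTopics_result() and pass it here to inspect
+ * the per-topic error codes.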
+ */
+const rd_kafka_topic_result_t **rd_kafka_DeleteTopics_result_topics(
+    const rd_kafka_DeleteTopics_result_t *result,
+    size_t *cntp) {
+        return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
+                                                cntp);
+}
+
+/**@}*/
+
+
+/**
+ * @name Create partitions
+ * @{
+ *
+ *
+ *
+ *
+ */
+
+rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new(const char *topic,
+                                                     size_t new_total_cnt,
+                                                     char *errstr,
+                                                     size_t errstr_size) {
+        size_t tsize = strlen(topic) + 1;
+        rd_kafka_NewPartitions_t *newps;
+
+        if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) {
+                rd_snprintf(errstr, errstr_size,
+                            "new_total_cnt out of "
+                            "expected range %d..%d",
+                            1, RD_KAFKAP_PARTITIONS_MAX);
+                return NULL;
+        }
+
+        /* Single allocation */
+        newps = rd_malloc(sizeof(*newps) + tsize);
+        newps->total_cnt = new_total_cnt;
+        newps->topic = newps->data;
+        memcpy(newps->topic, topic, tsize);
+
+        /* List of int32 lists */
+        rd_list_init(&newps->replicas, 0, rd_list_destroy_free);
+        rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt,
+                               0 /*nozero*/);
+
+        return newps;
+}
+
+/**
+ * @brief Topic name comparator for NewPartitions_t
+ */
+static int rd_kafka_NewPartitions_cmp(const void *_a, const void *_b) {
+        const rd_kafka_NewPartitions_t *a = _a, *b = _b;
+        return strcmp(a->topic, b->topic);
+}
+
+
+/**
+ * @brief Allocate a new NewPartitions and make a copy of \p src
+ */
+static rd_kafka_NewPartitions_t *
+rd_kafka_NewPartitions_copy(const rd_kafka_NewPartitions_t *src) {
+        rd_kafka_NewPartitions_t *dst;
+
+        dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0);
+
+        rd_list_destroy(&dst->replicas); /* created in .._new() */
+        rd_list_init_copy(&dst->replicas, &src->replicas);
+        rd_list_copy_to(&dst->replicas, &src->replicas,
+                        rd_list_copy_preallocated, NULL);
+
+        return dst;
+}
+
+void rd_kafka_NewPartitions_destroy(rd_kafka_NewPartitions_t *newps) {
+        rd_list_destroy(&newps->replicas);
+        rd_free(newps);
+}
+
+static void rd_kafka_NewPartitions_free(void *ptr) {
+        rd_kafka_NewPartitions_destroy(ptr);
+}
+
+
+void rd_kafka_NewPartitions_destroy_array(rd_kafka_NewPartitions_t **newps,
+                                          size_t newps_cnt) {
+        size_t i;
+        for (i = 0; i < newps_cnt; i++)
+                rd_kafka_NewPartitions_destroy(newps[i]);
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_NewPartitions_set_replica_assignment(rd_kafka_NewPartitions_t *newp,
+                                              int32_t new_partition_idx,
+                                              int32_t *broker_ids,
+                                              size_t broker_id_cnt,
+                                              char *errstr,
+                                              size_t errstr_size) {
+        rd_list_t *rl;
+        int i;
+
+        /* Replica partitions must be added consecutively starting from 0. */
+        if (new_partition_idx != rd_list_cnt(&newp->replicas)) {
+                rd_snprintf(errstr, errstr_size,
+                            "Partitions must be added in order, "
+                            "starting at 0: expecting partition "
+                            "index %d, not %" PRId32,
+                            rd_list_cnt(&newp->replicas), new_partition_idx);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
+                rd_snprintf(errstr, errstr_size,
+                            "Too many brokers specified "
+                            "(RD_KAFKAP_BROKERS_MAX=%d)",
+                            RD_KAFKAP_BROKERS_MAX);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
+
+        for (i = 0; i < (int)broker_id_cnt; i++)
+                rd_list_set_int32(rl, i, broker_ids[i]);
+
+        rd_list_add(&newp->replicas, rl);
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_CreatePartitionsResponse_parse(rd_kafka_op_t *rko_req,
+                                        rd_kafka_op_t **rko_resultp,
+                                        rd_kafka_buf_t *reply,
+                                        char *errstr,
+                                        size_t errstr_size) {
+        const int log_decode_errors = LOG_ERR;
+        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        rd_kafka_op_t *rko_result = NULL;
+        int32_t topic_cnt;
+        int i;
+        int32_t Throttle_Time;
+
+        rd_kafka_buf_read_i32(reply, &Throttle_Time);
+        rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
+
+        /* #topics */
+        rd_kafka_buf_read_i32(reply, &topic_cnt);
+
+        if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
+                rd_kafka_buf_parse_fail(
+                    reply,
+                    "Received %" PRId32
+                    " topics in response "
+                    "when only %d were requested",
+                    topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args));
+
+        rko_result = rd_kafka_admin_result_new(rko_req);
+
+        rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
+                     rd_kafka_topic_result_free);
+
+        for (i = 0; i < (int)topic_cnt; i++) {
+                rd_kafkap_str_t ktopic;
+                int16_t error_code;
+                char *this_errstr = NULL;
+                rd_kafka_topic_result_t *terr;
+                rd_kafka_NewPartitions_t skel;
+                rd_kafkap_str_t error_msg;
+                int orig_pos;
+
+                rd_kafka_buf_read_str(reply, &ktopic);
+                rd_kafka_buf_read_i16(reply, &error_code);
+                rd_kafka_buf_read_str(reply, &error_msg);
+
+                /* For non-blocking CreatePartitionsRequests the broker
+                 * will return REQUEST_TIMED_OUT for topics
+                 * whose partitions were triggered for creation -
+                 * we hide this error code from the application
+                 * since the partition creation is in fact in progress. */
+                if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
+                    rd_kafka_confval_get_int(&rko_req->rko_u.admin_request
+                                                  .options.operation_timeout) <=
+                        0) {
+                        error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+                }
+
+                if (error_code) {
+                        if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
+                            RD_KAFKAP_STR_LEN(&error_msg) == 0)
+                                this_errstr =
+                                    (char *)rd_kafka_err2str(error_code);
+                        else
+                                RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
+                }
+
+                terr = rd_kafka_topic_result_new(
+                    ktopic.str, RD_KAFKAP_STR_LEN(&ktopic), error_code,
+                    error_code ? this_errstr : NULL);
+
+                /* As a convenience to the application we insert topic results
+                 * in the same order as they were requested. The broker
+                 * does not maintain ordering unfortunately.
*/ + skel.topic = terr->topic; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_NewPartitions_cmp); + if (orig_pos == -1) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, + "Broker returned topic %.*s that was not " + "included in the original request", + RD_KAFKAP_STR_PR(&ktopic)); + } + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) { + rd_kafka_topic_result_destroy(terr); + rd_kafka_buf_parse_fail( + reply, "Broker returned topic %.*s multiple times", + RD_KAFKAP_STR_PR(&ktopic)); + } + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + terr); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "CreatePartitions response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + + +void rd_kafka_CreatePartitions(rd_kafka_t *rk, + rd_kafka_NewPartitions_t **newps, + size_t newps_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_CreatePartitionsRequest, + rd_kafka_CreatePartitionsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_CREATEPARTITIONS, + RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, &cbs, options, + rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt, + rd_kafka_NewPartitions_free); + + for (i = 0; i < newps_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_NewPartitions_copy(newps[i])); + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + +/** + * @brief Get an array of topic results from a CreatePartitions result. + * + * The returned \p topics life-time is the same as the \p result object. + * @param cntp is updated to the number of elements in the array. 
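+ *
+ * @par Example
+ * An illustrative sketch of growing a (hypothetical) topic "mytopic" to
+ * six partitions and checking the outcome (error handling omitted):
+ * @code
+ * char errstr[512];
+ * rd_kafka_NewPartitions_t *newp = rd_kafka_NewPartitions_new(
+ *     "mytopic", 6, errstr, sizeof(errstr));
+ * rd_kafka_queue_t *q = rd_kafka_queue_new(rk);
+ * rd_kafka_CreatePartitions(rk, &newp, 1, NULL, q);
+ *
+ * rd_kafka_event_t *ev = rd_kafka_queue_poll(q, 10 * 1000);
+ * if (ev && rd_kafka_event_type(ev) ==
+ *               RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
+ *         size_t cnt;
+ *         const rd_kafka_topic_result_t **topics =
+ *             rd_kafka_CreatePartitions_result_topics(
+ *                 rd_kafka_event_CreatePartitions_result(ev), &cnt);
+ *         if (cnt == 1)
+ *                 printf("%s: %s\n",
+ *                        rd_kafka_topic_result_name(topics[0]),
+ *                        rd_kafka_err2str(
+ *                            rd_kafka_topic_result_error(topics[0])));
+ * }
+ * if (ev)
+ *         rd_kafka_event_destroy(ev);
+ * rd_kafka_queue_destroy(q);
+ * rd_kafka_NewPartitions_destroy(newp);
+ * @endcode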
+ */
+const rd_kafka_topic_result_t **rd_kafka_CreatePartitions_result_topics(
+    const rd_kafka_CreatePartitions_result_t *result,
+    size_t *cntp) {
+        return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
+                                                cntp);
+}
+
+/**@}*/
+
+
+
+/**
+ * @name ConfigEntry
+ * @{
+ *
+ *
+ *
+ */
+
+static void rd_kafka_ConfigEntry_destroy(rd_kafka_ConfigEntry_t *entry) {
+        rd_strtup_destroy(entry->kv);
+        rd_list_destroy(&entry->synonyms);
+        rd_free(entry);
+}
+
+
+static void rd_kafka_ConfigEntry_free(void *ptr) {
+        rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr);
+}
+
+
+/**
+ * @brief Create new ConfigEntry
+ *
+ * @param name Config entry name
+ * @param name_len Length of name, or -1 to use strlen()
+ * @param value Config entry value, or NULL
+ * @param value_len Length of value, or -1 to use strlen()
+ */
+static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name,
+                                                         size_t name_len,
+                                                         const char *value,
+                                                         size_t value_len) {
+        rd_kafka_ConfigEntry_t *entry;
+
+        if (!name)
+                return NULL;
+
+        entry = rd_calloc(1, sizeof(*entry));
+        entry->kv = rd_strtup_new0(name, name_len, value, value_len);
+
+        rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free);
+
+        entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG;
+
+        return entry;
+}
+
+/**
+ * @sa rd_kafka_ConfigEntry_new0
+ */
+static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new(const char *name,
+                                                        const char *value) {
+        return rd_kafka_ConfigEntry_new0(name, -1, value, -1);
+}
+
+
+
+/**
+ * @brief Allocate a new ConfigEntry and make a copy of \p src
+ */
+static rd_kafka_ConfigEntry_t *
+rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t *src) {
+        rd_kafka_ConfigEntry_t *dst;
+
+        dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value);
+        dst->a = src->a;
+
+        rd_list_destroy(&dst->synonyms); /* created in .._new() */
+        rd_list_init_copy(&dst->synonyms, &src->synonyms);
+        rd_list_copy_to(&dst->synonyms, &src->synonyms,
+                        rd_kafka_ConfigEntry_list_copy, NULL);
+
+        return dst;
+}
+
+static void *rd_kafka_ConfigEntry_list_copy(const void *src, void *opaque) {
+        return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src);
+}
+
+
+const char *rd_kafka_ConfigEntry_name(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->kv->name;
+}
+
+const char *rd_kafka_ConfigEntry_value(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->kv->value;
+}
+
+rd_kafka_ConfigSource_t
+rd_kafka_ConfigEntry_source(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->a.source;
+}
+
+int rd_kafka_ConfigEntry_is_read_only(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->a.is_readonly;
+}
+
+int rd_kafka_ConfigEntry_is_default(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->a.is_default;
+}
+
+int rd_kafka_ConfigEntry_is_sensitive(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->a.is_sensitive;
+}
+
+int rd_kafka_ConfigEntry_is_synonym(const rd_kafka_ConfigEntry_t *entry) {
+        return entry->a.is_synonym;
+}
+
+const rd_kafka_ConfigEntry_t **
+rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t *entry,
+                              size_t *cntp) {
+        *cntp = rd_list_cnt(&entry->synonyms);
+        if (!*cntp)
+                return NULL;
+        return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems;
+}
+
+
+/**@}*/
+
+
+
+/**
+ * @name ConfigSource
+ * @{
+ *
+ *
+ *
+ */
+
+const char *rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource) {
+        static const char *names[] = {
+            "UNKNOWN_CONFIG",        "DYNAMIC_TOPIC_CONFIG",
+            "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG",
+            "STATIC_BROKER_CONFIG",
"DEFAULT_CONFIG", + }; + + if ((unsigned int)confsource >= + (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT) + return "UNSUPPORTED"; + + return names[confsource]; +} + +/**@}*/ + + + +/** + * @name ConfigResource + * @{ + * + * + * + */ + +const char *rd_kafka_ResourcePatternType_name( + rd_kafka_ResourcePatternType_t resource_pattern_type) { + static const char *names[] = {"UNKNOWN", "ANY", "MATCH", "LITERAL", + "PREFIXED"}; + + if ((unsigned int)resource_pattern_type >= + (unsigned int)RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) + return "UNSUPPORTED"; + + return names[resource_pattern_type]; +} + +const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) { + static const char *names[] = { + "UNKNOWN", "ANY", "TOPIC", "GROUP", "BROKER", + }; + + if ((unsigned int)restype >= (unsigned int)RD_KAFKA_RESOURCE__CNT) + return "UNSUPPORTED"; + + return names[restype]; +} + + +rd_kafka_ConfigResource_t * +rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype, + const char *resname) { + rd_kafka_ConfigResource_t *config; + size_t namesz = resname ? strlen(resname) : 0; + + if (!namesz || (int)restype < 0) + return NULL; + + config = rd_calloc(1, sizeof(*config) + namesz + 1); + config->name = config->data; + memcpy(config->name, resname, namesz + 1); + config->restype = restype; + + rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free); + + return config; +} + +void rd_kafka_ConfigResource_destroy(rd_kafka_ConfigResource_t *config) { + rd_list_destroy(&config->config); + if (config->errstr) + rd_free(config->errstr); + rd_free(config); +} + +static void rd_kafka_ConfigResource_free(void *ptr) { + rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr); +} + + +void rd_kafka_ConfigResource_destroy_array(rd_kafka_ConfigResource_t **config, + size_t config_cnt) { + size_t i; + for (i = 0; i < config_cnt; i++) + rd_kafka_ConfigResource_destroy(config[i]); +} + + +/** + * @brief Type and name comparator for ConfigResource_t + */ +static int rd_kafka_ConfigResource_cmp(const void *_a, const void *_b) { + const rd_kafka_ConfigResource_t *a = _a, *b = _b; + int r = RD_CMP(a->restype, b->restype); + if (r) + return r; + return strcmp(a->name, b->name); +} + +/** + * @brief Allocate a new AlterConfigs and make a copy of \p src + */ +static rd_kafka_ConfigResource_t * +rd_kafka_ConfigResource_copy(const rd_kafka_ConfigResource_t *src) { + rd_kafka_ConfigResource_t *dst; + + dst = rd_kafka_ConfigResource_new(src->restype, src->name); + + rd_list_destroy(&dst->config); /* created in .._new() */ + rd_list_init_copy(&dst->config, &src->config); + rd_list_copy_to(&dst->config, &src->config, + rd_kafka_ConfigEntry_list_copy, NULL); + + return dst; +} + + +static void +rd_kafka_ConfigResource_add_ConfigEntry(rd_kafka_ConfigResource_t *config, + rd_kafka_ConfigEntry_t *entry) { + rd_list_add(&config->config, entry); +} + + +rd_kafka_resp_err_t +rd_kafka_ConfigResource_add_config(rd_kafka_ConfigResource_t *config, + const char *name, + const char *value) { + if (!name || !*name || !value) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + return rd_kafka_admin_add_config0(&config->config, name, value, + RD_KAFKA_ALTER_OP_ADD); +} + +rd_kafka_resp_err_t +rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config, + const char *name, + const char *value) { + if (!name || !*name || !value) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + return rd_kafka_admin_add_config0(&config->config, name, value, + RD_KAFKA_ALTER_OP_SET); +} + +rd_kafka_resp_err_t 
+rd_kafka_ConfigResource_delete_config(rd_kafka_ConfigResource_t *config,
+                                      const char *name) {
+        if (!name || !*name)
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+        return rd_kafka_admin_add_config0(&config->config, name, NULL,
+                                          RD_KAFKA_ALTER_OP_DELETE);
+}
+
+
+const rd_kafka_ConfigEntry_t **
+rd_kafka_ConfigResource_configs(const rd_kafka_ConfigResource_t *config,
+                                size_t *cntp) {
+        *cntp = rd_list_cnt(&config->config);
+        if (!*cntp)
+                return NULL;
+        return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems;
+}
+
+
+
+rd_kafka_ResourceType_t
+rd_kafka_ConfigResource_type(const rd_kafka_ConfigResource_t *config) {
+        return config->restype;
+}
+
+const char *
+rd_kafka_ConfigResource_name(const rd_kafka_ConfigResource_t *config) {
+        return config->name;
+}
+
+rd_kafka_resp_err_t
+rd_kafka_ConfigResource_error(const rd_kafka_ConfigResource_t *config) {
+        return config->err;
+}
+
+const char *
+rd_kafka_ConfigResource_error_string(const rd_kafka_ConfigResource_t *config) {
+        if (!config->err)
+                return NULL;
+        if (config->errstr)
+                return config->errstr;
+        return rd_kafka_err2str(config->err);
+}
+
+
+/**
+ * @brief Look in the provided ConfigResource_t* list for a resource of
+ *        type BROKER and set its broker id in \p broker_idp, returning
+ *        RD_KAFKA_RESP_ERR_NO_ERROR.
+ *
+ *        If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT
+ *        is returned and an error string is written to errstr.
+ *
+ *        If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR
+ *        is returned and \p broker_idp is set to use the controller.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_ConfigResource_get_single_broker_id(const rd_list_t *configs,
+                                             int32_t *broker_idp,
+                                             char *errstr,
+                                             size_t errstr_size) {
+        const rd_kafka_ConfigResource_t *config;
+        int i;
+        int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default
+                                                               * value that we
+                                                               * can compare
+                                                               * to below */
+
+        RD_LIST_FOREACH(config, configs, i) {
+                char *endptr;
+                long int r;
+
+                if (config->restype != RD_KAFKA_RESOURCE_BROKER)
+                        continue;
+
+                if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) {
+                        rd_snprintf(errstr, errstr_size,
+                                    "Only one ConfigResource of type BROKER "
+                                    "is allowed per call");
+                        return RD_KAFKA_RESP_ERR__CONFLICT;
+                }
+
+                /* Convert string broker-id to int32.
+                 * Keep the full long result so the LONG_MIN/LONG_MAX
+                 * overflow checks below actually work. */
+                r = strtol(config->name, &endptr, 10);
+                if (r == LONG_MIN || r == LONG_MAX || config->name == endptr ||
+                    r < 0 || r > INT32_MAX) {
+                        rd_snprintf(errstr, errstr_size,
+                                    "Expected an int32 broker_id for "
+                                    "ConfigResource(type=BROKER, name=%s)",
+                                    config->name);
+                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
+                }
+
+                broker_id = (int32_t)r;
+
+                /* Keep scanning to make sure there are no duplicate
+                 * BROKER resources. */
+        }
+
+        *broker_idp = broker_id;
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**@}*/
+
+
+
+/**
+ * @name AlterConfigs
+ * @{
+ *
+ *
+ *
+ */
+
+
+
+/**
+ * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op.
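+ *
+ * The request side is driven from rd_kafka_AlterConfigs() below; an
+ * illustrative application-side sketch (topic name hypothetical, queue
+ * setup and error handling omitted):
+ * @code
+ * rd_kafka_ConfigResource_t *res = rd_kafka_ConfigResource_new(
+ *     RD_KAFKA_RESOURCE_TOPIC, "mytopic");
+ * rd_kafka_ConfigResource_set_config(res, "retention.ms", "86400000");
+ * rd_kafka_AlterConfigs(rk, &res, 1, NULL, q);
+ * // ...poll q for a RD_KAFKA_EVENT_ALTERCONFIGS_RESULT event and
+ * // inspect it with rd_kafka_AlterConfigs_result_resources()...
+ * rd_kafka_ConfigResource_destroy(res); // the request keeps its own copy
+ * @endcode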
+ */ +static rd_kafka_resp_err_t +rd_kafka_AlterConfigsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_op_t *rko_result = NULL; + int32_t res_cnt; + int i; + int32_t Throttle_Time; + + rd_kafka_buf_read_i32(reply, &Throttle_Time); + rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); + + rd_kafka_buf_read_i32(reply, &res_cnt); + + if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) { + rd_snprintf(errstr, errstr_size, + "Received %" PRId32 + " ConfigResources in response " + "when only %d were requested", + res_cnt, + rd_list_cnt(&rko_req->rko_u.admin_request.args)); + return RD_KAFKA_RESP_ERR__BAD_MSG; + } + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, + rd_kafka_ConfigResource_free); + + for (i = 0; i < (int)res_cnt; i++) { + int16_t error_code; + rd_kafkap_str_t error_msg; + int8_t res_type; + rd_kafkap_str_t kres_name; + char *res_name; + char *this_errstr = NULL; + rd_kafka_ConfigResource_t *config; + rd_kafka_ConfigResource_t skel; + int orig_pos; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &error_msg); + rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_str(reply, &kres_name); + RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + + if (error_code) { + if (RD_KAFKAP_STR_IS_NULL(&error_msg) || + RD_KAFKAP_STR_LEN(&error_msg) == 0) + this_errstr = + (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); + } + + config = rd_kafka_ConfigResource_new(res_type, res_name); + if (!config) { + rd_kafka_log(rko_req->rko_rk, LOG_ERR, "ADMIN", + "AlterConfigs returned " + "unsupported ConfigResource #%d with " + "type %d and name \"%s\": ignoring", + i, res_type, res_name); + continue; + } + + config->err = error_code; + if (this_errstr) + config->errstr = rd_strdup(this_errstr); + + /* As a convenience to the application we insert result + * in the same order as they were requested. The broker + * does not maintain ordering unfortunately. 
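+                 * E.g., results for resources requested as (TOPIC,"a"),
+                 * (TOPIC,"b") land at indices 0 and 1 respectively,
+                 * regardless of the order the broker listed them in.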
*/ + skel.restype = config->restype; + skel.name = config->name; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_ConfigResource_cmp); + if (orig_pos == -1) { + rd_kafka_ConfigResource_destroy(config); + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "that was not " + "included in the original request", + res_type, res_name); + } + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) { + rd_kafka_ConfigResource_destroy(config); + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "multiple times", + res_type, res_name); + } + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + config); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "AlterConfigs response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + + +void rd_kafka_AlterConfigs(rd_kafka_t *rk, + rd_kafka_ConfigResource_t **configs, + size_t config_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + rd_kafka_resp_err_t err; + char errstr[256]; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_AlterConfigsRequest, + rd_kafka_AlterConfigsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_ALTERCONFIGS, + RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, + &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, + rd_kafka_ConfigResource_free); + + for (i = 0; i < config_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_ConfigResource_copy(configs[i])); + + /* If there's a BROKER resource in the list we need to + * speak directly to that broker rather than the controller. + * + * Multiple BROKER resources are not allowed. + */ + err = rd_kafka_ConfigResource_get_single_broker_id( + &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.broker_id, + errstr, sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail(rko, err, "%s", errstr); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + +const rd_kafka_ConfigResource_t **rd_kafka_AlterConfigs_result_resources( + const rd_kafka_AlterConfigs_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_resources( + (const rd_kafka_op_t *)result, cntp); +} + +/**@}*/ + + + +/** + * @name DescribeConfigs + * @{ + * + * + * + */ + + +/** + * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_op_t *rko_result = NULL; + int32_t res_cnt; + int i; + int32_t Throttle_Time; + rd_kafka_ConfigResource_t *config = NULL; + rd_kafka_ConfigEntry_t *entry = NULL; + + rd_kafka_buf_read_i32(reply, &Throttle_Time); + rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); + + /* #resources */ + rd_kafka_buf_read_i32(reply, &res_cnt); + + if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) + rd_kafka_buf_parse_fail( + reply, + "Received %" PRId32 + " ConfigResources in response " + "when only %d were requested", + res_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, + rd_kafka_ConfigResource_free); + + for (i = 0; i < (int)res_cnt; i++) { + int16_t error_code; + rd_kafkap_str_t error_msg; + int8_t res_type; + rd_kafkap_str_t kres_name; + char *res_name; + char *this_errstr = NULL; + rd_kafka_ConfigResource_t skel; + int orig_pos; + int32_t entry_cnt; + int ci; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &error_msg); + rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_str(reply, &kres_name); + RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + + if (error_code) { + if (RD_KAFKAP_STR_IS_NULL(&error_msg) || + RD_KAFKAP_STR_LEN(&error_msg) == 0) + this_errstr = + (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); + } + + config = rd_kafka_ConfigResource_new(res_type, res_name); + if (!config) { + rd_kafka_log(rko_req->rko_rk, LOG_ERR, "ADMIN", + "DescribeConfigs returned " + "unsupported ConfigResource #%d with " + "type %d and name \"%s\": ignoring", + i, res_type, res_name); + continue; + } + + config->err = error_code; + if (this_errstr) + config->errstr = rd_strdup(this_errstr); + + /* #config_entries */ + rd_kafka_buf_read_i32(reply, &entry_cnt); + + for (ci = 0; ci < (int)entry_cnt; ci++) { + rd_kafkap_str_t config_name, config_value; + int32_t syn_cnt; + int si; + + rd_kafka_buf_read_str(reply, &config_name); + rd_kafka_buf_read_str(reply, &config_value); + + entry = rd_kafka_ConfigEntry_new0( + config_name.str, RD_KAFKAP_STR_LEN(&config_name), + config_value.str, RD_KAFKAP_STR_LEN(&config_value)); + + rd_kafka_buf_read_bool(reply, &entry->a.is_readonly); + + /* ApiVersion 0 has is_default field, while + * ApiVersion 1 has source field. + * Convert between the two so they look the same + * to the caller. 
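+                         * E.g., a v0 broker reporting is_default=true
+                         * is surfaced as source=DEFAULT_CONFIG, and a
+                         * v1 broker reporting source=DEFAULT_CONFIG is
+                         * surfaced with is_default set.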
*/ + if (rd_kafka_buf_ApiVersion(reply) == 0) { + rd_kafka_buf_read_bool(reply, + &entry->a.is_default); + if (entry->a.is_default) + entry->a.source = + RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG; + } else { + int8_t config_source; + rd_kafka_buf_read_i8(reply, &config_source); + entry->a.source = config_source; + + if (entry->a.source == + RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG) + entry->a.is_default = 1; + } + + rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive); + + + if (rd_kafka_buf_ApiVersion(reply) == 1) { + /* #config_synonyms (ApiVersion 1) */ + rd_kafka_buf_read_i32(reply, &syn_cnt); + + if (syn_cnt > 100000) + rd_kafka_buf_parse_fail( + reply, + "Broker returned %" PRId32 + " config synonyms for " + "ConfigResource %d,%s: " + "limit is 100000", + syn_cnt, config->restype, + config->name); + + if (syn_cnt > 0) + rd_list_grow(&entry->synonyms, syn_cnt); + + } else { + /* No synonyms in ApiVersion 0 */ + syn_cnt = 0; + } + + + + /* Read synonyms (ApiVersion 1) */ + for (si = 0; si < (int)syn_cnt; si++) { + rd_kafkap_str_t syn_name, syn_value; + int8_t syn_source; + rd_kafka_ConfigEntry_t *syn_entry; + + rd_kafka_buf_read_str(reply, &syn_name); + rd_kafka_buf_read_str(reply, &syn_value); + rd_kafka_buf_read_i8(reply, &syn_source); + + syn_entry = rd_kafka_ConfigEntry_new0( + syn_name.str, RD_KAFKAP_STR_LEN(&syn_name), + syn_value.str, + RD_KAFKAP_STR_LEN(&syn_value)); + if (!syn_entry) + rd_kafka_buf_parse_fail( + reply, + "Broker returned invalid " + "synonym #%d " + "for ConfigEntry #%d (%s) " + "and ConfigResource %d,%s: " + "syn_name.len %d, " + "syn_value.len %d", + si, ci, entry->kv->name, + config->restype, config->name, + (int)syn_name.len, + (int)syn_value.len); + + syn_entry->a.source = syn_source; + syn_entry->a.is_synonym = 1; + + rd_list_add(&entry->synonyms, syn_entry); + } + + rd_kafka_ConfigResource_add_ConfigEntry(config, entry); + entry = NULL; + } + + /* As a convenience to the application we insert result + * in the same order as they were requested. The broker + * does not maintain ordering unfortunately. 
*/ + skel.restype = config->restype; + skel.name = config->name; + orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, + &skel, rd_kafka_ConfigResource_cmp); + if (orig_pos == -1) + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "that was not " + "included in the original request", + res_type, res_name); + + if (rd_list_elem(&rko_result->rko_u.admin_result.results, + orig_pos) != NULL) + rd_kafka_buf_parse_fail( + reply, + "Broker returned ConfigResource %d,%s " + "multiple times", + res_type, res_name); + + rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, + config); + config = NULL; + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (entry) + rd_kafka_ConfigEntry_destroy(entry); + if (config) + rd_kafka_ConfigResource_destroy(config); + + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "DescribeConfigs response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + + +void rd_kafka_DescribeConfigs(rd_kafka_t *rk, + rd_kafka_ConfigResource_t **configs, + size_t config_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + rd_kafka_resp_err_t err; + char errstr[256]; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DescribeConfigsRequest, + rd_kafka_DescribeConfigsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_DESCRIBECONFIGS, + RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, + rd_kafka_ConfigResource_free); + + for (i = 0; i < config_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_ConfigResource_copy(configs[i])); + + /* If there's a BROKER resource in the list we need to + * speak directly to that broker rather than the controller. + * + * Multiple BROKER resources are not allowed. + */ + err = rd_kafka_ConfigResource_get_single_broker_id( + &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.broker_id, + errstr, sizeof(errstr)); + if (err) { + rd_kafka_admin_result_fail(rko, err, "%s", errstr); + rd_kafka_admin_common_worker_destroy(rk, rko, + rd_true /*destroy*/); + return; + } + + rd_kafka_q_enq(rk->rk_ops, rko); +} + + + +const rd_kafka_ConfigResource_t **rd_kafka_DescribeConfigs_result_resources( + const rd_kafka_DescribeConfigs_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_resources( + (const rd_kafka_op_t *)result, cntp); +} + +/**@}*/ + +/** + * @name Delete Records + * @{ + * + * + * + * + */ + +rd_kafka_DeleteRecords_t *rd_kafka_DeleteRecords_new( + const rd_kafka_topic_partition_list_t *before_offsets) { + rd_kafka_DeleteRecords_t *del_records; + + del_records = rd_calloc(1, sizeof(*del_records)); + del_records->offsets = + rd_kafka_topic_partition_list_copy(before_offsets); + + return del_records; +} + +void rd_kafka_DeleteRecords_destroy(rd_kafka_DeleteRecords_t *del_records) { + rd_kafka_topic_partition_list_destroy(del_records->offsets); + rd_free(del_records); +} + +void rd_kafka_DeleteRecords_destroy_array( + rd_kafka_DeleteRecords_t **del_records, + size_t del_record_cnt) { + size_t i; + for (i = 0; i < del_record_cnt; i++) + rd_kafka_DeleteRecords_destroy(del_records[i]); +} + + + +/** @brief Merge the DeleteRecords response from a single broker + * into the user response list. 
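+ *
+ * For example, if the requested offsets span partitions whose leaders
+ * are brokers 1 and 3, two DeleteRecordsRequests are sent and each
+ * broker's partial result is merged back into the single user-facing
+ * partition list here.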
+ */ +static void +rd_kafka_DeleteRecords_response_merge(rd_kafka_op_t *rko_fanout, + const rd_kafka_op_t *rko_partial) { + rd_kafka_t *rk = rko_fanout->rko_rk; + const rd_kafka_topic_partition_list_t *partitions; + rd_kafka_topic_partition_list_t *respartitions; + const rd_kafka_topic_partition_t *partition; + + rd_assert(rko_partial->rko_evtype == + RD_KAFKA_EVENT_DELETERECORDS_RESULT); + + /* All partitions (offsets) from the DeleteRecords() call */ + respartitions = + rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, 0); + + if (rko_partial->rko_err) { + /* If there was a request-level error, set the error on + * all requested partitions for this request. */ + const rd_kafka_topic_partition_list_t *reqpartitions; + rd_kafka_topic_partition_t *reqpartition; + + /* Partitions (offsets) from this DeleteRecordsRequest */ + reqpartitions = + rd_list_elem(&rko_partial->rko_u.admin_result.args, 0); + + RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) { + rd_kafka_topic_partition_t *respart; + + /* Find result partition */ + respart = rd_kafka_topic_partition_list_find( + respartitions, reqpartition->topic, + reqpartition->partition); + + rd_assert(respart || !*"respart not found"); + + respart->err = rko_partial->rko_err; + } + + return; + } + + /* Partitions from the DeleteRecordsResponse */ + partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); + + RD_KAFKA_TPLIST_FOREACH(partition, partitions) { + rd_kafka_topic_partition_t *respart; + + + /* Find result partition */ + respart = rd_kafka_topic_partition_list_find( + respartitions, partition->topic, partition->partition); + if (unlikely(!respart)) { + rd_dassert(!*"partition not found"); + + rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS", + "DeleteRecords response contains " + "unexpected %s [%" PRId32 + "] which " + "was not in the request list: ignored", + partition->topic, partition->partition); + continue; + } + + respart->offset = partition->offset; + respart->err = partition->err; + } +} + + + +/** + * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko_result; + rd_kafka_topic_partition_list_t *offsets; + + rd_kafka_buf_read_throttle_time(reply); + + + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + offsets = rd_kafka_buf_read_topic_partitions(reply, 0, fields); + if (!offsets) + rd_kafka_buf_parse_fail(reply, + "Failed to parse topic partitions"); + + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add(&rko_result->rko_u.admin_result.results, offsets); + *rko_resultp = rko_result; + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + rd_snprintf(errstr, errstr_size, + "DeleteRecords response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + + +/** + * @brief Call when leaders have been queried to progress the DeleteRecords + * admin op to its next phase, sending DeleteRecords to partition + * leaders. + * + * @param rko Reply op (RD_KAFKA_OP_LEADERS). 
+ */
+static rd_kafka_op_res_t
+rd_kafka_DeleteRecords_leaders_queried_cb(rd_kafka_t *rk,
+                                          rd_kafka_q_t *rkq,
+                                          rd_kafka_op_t *reply) {
+        rd_kafka_resp_err_t err = reply->rko_err;
+        const rd_list_t *leaders =
+            reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */
+        rd_kafka_topic_partition_list_t *partitions =
+            reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */
+        rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque;
+        rd_kafka_topic_partition_t *rktpar;
+        rd_kafka_topic_partition_list_t *offsets;
+        const struct rd_kafka_partition_leader *leader;
+        static const struct rd_kafka_admin_worker_cbs cbs = {
+            rd_kafka_DeleteRecordsRequest,
+            rd_kafka_DeleteRecordsResponse_parse,
+        };
+        int i;
+
+        rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
+                  RD_KAFKA_OP_ADMIN_FANOUT);
+
+        if (err == RD_KAFKA_RESP_ERR__DESTROY)
+                goto err;
+
+        /* Requested offsets */
+        offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0);
+
+        /* Update the error field of each partition from the
+         * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART
+         * and similar are propagated, since those partitions are not
+         * included in the leaders list. */
+        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
+                rd_kafka_topic_partition_t *rktpar2;
+
+                if (!rktpar->err)
+                        continue;
+
+                rktpar2 = rd_kafka_topic_partition_list_find(
+                    offsets, rktpar->topic, rktpar->partition);
+                rd_assert(rktpar2);
+                rktpar2->err = rktpar->err;
+        }
+
+
+        if (err) {
+        err:
+                rd_kafka_admin_result_fail(
+                    rko_fanout, err, "Failed to query partition leaders: %s",
+                    err == RD_KAFKA_RESP_ERR__NOENT ? "No leaders found"
+                                                    : rd_kafka_err2str(err));
+                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
+                                                     rd_true /*destroy*/);
+                return RD_KAFKA_OP_RES_HANDLED;
+        }
+
+        /* The response list is one element deep and that element is a
+         * rd_kafka_topic_partition_list_t with the results of the deletes.
*/ + rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results, + rd_kafka_topic_partition_list_copy(offsets)); + + rko_fanout->rko_u.admin_request.fanout.outstanding = + rd_list_cnt(leaders); + + rd_assert(rd_list_cnt(leaders) > 0); + + /* For each leader send a request for its partitions */ + RD_LIST_FOREACH(leader, leaders, i) { + rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_DELETERECORDS, + RD_KAFKA_EVENT_DELETERECORDS_RESULT, &cbs, + &rko_fanout->rko_u.admin_request.options, rk->rk_ops); + rko->rko_u.admin_request.fanout_parent = rko_fanout; + rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid; + + rd_kafka_topic_partition_list_sort_by_topic(leader->partitions); + + rd_list_init(&rko->rko_u.admin_request.args, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add( + &rko->rko_u.admin_request.args, + rd_kafka_topic_partition_list_copy(leader->partitions)); + + /* Enqueue op for admin_worker() to transition to next state */ + rd_kafka_q_enq(rk->rk_ops, rko); + } + + return RD_KAFKA_OP_RES_HANDLED; +} + + +void rd_kafka_DeleteRecords(rd_kafka_t *rk, + rd_kafka_DeleteRecords_t **del_records, + size_t del_record_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko_fanout; + static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { + rd_kafka_DeleteRecords_response_merge, + rd_kafka_topic_partition_list_copy_opaque, + }; + const rd_kafka_topic_partition_list_t *offsets; + rd_kafka_topic_partition_list_t *copied_offsets; + + rd_assert(rkqu); + + rko_fanout = rd_kafka_admin_fanout_op_new( + rk, RD_KAFKA_OP_DELETERECORDS, RD_KAFKA_EVENT_DELETERECORDS_RESULT, + &fanout_cbs, options, rkqu->rkqu_q); + + if (del_record_cnt != 1) { + /* We only support one DeleteRecords per call since there + * is no point in passing multiples, but the API still + * needs to be extensible/future-proof. */ + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Exactly one DeleteRecords must be " + "passed"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + offsets = del_records[0]->offsets; + + if (offsets == NULL || offsets->cnt == 0) { + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "No records to delete"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + /* Copy offsets list and store it on the request op */ + copied_offsets = rd_kafka_topic_partition_list_copy(offsets); + if (rd_kafka_topic_partition_list_has_duplicates( + copied_offsets, rd_false /*check partition*/)) { + rd_kafka_topic_partition_list_destroy(copied_offsets); + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate partitions not allowed"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + /* Set default error on each partition so that if any of the partitions + * never get a request sent we have an error to indicate it. 
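+         * A partition still carrying RD_KAFKA_RESP_ERR__NOOP in the final
+         * result was thus never included in any request.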
*/ + rd_kafka_topic_partition_list_set_err(copied_offsets, + RD_KAFKA_RESP_ERR__NOOP); + + rd_list_init(&rko_fanout->rko_u.admin_request.args, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_offsets); + + /* Async query for partition leaders */ + rd_kafka_topic_partition_list_query_leaders_async( + rk, copied_offsets, rd_kafka_admin_timeout_remains(rko_fanout), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_DeleteRecords_leaders_queried_cb, rko_fanout); +} + + +/** + * @brief Get the list of offsets from a DeleteRecords result. + * + * The returned \p offsets life-time is the same as the \p result object. + */ +const rd_kafka_topic_partition_list_t *rd_kafka_DeleteRecords_result_offsets( + const rd_kafka_DeleteRecords_result_t *result) { + const rd_kafka_topic_partition_list_t *offsets; + const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; + size_t cnt; + + rd_kafka_op_type_t reqtype = + rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; + rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS); + + cnt = rd_list_cnt(&rko->rko_u.admin_result.results); + + rd_assert(cnt == 1); + + offsets = (const rd_kafka_topic_partition_list_t *)rd_list_elem( + &rko->rko_u.admin_result.results, 0); + + rd_assert(offsets); + + return offsets; +} + +/**@}*/ + +/** + * @name Delete groups + * @{ + * + * + * + * + */ + +rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new(const char *group) { + size_t tsize = strlen(group) + 1; + rd_kafka_DeleteGroup_t *del_group; + + /* Single allocation */ + del_group = rd_malloc(sizeof(*del_group) + tsize); + del_group->group = del_group->data; + memcpy(del_group->group, group, tsize); + + return del_group; +} + +void rd_kafka_DeleteGroup_destroy(rd_kafka_DeleteGroup_t *del_group) { + rd_free(del_group); +} + +static void rd_kafka_DeleteGroup_free(void *ptr) { + rd_kafka_DeleteGroup_destroy(ptr); +} + +void rd_kafka_DeleteGroup_destroy_array(rd_kafka_DeleteGroup_t **del_groups, + size_t del_group_cnt) { + size_t i; + for (i = 0; i < del_group_cnt; i++) + rd_kafka_DeleteGroup_destroy(del_groups[i]); +} + +/** + * @brief Group name comparator for DeleteGroup_t + */ +static int rd_kafka_DeleteGroup_cmp(const void *_a, const void *_b) { + const rd_kafka_DeleteGroup_t *a = _a, *b = _b; + return strcmp(a->group, b->group); +} + +/** + * @brief Allocate a new DeleteGroup and make a copy of \p src + */ +static rd_kafka_DeleteGroup_t * +rd_kafka_DeleteGroup_copy(const rd_kafka_DeleteGroup_t *src) { + return rd_kafka_DeleteGroup_new(src->group); +} + + +/** + * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_DeleteGroupsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + int32_t group_cnt; + int i; + rd_kafka_op_t *rko_result = NULL; + + rd_kafka_buf_read_throttle_time(reply); + + /* #group_error_codes */ + rd_kafka_buf_read_i32(reply, &group_cnt); + + if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) + rd_kafka_buf_parse_fail( + reply, + "Received %" PRId32 + " groups in response " + "when only %d were requested", + group_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, group_cnt, + rd_kafka_group_result_free); + + for (i = 0; i < (int)group_cnt; i++) { + rd_kafkap_str_t kgroup; + int16_t error_code; + rd_kafka_group_result_t *groupres; + + rd_kafka_buf_read_str(reply, &kgroup); + rd_kafka_buf_read_i16(reply, &error_code); + + groupres = rd_kafka_group_result_new( + kgroup.str, RD_KAFKAP_STR_LEN(&kgroup), NULL, + error_code ? rd_kafka_error_new(error_code, NULL) : NULL); + + rd_list_add(&rko_result->rko_u.admin_result.results, groupres); + } + + *rko_resultp = rko_result; + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "DeleteGroups response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +/** @brief Merge the DeleteGroups response from a single broker + * into the user response list. + */ +void rd_kafka_DeleteGroups_response_merge(rd_kafka_op_t *rko_fanout, + const rd_kafka_op_t *rko_partial) { + const rd_kafka_group_result_t *groupres = NULL; + rd_kafka_group_result_t *newgroupres; + const rd_kafka_DeleteGroup_t *grp = + rko_partial->rko_u.admin_result.opaque; + int orig_pos; + + rd_assert(rko_partial->rko_evtype == + RD_KAFKA_EVENT_DELETEGROUPS_RESULT); + + if (!rko_partial->rko_err) { + /* Proper results. + * We only send one group per request, make sure it matches */ + groupres = + rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); + rd_assert(groupres); + rd_assert(!strcmp(groupres->group, grp->group)); + newgroupres = rd_kafka_group_result_copy(groupres); + } else { + /* Op errored, e.g. timeout */ + newgroupres = rd_kafka_group_result_new( + grp->group, -1, NULL, + rd_kafka_error_new(rko_partial->rko_err, NULL)); + } + + /* As a convenience to the application we insert group result + * in the same order as they were requested. 
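+ * The slot is found by looking the group name up in the request's
+ * argument list (rd_kafka_DeleteGroup_cmp) and writing the result at
+ * that same index in the fanout result list.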
*/ + orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, + rd_kafka_DeleteGroup_cmp); + rd_assert(orig_pos != -1); + + /* Make sure result is not already set */ + rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, + orig_pos) == NULL); + + rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, + newgroupres); +} + +void rd_kafka_DeleteGroups(rd_kafka_t *rk, + rd_kafka_DeleteGroup_t **del_groups, + size_t del_group_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko_fanout; + rd_list_t dup_list; + size_t i; + static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { + rd_kafka_DeleteGroups_response_merge, + rd_kafka_group_result_copy_opaque, + }; + + rd_assert(rkqu); + + rko_fanout = rd_kafka_admin_fanout_op_new( + rk, RD_KAFKA_OP_DELETEGROUPS, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, + &fanout_cbs, options, rkqu->rkqu_q); + + if (del_group_cnt == 0) { + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "No groups to delete"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + /* Copy group list and store it on the request op. + * Maintain original ordering. */ + rd_list_init(&rko_fanout->rko_u.admin_request.args, (int)del_group_cnt, + rd_kafka_DeleteGroup_free); + for (i = 0; i < del_group_cnt; i++) + rd_list_add(&rko_fanout->rko_u.admin_request.args, + rd_kafka_DeleteGroup_copy(del_groups[i])); + + /* Check for duplicates. + * Make a temporary copy of the group list and sort it to check for + * duplicates, we don't want the original list sorted since we want + * to maintain ordering. */ + rd_list_init(&dup_list, + rd_list_cnt(&rko_fanout->rko_u.admin_request.args), NULL); + rd_list_copy_to(&dup_list, &rko_fanout->rko_u.admin_request.args, NULL, + NULL); + rd_list_sort(&dup_list, rd_kafka_DeleteGroup_cmp); + if (rd_list_find_duplicate(&dup_list, rd_kafka_DeleteGroup_cmp)) { + rd_list_destroy(&dup_list); + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate groups not allowed"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + rd_list_destroy(&dup_list); + + /* Prepare results list where fanned out op's results will be + * accumulated. */ + rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, + (int)del_group_cnt, rd_kafka_group_result_free); + rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt; + + /* Create individual request ops for each group. + * FIXME: A future optimization is to coalesce all groups for a single + * coordinator into one op. */ + for (i = 0; i < del_group_cnt; i++) { + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DeleteGroupsRequest, + rd_kafka_DeleteGroupsResponse_parse, + }; + rd_kafka_DeleteGroup_t *grp = + rd_list_elem(&rko_fanout->rko_u.admin_request.args, (int)i); + rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_DELETEGROUPS, + RD_KAFKA_EVENT_DELETEGROUPS_RESULT, &cbs, options, + rk->rk_ops); + + rko->rko_u.admin_request.fanout_parent = rko_fanout; + rko->rko_u.admin_request.broker_id = + RD_KAFKA_ADMIN_TARGET_COORDINATOR; + rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; + rko->rko_u.admin_request.coordkey = rd_strdup(grp->group); + + /* Set the group name as the opaque so the fanout worker use it + * to fill in errors. + * References rko_fanout's memory, which will always outlive + * the fanned out op. 
*/
+                rd_kafka_AdminOptions_set_opaque(
+                    &rko->rko_u.admin_request.options, grp);
+
+                rd_list_init(&rko->rko_u.admin_request.args, 1,
+                             rd_kafka_DeleteGroup_free);
+                rd_list_add(&rko->rko_u.admin_request.args,
+                            rd_kafka_DeleteGroup_copy(del_groups[i]));
+
+                rd_kafka_q_enq(rk->rk_ops, rko);
+        }
+}
+
+
+/**
+ * @brief Get an array of group results from a DeleteGroups result.
+ *
+ * The returned \p groups life-time is the same as the \p result object.
+ * @param cntp is updated to the number of elements in the array.
+ */
+const rd_kafka_group_result_t **rd_kafka_DeleteGroups_result_groups(
+    const rd_kafka_DeleteGroups_result_t *result,
+    size_t *cntp) {
+        return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
+                                                cntp);
+}
+
+
+/**@}*/
+
+
+/**
+ * @name Delete consumer group offsets (committed offsets)
+ * @{
+ *
+ *
+ *
+ *
+ */
+
+rd_kafka_DeleteConsumerGroupOffsets_t *rd_kafka_DeleteConsumerGroupOffsets_new(
+    const char *group,
+    const rd_kafka_topic_partition_list_t *partitions) {
+        size_t tsize = strlen(group) + 1;
+        rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
+
+        rd_assert(partitions);
+
+        /* Single allocation */
+        del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize);
+        del_grpoffsets->group = del_grpoffsets->data;
+        memcpy(del_grpoffsets->group, group, tsize);
+        del_grpoffsets->partitions =
+            rd_kafka_topic_partition_list_copy(partitions);
+
+        return del_grpoffsets;
+}
+
+void rd_kafka_DeleteConsumerGroupOffsets_destroy(
+    rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) {
+        rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions);
+        rd_free(del_grpoffsets);
+}
+
+static void rd_kafka_DeleteConsumerGroupOffsets_free(void *ptr) {
+        rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr);
+}
+
+void rd_kafka_DeleteConsumerGroupOffsets_destroy_array(
+    rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
+    size_t del_grpoffsets_cnt) {
+        size_t i;
+        for (i = 0; i < del_grpoffsets_cnt; i++)
+                rd_kafka_DeleteConsumerGroupOffsets_destroy(del_grpoffsets[i]);
+}
+
+
+/**
+ * @brief Allocate a new DeleteConsumerGroupOffsets and make a copy of \p src
+ */
+static rd_kafka_DeleteConsumerGroupOffsets_t *
+rd_kafka_DeleteConsumerGroupOffsets_copy(
+    const rd_kafka_DeleteConsumerGroupOffsets_t *src) {
+        return rd_kafka_DeleteConsumerGroupOffsets_new(src->group,
+                                                       src->partitions);
+}
+
+
+/**
+ * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op.
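+ *
+ * Reply layout, as decoded below: a top-level ErrorCode (int16),
+ * ThrottleTimeMs, then the topic/partition array in which each
+ * partition carries its own error code.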
+ */
+static rd_kafka_resp_err_t
+rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
+                                    rd_kafka_op_t **rko_resultp,
+                                    rd_kafka_buf_t *reply,
+                                    char *errstr,
+                                    size_t errstr_size) {
+        const int log_decode_errors = LOG_ERR;
+        rd_kafka_op_t *rko_result;
+        int16_t ErrorCode;
+        rd_kafka_topic_partition_list_t *partitions = NULL;
+        const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
+
+        rd_kafka_buf_read_i16(reply, &ErrorCode);
+        if (ErrorCode) {
+                rd_snprintf(errstr, errstr_size,
+                            "OffsetDelete response error: %s",
+                            rd_kafka_err2str(ErrorCode));
+                return ErrorCode;
+        }
+
+        rd_kafka_buf_read_throttle_time(reply);
+
+
+        const rd_kafka_topic_partition_field_t fields[] = {
+            RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+            RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
+            RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+        partitions = rd_kafka_buf_read_topic_partitions(reply, 16, fields);
+        if (!partitions) {
+                rd_snprintf(errstr, errstr_size,
+                            "Failed to parse OffsetDeleteResponse partitions");
+                return RD_KAFKA_RESP_ERR__BAD_MSG;
+        }
+
+
+        /* Create result op and group_result_t */
+        rko_result = rd_kafka_admin_result_new(rko_req);
+        del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0);
+
+        rd_list_init(&rko_result->rko_u.admin_result.results, 1,
+                     rd_kafka_group_result_free);
+        rd_list_add(&rko_result->rko_u.admin_result.results,
+                    rd_kafka_group_result_new(del_grpoffsets->group, -1,
+                                              partitions, NULL));
+        rd_kafka_topic_partition_list_destroy(partitions);
+
+        *rko_resultp = rko_result;
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+err_parse:
+        rd_snprintf(errstr, errstr_size,
+                    "OffsetDelete response protocol parse failure: %s",
+                    rd_kafka_err2str(reply->rkbuf_err));
+        return reply->rkbuf_err;
+}
+
+
+void rd_kafka_DeleteConsumerGroupOffsets(
+    rd_kafka_t *rk,
+    rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
+    size_t del_grpoffsets_cnt,
+    const rd_kafka_AdminOptions_t *options,
+    rd_kafka_queue_t *rkqu) {
+        static const struct rd_kafka_admin_worker_cbs cbs = {
+            rd_kafka_OffsetDeleteRequest,
+            rd_kafka_OffsetDeleteResponse_parse,
+        };
+        rd_kafka_op_t *rko;
+
+        rd_assert(rkqu);
+
+        rko = rd_kafka_admin_request_op_new(
+            rk, RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS,
+            RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT, &cbs, options,
+            rkqu->rkqu_q);
+
+        if (del_grpoffsets_cnt != 1) {
+                /* For simplicity we only support one single group for now */
+                rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
+                                           "Exactly one "
+                                           "DeleteConsumerGroupOffsets must "
+                                           "be passed");
+                rd_kafka_admin_common_worker_destroy(rk, rko,
+                                                     rd_true /*destroy*/);
+                return;
+        }
+
+
+        rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR;
+        rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
+        rko->rko_u.admin_request.coordkey = rd_strdup(del_grpoffsets[0]->group);
+
+        /* Store copy of group on request so the group name can be reached
+         * from the response parser. */
+        rd_list_init(&rko->rko_u.admin_request.args, 1,
+                     rd_kafka_DeleteConsumerGroupOffsets_free);
+        rd_list_add(
+            &rko->rko_u.admin_request.args,
+            rd_kafka_DeleteConsumerGroupOffsets_copy(del_grpoffsets[0]));
+
+        rd_kafka_q_enq(rk->rk_ops, rko);
+}
+
+
+/**
+ * @brief Get an array of group results from a DeleteConsumerGroupOffsets
+ *        result.
+ *
+ * The returned \p groups life-time is the same as the \p result object.
+ * @param cntp is updated to the number of elements in the array.
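+ *
+ * Illustrative usage sketch (not part of librdkafka; assumes \p rkqu
+ * is the queue the DeleteConsumerGroupOffsets() request was issued on,
+ * and omits event type and error checks):
+ * @code
+ * rd_kafka_event_t *rkev = rd_kafka_queue_poll(rkqu, -1);
+ * const rd_kafka_DeleteConsumerGroupOffsets_result_t *res =
+ *     rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
+ * size_t i, cnt;
+ * const rd_kafka_group_result_t **groups =
+ *     rd_kafka_DeleteConsumerGroupOffsets_result_groups(res, &cnt);
+ * for (i = 0; i < cnt; i++) {
+ *         const rd_kafka_error_t *error =
+ *             rd_kafka_group_result_error(groups[i]);
+ *         printf("Group %s: %s\n",
+ *                rd_kafka_group_result_name(groups[i]),
+ *                error ? rd_kafka_error_string(error) : "success");
+ * }
+ * rd_kafka_event_destroy(rkev);
+ * @endcode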
+ */ +const rd_kafka_group_result_t ** +rd_kafka_DeleteConsumerGroupOffsets_result_groups( + const rd_kafka_DeleteConsumerGroupOffsets_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, + cntp); +} + +void rd_kafka_DeleteConsumerGroupOffsets( + rd_kafka_t *rk, + rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, + size_t del_grpoffsets_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/**@}*/ +/** + * @name CreateAcls + * @{ + * + * + * + */ + +const char *rd_kafka_AclOperation_name(rd_kafka_AclOperation_t operation) { + static const char *names[] = {"UNKNOWN", + "ANY", + "ALL", + "READ", + "WRITE", + "CREATE", + "DELETE", + "ALTER", + "DESCRIBE", + "CLUSTER_ACTION", + "DESCRIBE_CONFIGS", + "ALTER_CONFIGS", + "IDEMPOTENT_WRITE"}; + + if ((unsigned int)operation >= + (unsigned int)RD_KAFKA_ACL_OPERATION__CNT) + return "UNSUPPORTED"; + + return names[operation]; +} + +const char * +rd_kafka_AclPermissionType_name(rd_kafka_AclPermissionType_t permission_type) { + static const char *names[] = {"UNKNOWN", "ANY", "DENY", "ALLOW"}; + + if ((unsigned int)permission_type >= + (unsigned int)RD_KAFKA_ACL_PERMISSION_TYPE__CNT) + return "UNSUPPORTED"; + + return names[permission_type]; +} + +static rd_kafka_AclBinding_t * +rd_kafka_AclBinding_new0(rd_kafka_ResourceType_t restype, + const char *name, + rd_kafka_ResourcePatternType_t resource_pattern_type, + const char *principal, + const char *host, + rd_kafka_AclOperation_t operation, + rd_kafka_AclPermissionType_t permission_type, + rd_kafka_resp_err_t err, + const char *errstr) { + rd_kafka_AclBinding_t *acl_binding; + + acl_binding = rd_calloc(1, sizeof(*acl_binding)); + acl_binding->name = name != NULL ? rd_strdup(name) : NULL; + acl_binding->principal = + principal != NULL ? rd_strdup(principal) : NULL; + acl_binding->host = host != NULL ? 
rd_strdup(host) : NULL; + acl_binding->restype = restype; + acl_binding->resource_pattern_type = resource_pattern_type; + acl_binding->operation = operation; + acl_binding->permission_type = permission_type; + if (err) + acl_binding->error = rd_kafka_error_new(err, "%s", errstr); + + return acl_binding; +} + +rd_kafka_AclBinding_t * +rd_kafka_AclBinding_new(rd_kafka_ResourceType_t restype, + const char *name, + rd_kafka_ResourcePatternType_t resource_pattern_type, + const char *principal, + const char *host, + rd_kafka_AclOperation_t operation, + rd_kafka_AclPermissionType_t permission_type, + char *errstr, + size_t errstr_size) { + if (!name) { + rd_snprintf(errstr, errstr_size, "Invalid resource name"); + return NULL; + } + if (!principal) { + rd_snprintf(errstr, errstr_size, "Invalid principal"); + return NULL; + } + if (!host) { + rd_snprintf(errstr, errstr_size, "Invalid host"); + return NULL; + } + + if (restype == RD_KAFKA_RESOURCE_ANY || + restype <= RD_KAFKA_RESOURCE_UNKNOWN || + restype >= RD_KAFKA_RESOURCE__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid resource type"); + return NULL; + } + + if (resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_ANY || + resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_MATCH || + resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || + resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { + rd_snprintf(errstr, errstr_size, + "Invalid resource pattern type"); + return NULL; + } + + if (operation == RD_KAFKA_ACL_OPERATION_ANY || + operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || + operation >= RD_KAFKA_ACL_OPERATION__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid operation"); + return NULL; + } + + if (permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ANY || + permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || + permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid permission type"); + return NULL; + } + + return rd_kafka_AclBinding_new0( + restype, name, resource_pattern_type, principal, host, operation, + permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL); +} + +rd_kafka_AclBindingFilter_t *rd_kafka_AclBindingFilter_new( + rd_kafka_ResourceType_t restype, + const char *name, + rd_kafka_ResourcePatternType_t resource_pattern_type, + const char *principal, + const char *host, + rd_kafka_AclOperation_t operation, + rd_kafka_AclPermissionType_t permission_type, + char *errstr, + size_t errstr_size) { + + + if (restype <= RD_KAFKA_RESOURCE_UNKNOWN || + restype >= RD_KAFKA_RESOURCE__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid resource type"); + return NULL; + } + + if (resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || + resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { + rd_snprintf(errstr, errstr_size, + "Invalid resource pattern type"); + return NULL; + } + + if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || + operation >= RD_KAFKA_ACL_OPERATION__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid operation"); + return NULL; + } + + if (permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || + permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { + rd_snprintf(errstr, errstr_size, "Invalid permission type"); + return NULL; + } + + return rd_kafka_AclBinding_new0( + restype, name, resource_pattern_type, principal, host, operation, + permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL); +} + +rd_kafka_ResourceType_t +rd_kafka_AclBinding_restype(const rd_kafka_AclBinding_t *acl) { + return acl->restype; +} + +const char 
*rd_kafka_AclBinding_name(const rd_kafka_AclBinding_t *acl) { + return acl->name; +} + +const char *rd_kafka_AclBinding_principal(const rd_kafka_AclBinding_t *acl) { + return acl->principal; +} + +const char *rd_kafka_AclBinding_host(const rd_kafka_AclBinding_t *acl) { + return acl->host; +} + +rd_kafka_AclOperation_t +rd_kafka_AclBinding_operation(const rd_kafka_AclBinding_t *acl) { + return acl->operation; +} + +rd_kafka_AclPermissionType_t +rd_kafka_AclBinding_permission_type(const rd_kafka_AclBinding_t *acl) { + return acl->permission_type; +} + +rd_kafka_ResourcePatternType_t +rd_kafka_AclBinding_resource_pattern_type(const rd_kafka_AclBinding_t *acl) { + return acl->resource_pattern_type; +} + +const rd_kafka_error_t * +rd_kafka_AclBinding_error(const rd_kafka_AclBinding_t *acl) { + return acl->error; +} + +/** + * @brief Allocate a new AclBinding and make a copy of \p src + */ +static rd_kafka_AclBinding_t * +rd_kafka_AclBinding_copy(const rd_kafka_AclBinding_t *src) { + rd_kafka_AclBinding_t *dst; + + dst = rd_kafka_AclBinding_new( + src->restype, src->name, src->resource_pattern_type, src->principal, + src->host, src->operation, src->permission_type, NULL, 0); + rd_assert(dst); + return dst; +} + +/** + * @brief Allocate a new AclBindingFilter and make a copy of \p src + */ +static rd_kafka_AclBindingFilter_t * +rd_kafka_AclBindingFilter_copy(const rd_kafka_AclBindingFilter_t *src) { + rd_kafka_AclBindingFilter_t *dst; + + dst = rd_kafka_AclBindingFilter_new( + src->restype, src->name, src->resource_pattern_type, src->principal, + src->host, src->operation, src->permission_type, NULL, 0); + rd_assert(dst); + return dst; +} + +void rd_kafka_AclBinding_destroy(rd_kafka_AclBinding_t *acl_binding) { + if (acl_binding->name) + rd_free(acl_binding->name); + if (acl_binding->principal) + rd_free(acl_binding->principal); + if (acl_binding->host) + rd_free(acl_binding->host); + if (acl_binding->error) + rd_kafka_error_destroy(acl_binding->error); + rd_free(acl_binding); +} + +static void rd_kafka_AclBinding_free(void *ptr) { + rd_kafka_AclBinding_destroy(ptr); +} + + +void rd_kafka_AclBinding_destroy_array(rd_kafka_AclBinding_t **acl_bindings, + size_t acl_bindings_cnt) { + size_t i; + for (i = 0; i < acl_bindings_cnt; i++) + rd_kafka_AclBinding_destroy(acl_bindings[i]); +} + +/** + * @brief Parse CreateAclsResponse and create ADMIN_RESULT op. 
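+ *
+ * Reply layout, as decoded below: ThrottleTimeMs, then one
+ * ErrorCode (int16) plus ErrorMessage (string) per requested ACL
+ * binding; the array length must match the number of bindings in
+ * the request.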
+ */ +static rd_kafka_resp_err_t +rd_kafka_CreateAclsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_op_t *rko_result = NULL; + int32_t acl_cnt; + int i; + + rd_kafka_buf_read_throttle_time(reply); + + rd_kafka_buf_read_arraycnt(reply, &acl_cnt, 100000); + + if (acl_cnt != rd_list_cnt(&rko_req->rko_u.admin_request.args)) + rd_kafka_buf_parse_fail( + reply, + "Received %" PRId32 + " acls in response, but %d were requested", + acl_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, acl_cnt, + rd_kafka_acl_result_free); + + for (i = 0; i < (int)acl_cnt; i++) { + int16_t error_code; + rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; + rd_kafka_acl_result_t *acl_res; + char *errstr = NULL; + + rd_kafka_buf_read_i16(reply, &error_code); + + rd_kafka_buf_read_str(reply, &error_msg); + + if (error_code) { + if (RD_KAFKAP_STR_LEN(&error_msg) == 0) + errstr = (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&errstr, &error_msg); + } + + acl_res = rd_kafka_acl_result_new( + error_code ? rd_kafka_error_new(error_code, "%s", errstr) + : NULL); + + rd_list_set(&rko_result->rko_u.admin_result.results, i, + acl_res); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "CreateAcls response protocol parse failure: %s", + rd_kafka_err2str(err)); + + return err; +} + +void rd_kafka_CreateAcls(rd_kafka_t *rk, + rd_kafka_AclBinding_t **new_acls, + size_t new_acls_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_CreateAclsRequest, rd_kafka_CreateAclsResponse_parse}; + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_CREATEACLS, + RD_KAFKA_EVENT_CREATEACLS_RESULT, + &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)new_acls_cnt, + rd_kafka_AclBinding_free); + + for (i = 0; i < new_acls_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_AclBinding_copy(new_acls[i])); + + rd_kafka_q_enq(rk->rk_ops, rko); +} + +/** + * @brief Get an array of rd_kafka_acl_result_t from a CreateAcls result. + * + * The returned \p rd_kafka_acl_result_t life-time is the same as the \p result + * object. + * @param cntp is updated to the number of elements in the array. + */ +const rd_kafka_acl_result_t ** +rd_kafka_CreateAcls_result_acls(const rd_kafka_CreateAcls_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_acl_results( + (const rd_kafka_op_t *)result, cntp); +} + +/**@}*/ + +/** + * @name DescribeAcls + * @{ + * + * + * + */ + +/** + * @brief Parse DescribeAclsResponse and create ADMIN_RESULT op. 
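+ *
+ * Reply layout, as decoded below: ThrottleTimeMs, a top-level
+ * ErrorCode and ErrorMessage, then an array of resources (type, name
+ * and, from API version 1, pattern type), each holding an array of
+ * ACLs (principal, host, operation, permission type). Out-of-range
+ * enum values are logged and mapped to their UNKNOWN counterparts.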
+ */ +static rd_kafka_resp_err_t +rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_op_t *rko_result = NULL; + int32_t res_cnt; + int i; + int j; + rd_kafka_AclBinding_t *acl = NULL; + int16_t error_code; + rd_kafkap_str_t error_msg; + + rd_kafka_buf_read_throttle_time(reply); + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &error_msg); + + if (error_code) { + if (RD_KAFKAP_STR_LEN(&error_msg) == 0) + errstr = (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&errstr, &error_msg); + } + + /* #resources */ + rd_kafka_buf_read_arraycnt(reply, &res_cnt, 100000); + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, + rd_kafka_AclBinding_free); + + for (i = 0; i < (int)res_cnt; i++) { + int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN; + rd_kafkap_str_t kres_name; + char *res_name; + int8_t resource_pattern_type = + RD_KAFKA_RESOURCE_PATTERN_LITERAL; + int32_t acl_cnt; + + rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_str(reply, &kres_name); + RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + + if (rd_kafka_buf_ApiVersion(reply) >= 1) { + rd_kafka_buf_read_i8(reply, &resource_pattern_type); + } + + if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN || + res_type >= RD_KAFKA_RESOURCE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", + "DescribeAclsResponse returned unknown " + "resource type %d", + res_type); + res_type = RD_KAFKA_RESOURCE_UNKNOWN; + } + if (resource_pattern_type <= + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || + resource_pattern_type >= + RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", + "DescribeAclsResponse returned unknown " + "resource pattern type %d", + resource_pattern_type); + resource_pattern_type = + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN; + } + + /* #resources */ + rd_kafka_buf_read_arraycnt(reply, &acl_cnt, 100000); + + for (j = 0; j < (int)acl_cnt; j++) { + rd_kafkap_str_t kprincipal; + rd_kafkap_str_t khost; + int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; + int8_t permission_type = + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; + char *principal; + char *host; + + rd_kafka_buf_read_str(reply, &kprincipal); + rd_kafka_buf_read_str(reply, &khost); + rd_kafka_buf_read_i8(reply, &operation); + rd_kafka_buf_read_i8(reply, &permission_type); + RD_KAFKAP_STR_DUPA(&principal, &kprincipal); + RD_KAFKAP_STR_DUPA(&host, &khost); + + if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || + operation >= RD_KAFKA_ACL_OPERATION__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DESCRIBEACLSRESPONSE", + "DescribeAclsResponse returned " + "unknown acl operation %d", + operation); + operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; + } + if (permission_type <= + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || + permission_type >= + RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DESCRIBEACLSRESPONSE", + "DescribeAclsResponse returned " + "unknown acl permission type %d", + permission_type); + permission_type = + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; + } + + acl = rd_kafka_AclBinding_new0( + res_type, res_name, resource_pattern_type, + principal, host, operation, permission_type, + RD_KAFKA_RESP_ERR_NO_ERROR, NULL); + + rd_list_add(&rko_result->rko_u.admin_result.results, + acl); + 
                }
+        }
+
+        *rko_resultp = rko_result;
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+err_parse:
+        if (rko_result)
+                rd_kafka_op_destroy(rko_result);
+
+        rd_snprintf(errstr, errstr_size,
+                    "DescribeAcls response protocol parse failure: %s",
+                    rd_kafka_err2str(err));
+
+        return err;
+}
+
+void rd_kafka_DescribeAcls(rd_kafka_t *rk,
+                           rd_kafka_AclBindingFilter_t *acl_filter,
+                           const rd_kafka_AdminOptions_t *options,
+                           rd_kafka_queue_t *rkqu) {
+        rd_kafka_op_t *rko;
+
+        static const struct rd_kafka_admin_worker_cbs cbs = {
+            rd_kafka_DescribeAclsRequest,
+            rd_kafka_DescribeAclsResponse_parse,
+        };
+
+        rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DESCRIBEACLS,
+                                            RD_KAFKA_EVENT_DESCRIBEACLS_RESULT,
+                                            &cbs, options, rkqu->rkqu_q);
+
+        rd_list_init(&rko->rko_u.admin_request.args, 1,
+                     rd_kafka_AclBinding_free);
+
+        rd_list_add(&rko->rko_u.admin_request.args,
+                    rd_kafka_AclBindingFilter_copy(acl_filter));
+
+        rd_kafka_q_enq(rk->rk_ops, rko);
+}
+
+/**
+ * @brief Get an array of rd_kafka_AclBinding_t from a DescribeAcls result.
+ *
+ * The returned \p rd_kafka_AclBinding_t life-time is the same as the \p result
+ * object.
+ * @param cntp is updated to the number of elements in the array.
+ */
+const rd_kafka_AclBinding_t **
+rd_kafka_DescribeAcls_result_acls(const rd_kafka_DescribeAcls_result_t *result,
+                                  size_t *cntp) {
+        return rd_kafka_admin_result_ret_acl_bindings(
+            (const rd_kafka_op_t *)result, cntp);
+}
+
+/**@}*/
+
+/**
+ * @name DeleteAcls
+ * @{
+ *
+ *
+ *
+ */
+
+/**
+ * @brief Allocate a new DeleteAcls result response with the given
+ *        \p err error code and \p errstr error message.
+ */
+const rd_kafka_DeleteAcls_result_response_t *
+rd_kafka_DeleteAcls_result_response_new(rd_kafka_resp_err_t err, char *errstr) {
+        rd_kafka_DeleteAcls_result_response_t *result_response;
+
+        result_response = rd_calloc(1, sizeof(*result_response));
+        if (err)
+                result_response->error = rd_kafka_error_new(
+                    err, "%s", errstr ? errstr : rd_kafka_err2str(err));
+
+        /* List of matching ACL bindings */
+        rd_list_init(&result_response->matching_acls, 0,
+                     rd_kafka_AclBinding_free);
+
+        return result_response;
+}
+
+static void rd_kafka_DeleteAcls_result_response_destroy(
+    rd_kafka_DeleteAcls_result_response_t *resp) {
+        if (resp->error)
+                rd_kafka_error_destroy(resp->error);
+        rd_list_destroy(&resp->matching_acls);
+        rd_free(resp);
+}
+
+static void rd_kafka_DeleteAcls_result_response_free(void *ptr) {
+        rd_kafka_DeleteAcls_result_response_destroy(
+            (rd_kafka_DeleteAcls_result_response_t *)ptr);
+}
+
+/**
+ * @brief Get an array of rd_kafka_DeleteAcls_result_response_t from a
+ *        DeleteAcls result.
+ *
+ * The returned \p rd_kafka_DeleteAcls_result_response_t life-time is the
+ * same as the \p result object.
+ * @param cntp is updated to the number of elements in the array.
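+ *
+ * Illustrative usage sketch (not part of librdkafka; assumes \p rkev
+ * was polled from the queue the DeleteAcls() request was issued on,
+ * and omits event type and error checks):
+ * @code
+ * const rd_kafka_DeleteAcls_result_t *res =
+ *     rd_kafka_event_DeleteAcls_result(rkev);
+ * size_t resp_cnt, acl_cnt, i, j;
+ * const rd_kafka_DeleteAcls_result_response_t **resps =
+ *     rd_kafka_DeleteAcls_result_responses(res, &resp_cnt);
+ * for (i = 0; i < resp_cnt; i++) {
+ *         const rd_kafka_AclBinding_t **acls =
+ *             rd_kafka_DeleteAcls_result_response_matching_acls(
+ *                 resps[i], &acl_cnt);
+ *         for (j = 0; j < acl_cnt; j++)
+ *                 printf("Deleted ACL on resource %s\n",
+ *                        rd_kafka_AclBinding_name(acls[j]));
+ * }
+ * @endcode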
+ */ +const rd_kafka_DeleteAcls_result_response_t ** +rd_kafka_DeleteAcls_result_responses(const rd_kafka_DeleteAcls_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_delete_acl_result_responses( + (const rd_kafka_op_t *)result, cntp); +} + +const rd_kafka_error_t *rd_kafka_DeleteAcls_result_response_error( + const rd_kafka_DeleteAcls_result_response_t *result_response) { + return result_response->error; +} + +const rd_kafka_AclBinding_t **rd_kafka_DeleteAcls_result_response_matching_acls( + const rd_kafka_DeleteAcls_result_response_t *result_response, + size_t *matching_acls_cntp) { + *matching_acls_cntp = result_response->matching_acls.rl_cnt; + return (const rd_kafka_AclBinding_t **) + result_response->matching_acls.rl_elems; +} + +/** + * @brief Parse DeleteAclsResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_op_t *rko_result = NULL; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + int32_t res_cnt; + int i; + int j; + + rd_kafka_buf_read_throttle_time(reply); + + /* #responses */ + rd_kafka_buf_read_arraycnt(reply, &res_cnt, 100000); + + rko_result = rd_kafka_admin_result_new(rko_req); + + rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, + rd_kafka_DeleteAcls_result_response_free); + + for (i = 0; i < (int)res_cnt; i++) { + int16_t error_code; + rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; + char *errstr = NULL; + const rd_kafka_DeleteAcls_result_response_t *result_response; + int32_t matching_acls_cnt; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &error_msg); + + if (error_code) { + if (RD_KAFKAP_STR_IS_NULL(&error_msg) || + RD_KAFKAP_STR_LEN(&error_msg) == 0) + errstr = (char *)rd_kafka_err2str(error_code); + else + RD_KAFKAP_STR_DUPA(&errstr, &error_msg); + } + + result_response = + rd_kafka_DeleteAcls_result_response_new(error_code, errstr); + + /* #maching_acls */ + rd_kafka_buf_read_arraycnt(reply, &matching_acls_cnt, 100000); + for (j = 0; j < (int)matching_acls_cnt; j++) { + int16_t acl_error_code; + int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN; + rd_kafkap_str_t acl_error_msg = + RD_KAFKAP_STR_INITIALIZER; + rd_kafkap_str_t kres_name; + rd_kafkap_str_t khost; + rd_kafkap_str_t kprincipal; + int8_t resource_pattern_type = + RD_KAFKA_RESOURCE_PATTERN_LITERAL; + int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; + int8_t permission_type = + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; + rd_kafka_AclBinding_t *matching_acl; + char *acl_errstr = NULL; + char *res_name; + char *principal; + char *host; + + rd_kafka_buf_read_i16(reply, &acl_error_code); + rd_kafka_buf_read_str(reply, &acl_error_msg); + if (acl_error_code) { + if (RD_KAFKAP_STR_IS_NULL(&acl_error_msg) || + RD_KAFKAP_STR_LEN(&acl_error_msg) == 0) + acl_errstr = (char *)rd_kafka_err2str( + acl_error_code); + else + RD_KAFKAP_STR_DUPA(&acl_errstr, + &acl_error_msg); + } + + rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_str(reply, &kres_name); + + if (rd_kafka_buf_ApiVersion(reply) >= 1) { + rd_kafka_buf_read_i8(reply, + &resource_pattern_type); + } + + rd_kafka_buf_read_str(reply, &kprincipal); + rd_kafka_buf_read_str(reply, &khost); + rd_kafka_buf_read_i8(reply, &operation); + rd_kafka_buf_read_i8(reply, &permission_type); + RD_KAFKAP_STR_DUPA(&res_name, 
&kres_name); + RD_KAFKAP_STR_DUPA(&principal, &kprincipal); + RD_KAFKAP_STR_DUPA(&host, &khost); + + if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN || + res_type >= RD_KAFKA_RESOURCE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DELETEACLSRESPONSE", + "DeleteAclsResponse returned " + "unknown resource type %d", + res_type); + res_type = RD_KAFKA_RESOURCE_UNKNOWN; + } + if (resource_pattern_type <= + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || + resource_pattern_type >= + RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DELETEACLSRESPONSE", + "DeleteAclsResponse returned " + "unknown resource pattern type %d", + resource_pattern_type); + resource_pattern_type = + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN; + } + if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || + operation >= RD_KAFKA_ACL_OPERATION__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DELETEACLSRESPONSE", + "DeleteAclsResponse returned " + "unknown acl operation %d", + operation); + operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; + } + if (permission_type <= + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || + permission_type >= + RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { + rd_rkb_log(rkb, LOG_WARNING, + "DELETEACLSRESPONSE", + "DeleteAclsResponse returned " + "unknown acl permission type %d", + permission_type); + permission_type = + RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; + } + + matching_acl = rd_kafka_AclBinding_new0( + res_type, res_name, resource_pattern_type, + principal, host, operation, permission_type, + acl_error_code, acl_errstr); + + rd_list_add( + (rd_list_t *)&result_response->matching_acls, + (void *)matching_acl); + } + + rd_list_add(&rko_result->rko_u.admin_result.results, + (void *)result_response); + } + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "DeleteAcls response protocol parse failure: %s", + rd_kafka_err2str(err)); + + return err; +} + + +void rd_kafka_DeleteAcls(rd_kafka_t *rk, + rd_kafka_AclBindingFilter_t **del_acls, + size_t del_acls_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DeleteAclsRequest, rd_kafka_DeleteAclsResponse_parse}; + + rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DELETEACLS, + RD_KAFKA_EVENT_DELETEACLS_RESULT, + &cbs, options, rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, (int)del_acls_cnt, + rd_kafka_AclBinding_free); + + for (i = 0; i < del_acls_cnt; i++) + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_AclBindingFilter_copy(del_acls[i])); + + rd_kafka_q_enq(rk->rk_ops, rko); +} + +/**@}*/ + +/** + * @name Alter consumer group offsets (committed offsets) + * @{ + * + * + * + * + */ + +rd_kafka_AlterConsumerGroupOffsets_t *rd_kafka_AlterConsumerGroupOffsets_new( + const char *group_id, + const rd_kafka_topic_partition_list_t *partitions) { + rd_assert(group_id && partitions); + + size_t tsize = strlen(group_id) + 1; + rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets; + + /* Single allocation */ + alter_grpoffsets = rd_malloc(sizeof(*alter_grpoffsets) + tsize); + alter_grpoffsets->group_id = alter_grpoffsets->data; + memcpy(alter_grpoffsets->group_id, group_id, tsize); + alter_grpoffsets->partitions = + rd_kafka_topic_partition_list_copy(partitions); + + return alter_grpoffsets; +} + +void rd_kafka_AlterConsumerGroupOffsets_destroy( + rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets) { + 
rd_kafka_topic_partition_list_destroy(alter_grpoffsets->partitions);
+        rd_free(alter_grpoffsets);
+}
+
+static void rd_kafka_AlterConsumerGroupOffsets_free(void *ptr) {
+        rd_kafka_AlterConsumerGroupOffsets_destroy(ptr);
+}
+
+void rd_kafka_AlterConsumerGroupOffsets_destroy_array(
+    rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets,
+    size_t alter_grpoffsets_cnt) {
+        size_t i;
+        for (i = 0; i < alter_grpoffsets_cnt; i++)
+                rd_kafka_AlterConsumerGroupOffsets_destroy(alter_grpoffsets[i]);
+}
+
+/**
+ * @brief Allocate a new AlterConsumerGroupOffsets and make a copy of \p src
+ */
+static rd_kafka_AlterConsumerGroupOffsets_t *
+rd_kafka_AlterConsumerGroupOffsets_copy(
+    const rd_kafka_AlterConsumerGroupOffsets_t *src) {
+        return rd_kafka_AlterConsumerGroupOffsets_new(src->group_id,
+                                                      src->partitions);
+}
+
+/**
+ * @brief Send an OffsetCommitRequest to \p rkb with the partitions
+ *        in alter_grpoffsets (AlterConsumerGroupOffsets_t*) using
+ *        \p options.
+ *
+ */
+static rd_kafka_resp_err_t rd_kafka_AlterConsumerGroupOffsetsRequest(
+    rd_kafka_broker_t *rkb,
+    /* (rd_kafka_AlterConsumerGroupOffsets_t*) */
+    const rd_list_t *alter_grpoffsets,
+    rd_kafka_AdminOptions_t *options,
+    char *errstr,
+    size_t errstr_size,
+    rd_kafka_replyq_t replyq,
+    rd_kafka_resp_cb_t *resp_cb,
+    void *opaque) {
+        const rd_kafka_AlterConsumerGroupOffsets_t *grpoffsets =
+            rd_list_elem(alter_grpoffsets, 0);
+
+        rd_assert(rd_list_cnt(alter_grpoffsets) == 1);
+
+        rd_kafka_topic_partition_list_t *offsets = grpoffsets->partitions;
+        rd_kafka_consumer_group_metadata_t *cgmetadata =
+            rd_kafka_consumer_group_metadata_new(grpoffsets->group_id);
+
+        int ret = rd_kafka_OffsetCommitRequest(
+            rkb, cgmetadata, offsets, replyq, resp_cb, opaque,
+            "rd_kafka_AlterConsumerGroupOffsetsRequest");
+        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
+        if (ret == 0) {
+                rd_snprintf(errstr, errstr_size,
+                            "At least one topic-partition offset must "
+                            "be >= 0");
+                return RD_KAFKA_RESP_ERR__NO_OFFSET;
+        }
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Parse OffsetCommitResponse and create ADMIN_RESULT op.
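+ *
+ * The actual decoding is delegated to rd_kafka_handle_OffsetCommit(),
+ * which fills in the per-partition error codes on a copy of the
+ * request's partition list; that list is then wrapped in a
+ * group_result_t on the ADMIN_RESULT op.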
+ */ +static rd_kafka_resp_err_t +rd_kafka_AlterConsumerGroupOffsetsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + rd_kafka_t *rk; + rd_kafka_broker_t *rkb; + rd_kafka_op_t *rko_result; + rd_kafka_topic_partition_list_t *partitions = NULL; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets = + rd_list_elem(&rko_req->rko_u.admin_request.args, 0); + partitions = + rd_kafka_topic_partition_list_copy(alter_grpoffsets->partitions); + + rk = rko_req->rko_rk; + rkb = reply->rkbuf_rkb; + err = rd_kafka_handle_OffsetCommit(rk, rkb, err, reply, NULL, + partitions, rd_true); + + /* Create result op and group_result_t */ + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, 1, + rd_kafka_group_result_free); + rd_list_add(&rko_result->rko_u.admin_result.results, + rd_kafka_group_result_new(alter_grpoffsets->group_id, -1, + partitions, NULL)); + rd_kafka_topic_partition_list_destroy(partitions); + *rko_resultp = rko_result; + + if (reply->rkbuf_err) + rd_snprintf( + errstr, errstr_size, + "AlterConsumerGroupOffset response parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_AlterConsumerGroupOffsets( + rd_kafka_t *rk, + rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets, + size_t alter_grpoffsets_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + int i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_AlterConsumerGroupOffsetsRequest, + rd_kafka_AlterConsumerGroupOffsetsResponse_parse, + }; + rd_kafka_op_t *rko; + rd_kafka_topic_partition_list_t *copied_offsets; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS, + RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT, &cbs, options, + rkqu->rkqu_q); + + if (alter_grpoffsets_cnt != 1) { + /* For simplicity we only support one single group for now */ + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Exactly one " + "AlterConsumerGroupOffsets must " + "be passed"); + goto fail; + } + + if (alter_grpoffsets[0]->partitions->cnt == 0) { + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Non-empty topic partition list " + "must be present"); + goto fail; + } + + for (i = 0; i < alter_grpoffsets[0]->partitions->cnt; i++) { + if (alter_grpoffsets[0]->partitions->elems[i].offset < 0) { + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "All topic-partition offsets " + "must be >= 0"); + goto fail; + } + } + + /* TODO: add group id duplication check if in future more than one + * AlterConsumerGroupOffsets can be passed */ + + /* Copy offsets list for checking duplicated */ + copied_offsets = + rd_kafka_topic_partition_list_copy(alter_grpoffsets[0]->partitions); + if (rd_kafka_topic_partition_list_has_duplicates( + copied_offsets, rd_false /*check partition*/)) { + rd_kafka_topic_partition_list_destroy(copied_offsets); + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate partitions not allowed"); + goto fail; + } + rd_kafka_topic_partition_list_destroy(copied_offsets); + + rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; + rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; + rko->rko_u.admin_request.coordkey = + rd_strdup(alter_grpoffsets[0]->group_id); + + /* Store copy of group on request so 
the group name can be reached
+         * from the response parser. */
+        rd_list_init(&rko->rko_u.admin_request.args, 1,
+                     rd_kafka_AlterConsumerGroupOffsets_free);
+        rd_list_add(&rko->rko_u.admin_request.args,
+                    (void *)rd_kafka_AlterConsumerGroupOffsets_copy(
+                        alter_grpoffsets[0]));
+
+        rd_kafka_q_enq(rk->rk_ops, rko);
+        return;
+fail:
+        rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/);
+}
+
+
+/**
+ * @brief Get an array of group results from an AlterConsumerGroupOffsets
+ *        result.
+ *
+ * The returned \p groups life-time is the same as the \p result object.
+ * @param cntp is updated to the number of elements in the array.
+ */
+const rd_kafka_group_result_t **
+rd_kafka_AlterConsumerGroupOffsets_result_groups(
+    const rd_kafka_AlterConsumerGroupOffsets_result_t *result,
+    size_t *cntp) {
+        return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
+                                                cntp);
+}
+
+/**@}*/
+
+/**
+ * @name List consumer group offsets (committed offsets)
+ * @{
+ *
+ *
+ *
+ *
+ */
+
+rd_kafka_ListConsumerGroupOffsets_t *rd_kafka_ListConsumerGroupOffsets_new(
+    const char *group_id,
+    const rd_kafka_topic_partition_list_t *partitions) {
+        size_t tsize = strlen(group_id) + 1;
+        rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets;
+
+        rd_assert(group_id);
+
+        /* Single allocation */
+        list_grpoffsets = rd_calloc(1, sizeof(*list_grpoffsets) + tsize);
+        list_grpoffsets->group_id = list_grpoffsets->data;
+        memcpy(list_grpoffsets->group_id, group_id, tsize);
+        if (partitions) {
+                list_grpoffsets->partitions =
+                    rd_kafka_topic_partition_list_copy(partitions);
+        }
+
+        return list_grpoffsets;
+}
+
+void rd_kafka_ListConsumerGroupOffsets_destroy(
+    rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets) {
+        if (list_grpoffsets->partitions != NULL) {
+                rd_kafka_topic_partition_list_destroy(
+                    list_grpoffsets->partitions);
+        }
+        rd_free(list_grpoffsets);
+}
+
+static void rd_kafka_ListConsumerGroupOffsets_free(void *ptr) {
+        rd_kafka_ListConsumerGroupOffsets_destroy(ptr);
+}
+
+void rd_kafka_ListConsumerGroupOffsets_destroy_array(
+    rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets,
+    size_t list_grpoffsets_cnt) {
+        size_t i;
+        for (i = 0; i < list_grpoffsets_cnt; i++)
+                rd_kafka_ListConsumerGroupOffsets_destroy(list_grpoffsets[i]);
+}
+
+/**
+ * @brief Allocate a new ListConsumerGroupOffsets and make a copy of \p src
+ */
+static rd_kafka_ListConsumerGroupOffsets_t *
+rd_kafka_ListConsumerGroupOffsets_copy(
+    const rd_kafka_ListConsumerGroupOffsets_t *src) {
+        return rd_kafka_ListConsumerGroupOffsets_new(src->group_id,
+                                                     src->partitions);
+}
+
+/**
+ * @brief Send an OffsetFetchRequest to \p rkb with the partitions
+ *        in list_grpoffsets (ListConsumerGroupOffsets_t*) using
+ *        \p options.
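+ *
+ * The request_timeout and require_stable_offsets AdminOptions are
+ * read here and passed on to rd_kafka_OffsetFetchRequest().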
+ * + */ +static rd_kafka_resp_err_t rd_kafka_ListConsumerGroupOffsetsRequest( + rd_kafka_broker_t *rkb, + /* (rd_kafka_ListConsumerGroupOffsets_t*) */ + const rd_list_t *list_grpoffsets, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + int op_timeout; + rd_bool_t require_stable_offsets; + const rd_kafka_ListConsumerGroupOffsets_t *grpoffsets = + rd_list_elem(list_grpoffsets, 0); + + rd_assert(rd_list_cnt(list_grpoffsets) == 1); + + op_timeout = rd_kafka_confval_get_int(&options->request_timeout); + require_stable_offsets = + rd_kafka_confval_get_int(&options->require_stable_offsets); + rd_kafka_OffsetFetchRequest( + rkb, grpoffsets->group_id, grpoffsets->partitions, + require_stable_offsets, op_timeout, replyq, resp_cb, opaque); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Parse OffsetFetchResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_ListConsumerGroupOffsetsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets = + rd_list_elem(&rko_req->rko_u.admin_request.args, 0); + rd_kafka_t *rk; + rd_kafka_broker_t *rkb; + rd_kafka_topic_partition_list_t *offsets = NULL; + rd_kafka_op_t *rko_result; + rd_kafka_resp_err_t err; + + rk = rko_req->rko_rk; + rkb = reply->rkbuf_rkb; + err = rd_kafka_handle_OffsetFetch(rk, rkb, RD_KAFKA_RESP_ERR_NO_ERROR, + reply, NULL, &offsets, rd_false, + rd_true, rd_false); + + if (unlikely(err != RD_KAFKA_RESP_ERR_NO_ERROR)) { + reply->rkbuf_err = err; + goto err; + } + + /* Create result op and group_result_t */ + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, 1, + rd_kafka_group_result_free); + rd_list_add(&rko_result->rko_u.admin_result.results, + rd_kafka_group_result_new(list_grpoffsets->group_id, -1, + offsets, NULL)); + + if (likely(offsets != NULL)) + rd_kafka_topic_partition_list_destroy(offsets); + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; +err: + if (likely(offsets != NULL)) + rd_kafka_topic_partition_list_destroy(offsets); + + rd_snprintf(errstr, errstr_size, + "ListConsumerGroupOffsetsResponse response failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_ListConsumerGroupOffsets( + rd_kafka_t *rk, + rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets, + size_t list_grpoffsets_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_ListConsumerGroupOffsetsRequest, + rd_kafka_ListConsumerGroupOffsetsResponse_parse, + }; + rd_kafka_op_t *rko; + rd_kafka_topic_partition_list_t *copied_offsets; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS, + RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT, &cbs, options, + rkqu->rkqu_q); + + if (list_grpoffsets_cnt != 1) { + /* For simplicity we only support one single group for now */ + rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Exactly one " + "ListConsumerGroupOffsets must " + "be passed"); + goto fail; + } + + if (list_grpoffsets[0]->partitions != NULL && + list_grpoffsets[0]->partitions->cnt == 0) { + /* Either pass NULL for all the partitions or a non-empty list + */ + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, 
+ "NULL or " + "non-empty topic partition list must " + "be passed"); + goto fail; + } + + /* TODO: add group id duplication check when implementing KIP-709 */ + if (list_grpoffsets[0]->partitions != NULL) { + /* Copy offsets list for checking duplicated */ + copied_offsets = rd_kafka_topic_partition_list_copy( + list_grpoffsets[0]->partitions); + if (rd_kafka_topic_partition_list_has_duplicates( + copied_offsets, rd_false /*check partition*/)) { + rd_kafka_topic_partition_list_destroy(copied_offsets); + rd_kafka_admin_result_fail( + rko, RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate partitions not allowed"); + goto fail; + } + rd_kafka_topic_partition_list_destroy(copied_offsets); + } + + rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; + rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; + rko->rko_u.admin_request.coordkey = + rd_strdup(list_grpoffsets[0]->group_id); + + /* Store copy of group on request so the group name can be reached + * from the response parser. */ + rd_list_init(&rko->rko_u.admin_request.args, 1, + rd_kafka_ListConsumerGroupOffsets_free); + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_ListConsumerGroupOffsets_copy(list_grpoffsets[0])); + + rd_kafka_q_enq(rk->rk_ops, rko); + return; +fail: + rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); +} + + +/** + * @brief Get an array of group results from a ListConsumerGroups result. + * + * The returned \p groups life-time is the same as the \p result object. + * @param cntp is updated to the number of elements in the array. + */ +const rd_kafka_group_result_t **rd_kafka_ListConsumerGroupOffsets_result_groups( + const rd_kafka_ListConsumerGroupOffsets_result_t *result, + size_t *cntp) { + return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, + cntp); +} + +/**@}*/ + +/** + * @name List consumer groups + * @{ + * + * + * + * + */ + +#define CONSUMER_PROTOCOL_TYPE "consumer" + +/** + * @brief Create a new ConsumerGroupListing object. + * + * @param group_id The group id. + * @param is_simple_consumer_group Is the group simple? + * @param state Group state. + */ +static rd_kafka_ConsumerGroupListing_t * +rd_kafka_ConsumerGroupListing_new(const char *group_id, + rd_bool_t is_simple_consumer_group, + rd_kafka_consumer_group_state_t state) { + rd_kafka_ConsumerGroupListing_t *grplist; + grplist = rd_calloc(1, sizeof(*grplist)); + grplist->group_id = rd_strdup(group_id); + grplist->is_simple_consumer_group = is_simple_consumer_group; + grplist->state = state; + return grplist; +} + +/** + * @brief Copy \p grplist ConsumerGroupListing. + * + * @param grplist The group listing to copy. + * @return A new allocated copy of the passed ConsumerGroupListing. + */ +static rd_kafka_ConsumerGroupListing_t *rd_kafka_ConsumerGroupListing_copy( + const rd_kafka_ConsumerGroupListing_t *grplist) { + return rd_kafka_ConsumerGroupListing_new( + grplist->group_id, grplist->is_simple_consumer_group, + grplist->state); +} + +/** + * @brief Same as rd_kafka_ConsumerGroupListing_copy() but suitable for + * rd_list_copy(). The \p opaque is ignored. 
+ */ +static void *rd_kafka_ConsumerGroupListing_copy_opaque(const void *grplist, + void *opaque) { + return rd_kafka_ConsumerGroupListing_copy(grplist); +} + +static void rd_kafka_ConsumerGroupListing_destroy( + rd_kafka_ConsumerGroupListing_t *grplist) { + RD_IF_FREE(grplist->group_id, rd_free); + rd_free(grplist); +} + +static void rd_kafka_ConsumerGroupListing_free(void *ptr) { + rd_kafka_ConsumerGroupListing_destroy(ptr); +} + +const char *rd_kafka_ConsumerGroupListing_group_id( + const rd_kafka_ConsumerGroupListing_t *grplist) { + return grplist->group_id; +} + +int rd_kafka_ConsumerGroupListing_is_simple_consumer_group( + const rd_kafka_ConsumerGroupListing_t *grplist) { + return grplist->is_simple_consumer_group; +} + +rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupListing_state( + const rd_kafka_ConsumerGroupListing_t *grplist) { + return grplist->state; +} + +/** + * @brief Create a new ListConsumerGroupsResult object. + * + * @param valid + * @param errors + */ +static rd_kafka_ListConsumerGroupsResult_t * +rd_kafka_ListConsumerGroupsResult_new(const rd_list_t *valid, + const rd_list_t *errors) { + rd_kafka_ListConsumerGroupsResult_t *res; + res = rd_calloc(1, sizeof(*res)); + rd_list_init_copy(&res->valid, valid); + rd_list_copy_to(&res->valid, valid, + rd_kafka_ConsumerGroupListing_copy_opaque, NULL); + rd_list_init_copy(&res->errors, errors); + rd_list_copy_to(&res->errors, errors, rd_kafka_error_copy_opaque, NULL); + return res; +} + +static void rd_kafka_ListConsumerGroupsResult_destroy( + rd_kafka_ListConsumerGroupsResult_t *res) { + rd_list_destroy(&res->valid); + rd_list_destroy(&res->errors); + rd_free(res); +} + +static void rd_kafka_ListConsumerGroupsResult_free(void *ptr) { + rd_kafka_ListConsumerGroupsResult_destroy(ptr); +} + +/** + * @brief Copy the passed ListConsumerGroupsResult. + * + * @param res the ListConsumerGroupsResult to copy + * @return a newly allocated ListConsumerGroupsResult object. + * + * @sa Release the object with rd_kafka_ListConsumerGroupsResult_destroy(). + */ +static rd_kafka_ListConsumerGroupsResult_t * +rd_kafka_ListConsumerGroupsResult_copy( + const rd_kafka_ListConsumerGroupsResult_t *res) { + return rd_kafka_ListConsumerGroupsResult_new(&res->valid, &res->errors); +} + +/** + * @brief Same as rd_kafka_ListConsumerGroupsResult_copy() but suitable for + * rd_list_copy(). The \p opaque is ignored. + */ +static void *rd_kafka_ListConsumerGroupsResult_copy_opaque(const void *list, + void *opaque) { + return rd_kafka_ListConsumerGroupsResult_copy(list); +} + +/** + * @brief Send ListConsumerGroupsRequest. Admin worker compatible callback. 
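+ *
+ * Any match_consumer_group_states filter set on the AdminOptions is
+ * translated into an array of state names for the ListGroups request;
+ * an empty filter requests groups in all states.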
+ */ +static rd_kafka_resp_err_t +rd_kafka_admin_ListConsumerGroupsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *groups /*(char*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + int i; + rd_kafka_resp_err_t err; + rd_kafka_error_t *error; + const char **states_str = NULL; + int states_str_cnt = 0; + rd_list_t *states = + rd_kafka_confval_get_ptr(&options->match_consumer_group_states); + + /* Prepare list_options */ + if (states && rd_list_cnt(states) > 0) { + states_str_cnt = rd_list_cnt(states); + states_str = rd_calloc(states_str_cnt, sizeof(*states_str)); + for (i = 0; i < states_str_cnt; i++) { + states_str[i] = rd_kafka_consumer_group_state_name( + rd_list_get_int32(states, i)); + } + } + + error = rd_kafka_ListGroupsRequest(rkb, -1, states_str, states_str_cnt, + replyq, resp_cb, opaque); + + if (states_str) { + rd_free(states_str); + } + + if (error) { + rd_snprintf(errstr, errstr_size, "%s", + rd_kafka_error_string(error)); + err = rd_kafka_error_code(error); + rd_kafka_error_destroy(error); + return err; + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Parse ListConsumerGroupsResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_ListConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + int i, cnt; + int16_t error_code, api_version; + rd_kafka_op_t *rko_result = NULL; + rd_kafka_error_t *error = NULL; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_list_t valid, errors; + rd_kafka_ListConsumerGroupsResult_t *list_result; + char *group_id = NULL, *group_state = NULL, *proto_type = NULL; + + api_version = rd_kafka_buf_ApiVersion(reply); + if (api_version >= 1) { + rd_kafka_buf_read_throttle_time(reply); + } + rd_kafka_buf_read_i16(reply, &error_code); + if (error_code) { + error = rd_kafka_error_new(error_code, + "Broker [%d" + "] " + "ListConsumerGroups: %s", + rd_kafka_broker_id(rkb), + rd_kafka_err2str(error_code)); + } + + rd_kafka_buf_read_arraycnt(reply, &cnt, RD_KAFKAP_GROUPS_MAX); + rd_list_init(&valid, cnt, rd_kafka_ConsumerGroupListing_free); + rd_list_init(&errors, 8, rd_free); + if (error) + rd_list_add(&errors, error); + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, 1, + rd_kafka_ListConsumerGroupsResult_free); + + for (i = 0; i < cnt; i++) { + rd_kafkap_str_t GroupId, ProtocolType, + GroupState = RD_ZERO_INIT; + rd_kafka_ConsumerGroupListing_t *group_listing; + rd_bool_t is_simple_consumer_group, is_consumer_protocol_type; + rd_kafka_consumer_group_state_t state = + RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN; + + rd_kafka_buf_read_str(reply, &GroupId); + rd_kafka_buf_read_str(reply, &ProtocolType); + if (api_version >= 4) { + rd_kafka_buf_read_str(reply, &GroupState); + } + rd_kafka_buf_skip_tags(reply); + + group_id = RD_KAFKAP_STR_DUP(&GroupId); + proto_type = RD_KAFKAP_STR_DUP(&ProtocolType); + if (api_version >= 4) { + group_state = RD_KAFKAP_STR_DUP(&GroupState); + state = rd_kafka_consumer_group_state_code(group_state); + } + + is_simple_consumer_group = *proto_type == '\0'; + is_consumer_protocol_type = + !strcmp(proto_type, CONSUMER_PROTOCOL_TYPE); + if (is_simple_consumer_group || is_consumer_protocol_type) { + group_listing = rd_kafka_ConsumerGroupListing_new( + group_id, is_simple_consumer_group, state); + 
rd_list_add(&valid, group_listing); + } + + rd_free(group_id); + rd_free(group_state); + rd_free(proto_type); + group_id = NULL; + group_state = NULL; + proto_type = NULL; + } + rd_kafka_buf_skip_tags(reply); + +err_parse: + if (group_id) + rd_free(group_id); + if (group_state) + rd_free(group_state); + if (proto_type) + rd_free(proto_type); + + if (reply->rkbuf_err) { + error_code = reply->rkbuf_err; + error = rd_kafka_error_new( + error_code, + "Broker [%d" + "] " + "ListConsumerGroups response protocol parse failure: %s", + rd_kafka_broker_id(rkb), rd_kafka_err2str(error_code)); + rd_list_add(&errors, error); + } + + list_result = rd_kafka_ListConsumerGroupsResult_new(&valid, &errors); + rd_list_add(&rko_result->rko_u.admin_result.results, list_result); + + *rko_resultp = rko_result; + rd_list_destroy(&valid); + rd_list_destroy(&errors); + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** @brief Merge the ListConsumerGroups response from a single broker + * into the user response list. + */ +static void +rd_kafka_ListConsumerGroups_response_merge(rd_kafka_op_t *rko_fanout, + const rd_kafka_op_t *rko_partial) { + int cnt; + rd_kafka_ListConsumerGroupsResult_t *res = NULL; + rd_kafka_ListConsumerGroupsResult_t *newres; + rd_list_t new_valid, new_errors; + + rd_assert(rko_partial->rko_evtype == + RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT); + + cnt = rd_list_cnt(&rko_fanout->rko_u.admin_request.fanout.results); + if (cnt) { + res = rd_list_elem( + &rko_fanout->rko_u.admin_request.fanout.results, 0); + } else { + rd_list_init(&new_valid, 0, rd_kafka_ConsumerGroupListing_free); + rd_list_init(&new_errors, 0, rd_free); + res = rd_kafka_ListConsumerGroupsResult_new(&new_valid, + &new_errors); + rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, 0, + res); + rd_list_destroy(&new_valid); + rd_list_destroy(&new_errors); + } + if (!rko_partial->rko_err) { + int new_valid_count, new_errors_count; + const rd_list_t *new_valid_list, *new_errors_list; + /* Read the partial result and merge the valid groups + * and the errors into the fanout parent result. */ + newres = + rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); + rd_assert(newres); + new_valid_count = rd_list_cnt(&newres->valid); + new_errors_count = rd_list_cnt(&newres->errors); + if (new_valid_count) { + new_valid_list = &newres->valid; + rd_list_grow(&res->valid, new_valid_count); + rd_list_copy_to( + &res->valid, new_valid_list, + rd_kafka_ConsumerGroupListing_copy_opaque, NULL); + } + if (new_errors_count) { + new_errors_list = &newres->errors; + rd_list_grow(&res->errors, new_errors_count); + rd_list_copy_to(&res->errors, new_errors_list, + rd_kafka_error_copy_opaque, NULL); + } + } else { + /* Op errored, e.g. 
timeout */
+                rd_list_add(&res->errors,
+                            rd_kafka_error_new(rko_partial->rko_err, NULL));
+        }
+}
+
+void rd_kafka_ListConsumerGroups(rd_kafka_t *rk,
+                                 const rd_kafka_AdminOptions_t *options,
+                                 rd_kafka_queue_t *rkqu) {
+        rd_kafka_op_t *rko;
+        static const struct rd_kafka_admin_worker_cbs cbs = {
+            rd_kafka_admin_ListConsumerGroupsRequest,
+            rd_kafka_ListConsumerGroupsResponse_parse};
+        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
+            rd_kafka_ListConsumerGroups_response_merge,
+            rd_kafka_ListConsumerGroupsResult_copy_opaque,
+        };
+
+        rko = rd_kafka_admin_request_op_target_all_new(
+            rk, RD_KAFKA_OP_LISTCONSUMERGROUPS,
+            RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, &cbs, &fanout_cbs,
+            rd_kafka_ListConsumerGroupsResult_free, options, rkqu->rkqu_q);
+        rd_kafka_q_enq(rk->rk_ops, rko);
+}
+
+const rd_kafka_ConsumerGroupListing_t **
+rd_kafka_ListConsumerGroups_result_valid(
+    const rd_kafka_ListConsumerGroups_result_t *result,
+    size_t *cntp) {
+        int list_result_cnt;
+        const rd_kafka_ListConsumerGroupsResult_t *list_result;
+        const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result;
+        rd_kafka_op_type_t reqtype =
+            rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
+        rd_assert(reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPS);
+
+        list_result_cnt = rd_list_cnt(&rko->rko_u.admin_result.results);
+        rd_assert(list_result_cnt == 1);
+        list_result = rd_list_elem(&rko->rko_u.admin_result.results, 0);
+        *cntp = rd_list_cnt(&list_result->valid);
+
+        return (const rd_kafka_ConsumerGroupListing_t **)
+            list_result->valid.rl_elems;
+}
+
+const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors(
+    const rd_kafka_ListConsumerGroups_result_t *result,
+    size_t *cntp) {
+        int list_result_cnt, error_cnt;
+        const rd_kafka_ListConsumerGroupsResult_t *list_result;
+        const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result;
+        rd_kafka_op_type_t reqtype =
+            rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
+        rd_assert(reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPS);
+
+        list_result_cnt = rd_list_cnt(&rko->rko_u.admin_result.results);
+        rd_assert(list_result_cnt == 1);
+        list_result = rko->rko_u.admin_result.results.rl_elems[0];
+        error_cnt = rd_list_cnt(&list_result->errors);
+        if (error_cnt == 0) {
+                *cntp = 0;
+                return NULL;
+        }
+        *cntp = error_cnt;
+        return (const rd_kafka_error_t **)list_result->errors.rl_elems;
+}
+
+/**@}*/
+
+/**
+ * @name Describe consumer groups
+ * @{
+ */
+
+/**
+ * @brief Create a new MemberDescription object. This object is used for
+ *        creating a ConsumerGroupDescription.
+ *
+ * @param client_id The client id.
+ * @param consumer_id The consumer id (or member id).
+ * @param group_instance_id (optional) The group instance id
+ *                          for static membership.
+ * @param host The consumer host.
+ * @param assignment The member's assigned partitions, or NULL if none.
+ *
+ * @return A newly allocated MemberDescription object.
+ *         Use rd_kafka_MemberDescription_destroy() to free when done.
+ */
+static rd_kafka_MemberDescription_t *rd_kafka_MemberDescription_new(
+    const char *client_id,
+    const char *consumer_id,
+    const char *group_instance_id,
+    const char *host,
+    const rd_kafka_topic_partition_list_t *assignment) {
+        rd_kafka_MemberDescription_t *member;
+        member = rd_calloc(1, sizeof(*member));
+        member->client_id = rd_strdup(client_id);
+        member->consumer_id = rd_strdup(consumer_id);
+        if (group_instance_id)
+                member->group_instance_id = rd_strdup(group_instance_id);
+        member->host = rd_strdup(host);
+        if (assignment)
+                member->assignment.partitions =
+                    rd_kafka_topic_partition_list_copy(assignment);
+        else
+                member->assignment.partitions =
+                    rd_kafka_topic_partition_list_new(0);
+        return member;
+}
+
+/**
+ * @brief Allocate a new MemberDescription that is a copy of \p src
+ *        and return it.
+ *
+ * @param src The MemberDescription to copy.
+ * @return A newly allocated MemberDescription object.
+ *         Use rd_kafka_MemberDescription_destroy() to free when done.
+ */
+static rd_kafka_MemberDescription_t *
+rd_kafka_MemberDescription_copy(const rd_kafka_MemberDescription_t *src) {
+        return rd_kafka_MemberDescription_new(src->client_id, src->consumer_id,
+                                              src->group_instance_id, src->host,
+                                              src->assignment.partitions);
+}
+
+/**
+ * @brief MemberDescription copy, compatible with rd_list_copy_to.
+ *
+ * @param elem The MemberDescription to copy.
+ * @param opaque Not used.
+ */
+static void *rd_kafka_MemberDescription_list_copy(const void *elem,
+                                                  void *opaque) {
+        return rd_kafka_MemberDescription_copy(elem);
+}
+
+static void
+rd_kafka_MemberDescription_destroy(rd_kafka_MemberDescription_t *member) {
+        rd_free(member->client_id);
+        rd_free(member->consumer_id);
+        rd_free(member->host);
+        if (member->group_instance_id != NULL)
+                rd_free(member->group_instance_id);
+        if (member->assignment.partitions)
+                rd_kafka_topic_partition_list_destroy(
+                    member->assignment.partitions);
+        rd_free(member);
+}
+
+static void rd_kafka_MemberDescription_free(void *member) {
+        rd_kafka_MemberDescription_destroy(member);
+}
+
+const char *rd_kafka_MemberDescription_client_id(
+    const rd_kafka_MemberDescription_t *member) {
+        return member->client_id;
+}
+
+const char *rd_kafka_MemberDescription_group_instance_id(
+    const rd_kafka_MemberDescription_t *member) {
+        return member->group_instance_id;
+}
+
+const char *rd_kafka_MemberDescription_consumer_id(
+    const rd_kafka_MemberDescription_t *member) {
+        return member->consumer_id;
+}
+
+const char *
+rd_kafka_MemberDescription_host(const rd_kafka_MemberDescription_t *member) {
+        return member->host;
+}
+
+const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_assignment(
+    const rd_kafka_MemberDescription_t *member) {
+        return &member->assignment;
+}
+
+const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions(
+    const rd_kafka_MemberAssignment_t *assignment) {
+        return assignment->partitions;
+}
+
+
+/**
+ * @brief Create a new ConsumerGroupDescription object.
+ *
+ * @param group_id The group id.
+ * @param is_simple_consumer_group Is the group simple?
+ * @param members List of members (rd_kafka_MemberDescription_t) of this
+ *                group.
+ * @param partition_assignor (optional) Chosen assignor.
+ * @param state Group state.
+ * @param coordinator (optional) Group coordinator.
+ * @param error (optional) Error received for this group.
+ * @return A newly allocated ConsumerGroupDescription object.
+ *         Use rd_kafka_ConsumerGroupDescription_destroy() to free when done.
+ */
+static rd_kafka_ConsumerGroupDescription_t *
+rd_kafka_ConsumerGroupDescription_new(const char *group_id,
+                                      rd_bool_t is_simple_consumer_group,
+                                      const rd_list_t *members,
+                                      const char *partition_assignor,
+                                      rd_kafka_consumer_group_state_t state,
+                                      const rd_kafka_Node_t *coordinator,
+                                      rd_kafka_error_t *error) {
+        rd_kafka_ConsumerGroupDescription_t *grpdesc;
+        grpdesc = rd_calloc(1, sizeof(*grpdesc));
+        grpdesc->group_id = rd_strdup(group_id);
+        grpdesc->is_simple_consumer_group = is_simple_consumer_group;
+        if (members == NULL) {
+                rd_list_init(&grpdesc->members, 0,
+                             rd_kafka_MemberDescription_free);
+        } else {
+                rd_list_init_copy(&grpdesc->members, members);
+                rd_list_copy_to(&grpdesc->members, members,
+                                rd_kafka_MemberDescription_list_copy, NULL);
+        }
+        grpdesc->partition_assignor =
+            partition_assignor ? rd_strdup(partition_assignor) : NULL;
+        grpdesc->state = state;
+        if (coordinator != NULL)
+                grpdesc->coordinator = rd_kafka_Node_copy(coordinator);
+        grpdesc->error =
+            error != NULL ? rd_kafka_error_new(rd_kafka_error_code(error), "%s",
+                                               rd_kafka_error_string(error))
+                          : NULL;
+        return grpdesc;
+}
+
+/**
+ * @brief New instance of ConsumerGroupDescription from an error.
+ *
+ * @param group_id The group id.
+ * @param error The error.
+ * @return A newly allocated ConsumerGroupDescription with the passed error.
+ */
+static rd_kafka_ConsumerGroupDescription_t *
+rd_kafka_ConsumerGroupDescription_new_error(const char *group_id,
+                                            rd_kafka_error_t *error) {
+        return rd_kafka_ConsumerGroupDescription_new(
+            group_id, rd_false, NULL, NULL,
+            RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error);
+}
+
+/**
+ * @brief Copy \p grpdesc ConsumerGroupDescription.
+ *
+ * @param grpdesc The group description to copy.
+ * @return A newly allocated copy of the passed ConsumerGroupDescription.
+ */
+static rd_kafka_ConsumerGroupDescription_t *
+rd_kafka_ConsumerGroupDescription_copy(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return rd_kafka_ConsumerGroupDescription_new(
+            grpdesc->group_id, grpdesc->is_simple_consumer_group,
+            &grpdesc->members, grpdesc->partition_assignor, grpdesc->state,
+            grpdesc->coordinator, grpdesc->error);
+}
+
+/**
+ * @brief Same as rd_kafka_ConsumerGroupDescription_copy() but suitable for
+ *        rd_list_copy(). The \p opaque is ignored.
+ */
+static void *rd_kafka_ConsumerGroupDescription_copy_opaque(const void *grpdesc,
+                                                           void *opaque) {
+        return rd_kafka_ConsumerGroupDescription_copy(grpdesc);
+}
+
+static void rd_kafka_ConsumerGroupDescription_destroy(
+    rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        if (likely(grpdesc->group_id != NULL))
+                rd_free(grpdesc->group_id);
+        rd_list_destroy(&grpdesc->members);
+        if (likely(grpdesc->partition_assignor != NULL))
+                rd_free(grpdesc->partition_assignor);
+        if (likely(grpdesc->error != NULL))
+                rd_kafka_error_destroy(grpdesc->error);
+        if (grpdesc->coordinator)
+                rd_kafka_Node_destroy(grpdesc->coordinator);
+        rd_free(grpdesc);
+}
+
+static void rd_kafka_ConsumerGroupDescription_free(void *ptr) {
+        rd_kafka_ConsumerGroupDescription_destroy(ptr);
+}
+
+const char *rd_kafka_ConsumerGroupDescription_group_id(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->group_id;
+}
+
+const rd_kafka_error_t *rd_kafka_ConsumerGroupDescription_error(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->error;
+}
+
+
+int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->is_simple_consumer_group;
+}
+
+
+const char *rd_kafka_ConsumerGroupDescription_partition_assignor(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->partition_assignor;
+}
+
+
+rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->state;
+}
+
+const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return grpdesc->coordinator;
+}
+
+size_t rd_kafka_ConsumerGroupDescription_member_count(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
+        return rd_list_cnt(&grpdesc->members);
+}
+
+const rd_kafka_MemberDescription_t *rd_kafka_ConsumerGroupDescription_member(
+    const rd_kafka_ConsumerGroupDescription_t *grpdesc,
+    size_t idx) {
+        return (rd_kafka_MemberDescription_t *)rd_list_elem(&grpdesc->members,
+                                                            idx);
+}
+
+/**
+ * @brief Group arguments comparator for DescribeConsumerGroups args
+ */
+static int rd_kafka_DescribeConsumerGroups_cmp(const void *a, const void *b) {
+        return strcmp(a, b);
+}
+
+/** @brief Merge the DescribeConsumerGroups response from a single broker
+ *         into the user response list.
+ */
+static void rd_kafka_DescribeConsumerGroups_response_merge(
+    rd_kafka_op_t *rko_fanout,
+    const rd_kafka_op_t *rko_partial) {
+        rd_kafka_ConsumerGroupDescription_t *groupres = NULL;
+        rd_kafka_ConsumerGroupDescription_t *newgroupres;
+        const char *grp = rko_partial->rko_u.admin_result.opaque;
+        int orig_pos;
+
+        rd_assert(rko_partial->rko_evtype ==
+                  RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT);
+
+        if (!rko_partial->rko_err) {
+                /* Proper results.
+                 * We only send one group per request, make sure it matches */
+                groupres =
+                    rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);
+                rd_assert(groupres);
+                rd_assert(!strcmp(groupres->group_id, grp));
+                newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres);
+        } else {
+                /* Op errored, e.g. timeout */
+                rd_kafka_error_t *error =
+                    rd_kafka_error_new(rko_partial->rko_err, NULL);
+                newgroupres =
+                    rd_kafka_ConsumerGroupDescription_new_error(grp, error);
+                rd_kafka_error_destroy(error);
+        }
+
+        /* As a convenience to the application we insert group results
+         * in the same order as they were requested. */
+        orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp,
+                                 rd_kafka_DescribeConsumerGroups_cmp);
+        rd_assert(orig_pos != -1);
+
+        /* Make sure result is not already set */
+        rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results,
+                               orig_pos) == NULL);
+
+        rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos,
+                    newgroupres);
+}
+
+
+/**
+ * @brief Construct and send DescribeConsumerGroupsRequest to \p rkb
+ *        with the groups (char *) in \p groups, using
+ *        \p options.
+ *
+ *        The response (unparsed) will be enqueued on \p replyq
+ *        for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ *          transmission, otherwise an error code is returned and \p errstr
+ *          is updated with a human-readable error string.
+ */
+static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
+    rd_kafka_broker_t *rkb,
+    const rd_list_t *groups /*(char*)*/,
+    rd_kafka_AdminOptions_t *options,
+    char *errstr,
+    size_t errstr_size,
+    rd_kafka_replyq_t replyq,
+    rd_kafka_resp_cb_t *resp_cb,
+    void *opaque) {
+        int i;
+        char *group;
+        rd_kafka_resp_err_t err;
+        int groups_cnt = rd_list_cnt(groups);
+        rd_kafka_error_t *error = NULL;
+        char **groups_arr = rd_calloc(groups_cnt, sizeof(*groups_arr));
+
+        RD_LIST_FOREACH(group, groups, i) {
+                groups_arr[i] = rd_list_elem(groups, i);
+        }
+        error = rd_kafka_DescribeGroupsRequest(rkb, -1, groups_arr, groups_cnt,
+                                               replyq, resp_cb, opaque);
+        rd_free(groups_arr);
+
+        if (error) {
+                rd_snprintf(errstr, errstr_size, "%s",
+                            rd_kafka_error_string(error));
+                err = rd_kafka_error_code(error);
+                rd_kafka_error_destroy(error);
+                return err;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Parse DescribeConsumerGroupsResponse and create ADMIN_RESULT op.
+ */ +static rd_kafka_resp_err_t +rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + int nodeid; + uint16_t port; + int16_t api_version; + int32_t cnt; + rd_kafka_op_t *rko_result = NULL; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_Node_t *node = NULL; + rd_kafka_error_t *error = NULL; + char *group_id = NULL, *group_state = NULL, *proto_type = NULL, + *proto = NULL, *host = NULL; + + api_version = rd_kafka_buf_ApiVersion(reply); + if (api_version >= 1) { + rd_kafka_buf_read_throttle_time(reply); + } + + rd_kafka_buf_read_arraycnt(reply, &cnt, 100000); + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, cnt, + rd_kafka_ConsumerGroupDescription_free); + + rd_kafka_broker_lock(rkb); + nodeid = rkb->rkb_nodeid; + host = rd_strdup(rkb->rkb_origname); + port = rkb->rkb_port; + rd_kafka_broker_unlock(rkb); + + node = rd_kafka_Node_new(nodeid, host, port, NULL); + while (cnt-- > 0) { + int16_t error_code; + rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData; + rd_bool_t is_simple_consumer_group, is_consumer_protocol_type; + int32_t member_cnt; + rd_list_t members; + rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &GroupId); + rd_kafka_buf_read_str(reply, &GroupState); + rd_kafka_buf_read_str(reply, &ProtocolType); + rd_kafka_buf_read_str(reply, &ProtocolData); + rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000); + + group_id = RD_KAFKAP_STR_DUP(&GroupId); + group_state = RD_KAFKAP_STR_DUP(&GroupState); + proto_type = RD_KAFKAP_STR_DUP(&ProtocolType); + proto = RD_KAFKAP_STR_DUP(&ProtocolData); + + if (error_code) { + error = rd_kafka_error_new( + error_code, "DescribeConsumerGroups: %s", + rd_kafka_err2str(error_code)); + } + + is_simple_consumer_group = *proto_type == '\0'; + is_consumer_protocol_type = + !strcmp(proto_type, CONSUMER_PROTOCOL_TYPE); + if (error == NULL && !is_simple_consumer_group && + !is_consumer_protocol_type) { + error = rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "GroupId %s is not a consumer group (%s).", + group_id, proto_type); + } + + rd_list_init(&members, 0, rd_kafka_MemberDescription_free); + + while (member_cnt-- > 0) { + rd_kafkap_str_t MemberId, ClientId, ClientHost, + GroupInstanceId = RD_KAFKAP_STR_INITIALIZER; + char *member_id, *client_id, *client_host, + *group_instance_id = NULL; + rd_kafkap_bytes_t MemberMetadata, MemberAssignment; + rd_kafka_MemberDescription_t *member; + rd_kafka_topic_partition_list_t *partitions = NULL; + rd_kafka_buf_t *rkbuf; + + rd_kafka_buf_read_str(reply, &MemberId); + if (api_version >= 4) { + rd_kafka_buf_read_str(reply, &GroupInstanceId); + } + rd_kafka_buf_read_str(reply, &ClientId); + rd_kafka_buf_read_str(reply, &ClientHost); + rd_kafka_buf_read_bytes(reply, &MemberMetadata); + rd_kafka_buf_read_bytes(reply, &MemberAssignment); + if (error != NULL) + continue; + + if (RD_KAFKAP_BYTES_LEN(&MemberAssignment) != 0) { + int16_t version; + /* Parse assignment */ + rkbuf = rd_kafka_buf_new_shadow( + MemberAssignment.data, + RD_KAFKAP_BYTES_LEN(&MemberAssignment), + NULL); + /* Protocol parser needs a broker handle + * to log errors on. 
*/ + rkbuf->rkbuf_rkb = rkb; + /* Decreased in rd_kafka_buf_destroy */ + rd_kafka_broker_keep(rkb); + rd_kafka_buf_read_i16(rkbuf, &version); + const rd_kafka_topic_partition_field_t fields[] = + {RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + partitions = rd_kafka_buf_read_topic_partitions( + rkbuf, 0, fields); + rd_kafka_buf_destroy(rkbuf); + if (!partitions) + rd_kafka_buf_parse_fail( + reply, + "Error reading topic partitions"); + } + + member_id = RD_KAFKAP_STR_DUP(&MemberId); + if (!RD_KAFKAP_STR_IS_NULL(&GroupInstanceId)) { + group_instance_id = + RD_KAFKAP_STR_DUP(&GroupInstanceId); + } + client_id = RD_KAFKAP_STR_DUP(&ClientId); + client_host = RD_KAFKAP_STR_DUP(&ClientHost); + + member = rd_kafka_MemberDescription_new( + client_id, member_id, group_instance_id, + client_host, partitions); + if (partitions) + rd_kafka_topic_partition_list_destroy( + partitions); + rd_list_add(&members, member); + rd_free(member_id); + rd_free(group_instance_id); + rd_free(client_id); + rd_free(client_host); + member_id = NULL; + group_instance_id = NULL; + client_id = NULL; + client_host = NULL; + } + + if (api_version >= 3) { + /* TODO: implement KIP-430 */ + int32_t authorized_operations; + rd_kafka_buf_read_i32(reply, &authorized_operations); + } + + if (error == NULL) { + grpdesc = rd_kafka_ConsumerGroupDescription_new( + group_id, is_simple_consumer_group, &members, proto, + rd_kafka_consumer_group_state_code(group_state), + node, error); + } else { + grpdesc = rd_kafka_ConsumerGroupDescription_new_error( + group_id, error); + } + rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); + if (error) + rd_kafka_error_destroy(error); + rd_list_destroy(&members); + rd_free(group_id); + rd_free(group_state); + rd_free(proto_type); + rd_free(proto); + error = NULL; + group_id = NULL; + group_state = NULL; + proto_type = NULL; + proto = NULL; + } + + if (host) + rd_free(host); + if (node) + rd_kafka_Node_destroy(node); + *rko_resultp = rko_result; + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (group_id) + rd_free(group_id); + if (group_state) + rd_free(group_state); + if (proto_type) + rd_free(proto_type); + if (proto) + rd_free(proto); + if (error) + rd_kafka_error_destroy(error); + if (host) + rd_free(host); + if (node) + rd_kafka_Node_destroy(node); + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf( + errstr, errstr_size, + "DescribeConsumerGroups response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, + const char **groups, + size_t groups_cnt, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko_fanout; + rd_list_t dup_list; + size_t i; + static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { + rd_kafka_DescribeConsumerGroups_response_merge, + rd_kafka_ConsumerGroupDescription_copy_opaque}; + + rd_assert(rkqu); + + rko_fanout = rd_kafka_admin_fanout_op_new( + rk, RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, &fanout_cbs, options, + rkqu->rkqu_q); + + if (groups_cnt == 0) { + rd_kafka_admin_result_fail(rko_fanout, + RD_KAFKA_RESP_ERR__INVALID_ARG, + "No groups to describe"); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return; + } + + /* Copy group list and store it on the request op. + * Maintain original ordering. 
*/
+        rd_list_init(&rko_fanout->rko_u.admin_request.args, (int)groups_cnt,
+                     rd_free);
+        for (i = 0; i < groups_cnt; i++)
+                rd_list_add(&rko_fanout->rko_u.admin_request.args,
+                            rd_strdup(groups[i]));
+
+        /* Check for duplicates.
+         * Make a temporary copy of the group list and sort it to check for
+         * duplicates, we don't want the original list sorted since we want
+         * to maintain ordering. */
+        rd_list_init(&dup_list,
+                     rd_list_cnt(&rko_fanout->rko_u.admin_request.args), NULL);
+        rd_list_copy_to(&dup_list, &rko_fanout->rko_u.admin_request.args, NULL,
+                        NULL);
+        rd_list_sort(&dup_list, rd_kafka_DescribeConsumerGroups_cmp);
+        if (rd_list_find_duplicate(&dup_list,
+                                   rd_kafka_DescribeConsumerGroups_cmp)) {
+                rd_list_destroy(&dup_list);
+                rd_kafka_admin_result_fail(rko_fanout,
+                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
+                                           "Duplicate groups not allowed");
+                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
+                                                     rd_true /*destroy*/);
+                return;
+        }
+
+        rd_list_destroy(&dup_list);
+
+        /* Prepare results list where fanned out op's results will be
+         * accumulated. */
+        rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results,
+                     (int)groups_cnt, rd_kafka_ConsumerGroupDescription_free);
+        rko_fanout->rko_u.admin_request.fanout.outstanding = (int)groups_cnt;
+
+        /* Create individual request ops for each group.
+         * FIXME: A future optimization is to coalesce all groups for a single
+         *        coordinator into one op. */
+        for (i = 0; i < groups_cnt; i++) {
+                static const struct rd_kafka_admin_worker_cbs cbs = {
+                    rd_kafka_admin_DescribeConsumerGroupsRequest,
+                    rd_kafka_DescribeConsumerGroupsResponse_parse,
+                };
+                char *grp =
+                    rd_list_elem(&rko_fanout->rko_u.admin_request.args, (int)i);
+                rd_kafka_op_t *rko = rd_kafka_admin_request_op_new(
+                    rk, RD_KAFKA_OP_DESCRIBECONSUMERGROUPS,
+                    RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, &cbs, options,
+                    rk->rk_ops);
+
+                rko->rko_u.admin_request.fanout_parent = rko_fanout;
+                rko->rko_u.admin_request.broker_id =
+                    RD_KAFKA_ADMIN_TARGET_COORDINATOR;
+                rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
+                rko->rko_u.admin_request.coordkey = rd_strdup(grp);
+
+                /* Set the group name as the opaque so the fanout worker can
+                 * use it to fill in errors.
+                 * References rko_fanout's memory, which will always outlive
+                 * the fanned out op. */
+                rd_kafka_AdminOptions_set_opaque(
+                    &rko->rko_u.admin_request.options, grp);
+
+                rd_list_init(&rko->rko_u.admin_request.args, 1, rd_free);
+                rd_list_add(&rko->rko_u.admin_request.args,
+                            rd_strdup(groups[i]));
+
+                rd_kafka_q_enq(rk->rk_ops, rko);
+        }
+}
+
+const rd_kafka_ConsumerGroupDescription_t **
+rd_kafka_DescribeConsumerGroups_result_groups(
+    const rd_kafka_DescribeConsumerGroups_result_t *result,
+    size_t *cntp) {
+        const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result;
+        rd_kafka_op_type_t reqtype =
+            rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
+        rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS);
+
+        *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
+        return (const rd_kafka_ConsumerGroupDescription_t **)
+            rko->rko_u.admin_result.results.rl_elems;
+}
+
+/**@}*/
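
Usage note: the code above implements the broker-facing side of the ListConsumerGroups Admin API; from the application's point of view the call is fire-and-forget, with the result delivered as an event on a queue. The following is a minimal sketch of that calling pattern, assuming an already-configured rd_kafka_t handle `rk` (creation and configuration of the handle, and full error handling, are omitted); it uses only public rdkafka.h APIs that the file above defines or references.

    #include <stdio.h>
    #include <librdkafka/rdkafka.h>

    static void list_groups_example(rd_kafka_t *rk) {
            /* Dedicated queue on which the result event will arrive. */
            rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
            rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
                rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
            rd_kafka_event_t *rkev;

            /* Non-blocking: enqueues the admin op and returns immediately. */
            rd_kafka_ListConsumerGroups(rk, options, rkqu);

            /* Wait up to 10s for the result event. */
            rkev = rd_kafka_queue_poll(rkqu, 10 * 1000);
            if (rkev && rd_kafka_event_type(rkev) ==
                            RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) {
                    const rd_kafka_ListConsumerGroups_result_t *result =
                        rd_kafka_event_ListConsumerGroups_result(rkev);
                    const rd_kafka_ConsumerGroupListing_t **valid;
                    const rd_kafka_error_t **errors;
                    size_t valid_cnt, error_cnt, i;

                    /* Groups listed successfully by some broker. */
                    valid = rd_kafka_ListConsumerGroups_result_valid(
                        result, &valid_cnt);
                    for (i = 0; i < valid_cnt; i++)
                            printf("group: %s\n",
                                   rd_kafka_ConsumerGroupListing_group_id(
                                       valid[i]));

                    /* Per-broker errors (the request fans out to all
                     * brokers, so both lists may be non-empty). */
                    errors = rd_kafka_ListConsumerGroups_result_errors(
                        result, &error_cnt);
                    for (i = 0; i < error_cnt; i++)
                            fprintf(stderr, "error: %s\n",
                                    rd_kafka_error_string(errors[i]));
            }

            if (rkev)
                    rd_kafka_event_destroy(rkev);
            rd_kafka_AdminOptions_destroy(options);
            rd_kafka_queue_destroy(rkqu);
    }

DescribeConsumerGroups follows the same pattern, with the extra step that each requested group is routed to its coordinator via the fanout ops built above.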