/* * librdkafka - Apache Kafka C library * * Copyright (c) 2018 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "rdkafka_int.h" #include "rdkafka_admin.h" #include "rdkafka_request.h" #include "rdkafka_aux.h" #include /** @brief Descriptive strings for rko_u.admin_request.state */ static const char *rd_kafka_admin_state_desc[] = { "initializing", "waiting for broker", "waiting for controller", "waiting for fanouts", "constructing request", "waiting for response from broker", "waiting for a valid list of brokers to be available"}; /** * @brief Admin API implementation. * * The public Admin API in librdkafka exposes a completely asynchronous * interface where the initial request API (e.g., ..CreateTopics()) * is non-blocking and returns immediately, and the application polls * a ..queue_t for the result. * * The underlying handling of the request is also completely asynchronous * inside librdkafka, for two reasons: * - everything is async in librdkafka so adding something new that isn't * would mean that existing functionality will need to be changed if * it should be able to work simultaneously (such as statistics, timers, * etc). There is no functional value to making the admin API * synchronous internally, even if it would simplify its implementation. * So making it async allows the Admin API to be used with existing * client types in existing applications without breakage. * - the async approach allows multiple outstanding Admin API requests * simultaneously. * * The internal async implementation relies on the following concepts: * - it uses a single rko (rd_kafka_op_t) to maintain state. * - the rko has a callback attached - called the worker callback. * - the worker callback is a small state machine that triggers * async operations (be it controller lookups, timeout timers, * protocol transmits, etc). * - the worker callback is only called on the rdkafka main thread. * - the callback is triggered by different events and sources by enqueuing * the rko on the rdkafka main ops queue. * * * Let's illustrate this with a DeleteTopics example. This might look * daunting, but it boils down to an asynchronous state machine being * triggered by enqueuing the rko op. * * 1. [app thread] The user constructs the input arguments, * including a response rkqu queue and then calls DeleteTopics(). * * 2. [app thread] DeleteTopics() creates a new internal op (rko) of type * RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the * input arguments (which allows the caller to free the originals * whenever she likes). The rko op worker callback is set to the * generic admin worker callback rd_kafka_admin_worker() * * 3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops * queue that is served by the rdkafka main thread in rd_kafka_thread_main() * * 4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and * the rd_kafka_poll_cb() is called. * * 5. [rdkafka main thread] The rko_type switch case identifies the rko * as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback * set in step 2. * * 6. [rdkafka main thread] The worker callback is called. * After some initial checking of err==ERR__DESTROY events * (which is used to clean up outstanding ops (etc) on termination), * the code hits a state machine using rko_u.admin_request.state. * * 7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT * where the worker validates the user input. * An enqueue once (eonce) object is created - the use of this object * allows having multiple outstanding async functions referencing the * same underlying rko object, but only allowing the first one * to trigger an event. * A timeout timer is set up to trigger the eonce object when the * full options.request_timeout has elapsed. * * 8. [rdkafka main thread] After initialization the state is updated * to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to * looking up a specific broker or the controller broker and waiting for * an active connection. * Both the lookup and the waiting for an active connection are * fully asynchronous, and the same eonce used for the timer is passed * to the rd_kafka_broker_controller_async() or broker_async() functions * which will trigger the eonce when a broker state change occurs. * If the controller is already known (from metadata) and the connection * is up a rkb broker object is returned and the eonce is not used, * skip to step 11. * * 9. [rdkafka main thread] Upon metadata retrieval (which is triggered * automatically by other parts of the code) the controller_id may be * updated in which case the eonce is triggered. * The eonce triggering enqueues the original rko on the rdkafka main * ops queue again and we go to step 8 which will check if the controller * connection is up. * * 10. [broker thread] If the controller_id is now known we wait for * the corresponding broker's connection to come up. This signaling * is performed from the broker thread upon broker state changes * and uses the same eonce. The eonce triggering enqueues the original * rko on the rdkafka main ops queue again we go to back to step 8 * to check if broker is now available. * * 11. [rdkafka main thread] Back in the worker callback we now have an * rkb broker pointer (with reference count increased) for the controller * with the connection up (it might go down while we're referencing it, * but that does not stop us from enqueuing a protocol request). * * 12. [rdkafka main thread] A DeleteTopics protocol request buffer is * constructed using the input parameters saved on the rko and the * buffer is enqueued on the broker's transmit queue. * The buffer is set up to provide the reply buffer on the rdkafka main * ops queue (the same queue we are operating from) with a handler * callback of rd_kafka_admin_handle_response(). * The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE. * * 13. [broker thread] If the request times out, a response with error code * (ERR__TIMED_OUT) is enqueued. Go to 16. * * 14. [broker thread] If a response is received, the response buffer * is enqueued. Go to 16. * * 15. [rdkafka main thread] The buffer callback (..handle_response()) * is called, which attempts to extract the original rko from the eonce, * but if the eonce has already been triggered by some other source * (the timeout timer) the buffer callback simply returns and does nothing * since the admin request is over and a result (probably a timeout) * has been enqueued for the application. * If the rko was still intact we temporarily set the reply buffer * in the rko struct and call the worker callback. Go to 17. * * 16. [rdkafka main thread] The worker callback is called in state * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error. * An error result op is created and enqueued on the application's * provided response rkqu queue. * * 17. [rdkafka main thread] The worker callback is called in state * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no * error set. * The worker calls the response `parse()` callback to parse the response * buffer and populates a result op (rko_result) with the response * information (such as per-topic error codes, etc). * The result op is returned to the worker. * * 18. [rdkafka main thread] The worker enqueues the result op (rko_result) * on the application's provided response rkqu queue. * * 19. [app thread] The application calls rd_kafka_queue_poll() to * receive the result of the operation. The result may have been * enqueued in step 18 thanks to succesful completion, or in any * of the earlier stages when an error was encountered. * * 20. [app thread] The application uses rd_kafka_event_DeleteTopics_result() * to retrieve the request-specific result type. * * 21. Done. * * * * * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests * ------------------------------------------ * * Certain Admin APIs may have requests that need to be sent to different * brokers, for instance DeleteRecords which needs to be sent to the leader * for each given partition. * * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each * of the per-broker requests. These sub-ops have the proper op type for * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS) * but their replyq does not point back to the application replyq but * rk_ops which is handled by the librdkafka main thread and with the op * callback set to rd_kafka_admin_fanout_worker(). This worker aggregates * the results of each fanned out sub-op and merges the result into a * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the * application's replyq. * * We rely on the timeouts on the fanned out sub-ops rather than the parent * fanout op. * * The parent fanout op must not be destroyed until all fanned out sub-ops * are done (either by success, failure or timeout) and destroyed, and this * is tracked by the rko_u.admin_request.fanout.outstanding counter. * */ /** * @enum Admin request target broker. Must be negative values since the field * used is broker_id. */ enum { RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */ RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */ RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and * and has no target broker */ RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */ }; /** * @brief Admin op callback types */ typedef rd_kafka_resp_err_t(rd_kafka_admin_Request_cb_t)( rd_kafka_broker_t *rkb, const rd_list_t *configs /*(ConfigResource_t*)*/, rd_kafka_AdminOptions_t *options, char *errstr, size_t errstr_size, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) RD_WARN_UNUSED_RESULT; typedef rd_kafka_resp_err_t(rd_kafka_admin_Response_parse_cb_t)( rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) RD_WARN_UNUSED_RESULT; typedef void(rd_kafka_admin_fanout_PartialResponse_cb_t)( rd_kafka_op_t *rko_req, const rd_kafka_op_t *rko_partial); typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t; typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyArg_cb_t; /** * @struct Request-specific worker callbacks. */ struct rd_kafka_admin_worker_cbs { /**< Protocol request callback which is called * to construct and send the request. */ rd_kafka_admin_Request_cb_t *request; /**< Protocol response parser callback which is called * to translate the response to a rko_result op. */ rd_kafka_admin_Response_parse_cb_t *parse; }; /** * @struct Fanout request callbacks. */ struct rd_kafka_admin_fanout_worker_cbs { /** Merge results from a fanned out request into the user response. */ rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response; /** Copy an accumulated result for storing into the rko_result. */ rd_kafka_admin_fanout_CopyResult_cb_t *copy_result; /** Copy the original arguments, used by target ALL. */ rd_kafka_admin_fanout_CopyArg_cb_t *copy_arg; }; /* Forward declarations */ static void rd_kafka_admin_common_worker_destroy(rd_kafka_t *rk, rd_kafka_op_t *rko, rd_bool_t do_destroy); static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, rd_kafka_AdminOptions_t *options); static void rd_kafka_AdminOptions_copy_to(rd_kafka_AdminOptions_t *dst, const rd_kafka_AdminOptions_t *src); static rd_kafka_op_res_t rd_kafka_admin_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko); static rd_kafka_ConfigEntry_t * rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t *src); static void rd_kafka_ConfigEntry_free(void *ptr); static void *rd_kafka_ConfigEntry_list_copy(const void *src, void *opaque); static void rd_kafka_admin_handle_response(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *reply, rd_kafka_buf_t *request, void *opaque); static rd_kafka_op_res_t rd_kafka_admin_fanout_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko_fanout); /** * @name Common admin request code * @{ * * */ /** * @brief Create a new admin_result op based on the request op \p rko_req. * * @remark This moves the rko_req's admin_request.args list from \p rko_req * to the returned rko. The \p rko_req args will be emptied. */ static rd_kafka_op_t *rd_kafka_admin_result_new(rd_kafka_op_t *rko_req) { rd_kafka_op_t *rko_result; rd_kafka_op_t *rko_fanout; if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) { /* If this is a fanned out request the rko_result needs to be * handled by the fanout worker rather than the application. */ rko_result = rd_kafka_op_new_cb(rko_req->rko_rk, RD_KAFKA_OP_ADMIN_RESULT, rd_kafka_admin_fanout_worker); /* Transfer fanout pointer to result */ rko_result->rko_u.admin_result.fanout_parent = rko_fanout; rko_req->rko_u.admin_request.fanout_parent = NULL; /* Set event type based on original fanout ops reqtype, * e.g., ..OP_DELETERECORDS */ rko_result->rko_u.admin_result.reqtype = rko_fanout->rko_u.admin_request.fanout.reqtype; } else { rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT); /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT * to fanned out requests) we need to use the original * application request type. */ if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT) rko_result->rko_u.admin_result.reqtype = rko_req->rko_u.admin_request.fanout.reqtype; else rko_result->rko_u.admin_result.reqtype = rko_req->rko_type; } rko_result->rko_rk = rko_req->rko_rk; rko_result->rko_u.admin_result.opaque = rd_kafka_confval_get_ptr( &rko_req->rko_u.admin_request.options.opaque); /* Move request arguments (list) from request to result. * This is mainly so that partial_response() knows what arguments * were provided to the response's request it is merging. */ rd_list_move(&rko_result->rko_u.admin_result.args, &rko_req->rko_u.admin_request.args); rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type; return rko_result; } /** * @brief Set error code and error string on admin_result op \p rko. */ static void rd_kafka_admin_result_set_err0(rd_kafka_op_t *rko, rd_kafka_resp_err_t err, const char *fmt, va_list ap) { char buf[512]; rd_vsnprintf(buf, sizeof(buf), fmt, ap); rko->rko_err = err; if (rko->rko_u.admin_result.errstr) rd_free(rko->rko_u.admin_result.errstr); rko->rko_u.admin_result.errstr = rd_strdup(buf); rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL", "Admin %s result error: %s", rd_kafka_op2str(rko->rko_u.admin_result.reqtype), rko->rko_u.admin_result.errstr); } /** * @sa rd_kafka_admin_result_set_err0 */ static RD_UNUSED RD_FORMAT(printf, 3, 4) void rd_kafka_admin_result_set_err( rd_kafka_op_t *rko, rd_kafka_resp_err_t err, const char *fmt, ...) { va_list ap; va_start(ap, fmt); rd_kafka_admin_result_set_err0(rko, err, fmt, ap); va_end(ap); } /** * @brief Enqueue admin_result on application's queue. */ static RD_INLINE void rd_kafka_admin_result_enq(rd_kafka_op_t *rko_req, rd_kafka_op_t *rko_result) { rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq, rko_result, rko_req->rko_u.admin_request.replyq.version); } /** * @brief Set request-level error code and string in reply op. * * @remark This function will NOT destroy the \p rko_req, so don't forget to * call rd_kafka_admin_common_worker_destroy() when done with the rko. */ static RD_FORMAT(printf, 3, 4) void rd_kafka_admin_result_fail(rd_kafka_op_t *rko_req, rd_kafka_resp_err_t err, const char *fmt, ...) { va_list ap; rd_kafka_op_t *rko_result; if (!rko_req->rko_u.admin_request.replyq.q) return; rko_result = rd_kafka_admin_result_new(rko_req); va_start(ap, fmt); rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap); va_end(ap); rd_kafka_admin_result_enq(rko_req, rko_result); } /** * @brief Send the admin request contained in \p rko upon receiving * a FindCoordinator response. * * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce) * (i.e. created by \c rd_kafka_admin_request_op_new ) * * @remark To be used as a callback for \c rd_kafka_coord_req */ static rd_kafka_resp_err_t rd_kafka_admin_coord_request(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko_ignore, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_enq_once_t *eonce = opaque; rd_kafka_op_t *rko; char errstr[512]; rd_kafka_resp_err_t err; rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request"); if (!rko) /* Admin request has timed out and been destroyed */ return RD_KAFKA_RESP_ERR__DESTROY; rd_kafka_enq_once_add_source(eonce, "coordinator response"); err = rko->rko_u.admin_request.cbs->request( rkb, &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.options, errstr, sizeof(errstr), replyq, rd_kafka_admin_handle_response, eonce); if (err) { rd_kafka_enq_once_del_source(eonce, "coordinator response"); rd_kafka_admin_result_fail( rko, err, "%s worker failed to send request: %s", rd_kafka_op2str(rko->rko_type), errstr); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); } return err; } /** * @brief Return the topics list from a topic-related result object. */ static const rd_kafka_topic_result_t ** rd_kafka_admin_result_ret_topics(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS || reqtype == RD_KAFKA_OP_DELETETOPICS || reqtype == RD_KAFKA_OP_CREATEPARTITIONS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_topic_result_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Return the ConfigResource list from a config-related result object. */ static const rd_kafka_ConfigResource_t ** rd_kafka_admin_result_ret_resources(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS || reqtype == RD_KAFKA_OP_DESCRIBECONFIGS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_ConfigResource_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Return the acl result list from a acl-related result object. */ static const rd_kafka_acl_result_t ** rd_kafka_admin_result_ret_acl_results(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_CREATEACLS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_acl_result_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Return the acl binding list from a acl-related result object. */ static const rd_kafka_AclBinding_t ** rd_kafka_admin_result_ret_acl_bindings(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DESCRIBEACLS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_AclBinding_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Return the groups list from a group-related result object. */ static const rd_kafka_group_result_t ** rd_kafka_admin_result_ret_groups(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS || reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS || reqtype == RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS || reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_group_result_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Return the DeleteAcls response list from a acl-related result object. */ static const rd_kafka_DeleteAcls_result_response_t ** rd_kafka_admin_result_ret_delete_acl_result_responses(const rd_kafka_op_t *rko, size_t *cntp) { rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DELETEACLS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_DeleteAcls_result_response_t **) rko->rko_u.admin_result.results.rl_elems; } /** * @brief Create a new admin_request op of type \p optype and sets up the * generic (type independent files). * * The caller shall then populate the admin_request.args list * and enqueue the op on rk_ops for further processing work. * * @param cbs Callbacks, must reside in .data segment. * @param options Optional options, may be NULL to use defaults. * * @locks none * @locality application thread */ static rd_kafka_op_t * rd_kafka_admin_request_op_new(rd_kafka_t *rk, rd_kafka_op_type_t optype, rd_kafka_event_type_t reply_event_type, const struct rd_kafka_admin_worker_cbs *cbs, const rd_kafka_AdminOptions_t *options, rd_kafka_q_t *rkq) { rd_kafka_op_t *rko; rd_assert(rk); rd_assert(rkq); rd_assert(cbs); rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker); rko->rko_u.admin_request.reply_event_type = reply_event_type; rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs; /* Make a copy of the options */ if (options) rd_kafka_AdminOptions_copy_to(&rko->rko_u.admin_request.options, options); else rd_kafka_AdminOptions_init(rk, &rko->rko_u.admin_request.options); /* Default to controller */ rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Calculate absolute timeout */ rko->rko_u.admin_request.abs_timeout = rd_timeout_init(rd_kafka_confval_get_int( &rko->rko_u.admin_request.options.request_timeout)); /* Setup enq-op-once, which is triggered by either timer code * or future wait-controller code. */ rko->rko_u.admin_request.eonce = rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); /* The timer itself must be started from the rdkafka main thread, * not here. */ /* Set up replyq */ rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT; return rko; } /** * @returns the remaining request timeout in milliseconds. */ static RD_INLINE int rd_kafka_admin_timeout_remains(rd_kafka_op_t *rko) { return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout); } /** * @returns the remaining request timeout in microseconds. */ static RD_INLINE rd_ts_t rd_kafka_admin_timeout_remains_us(rd_kafka_op_t *rko) { return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout); } /** * @brief Timer timeout callback for the admin rko's eonce object. */ static void rd_kafka_admin_eonce_timeout_cb(rd_kafka_timers_t *rkts, void *arg) { rd_kafka_enq_once_t *eonce = arg; rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT, "timeout timer"); } /** * @brief Common worker destroy to be called in destroy: label * in worker. */ static void rd_kafka_admin_common_worker_destroy(rd_kafka_t *rk, rd_kafka_op_t *rko, rd_bool_t do_destroy) { int timer_was_stopped; /* Free resources for this op. */ timer_was_stopped = rd_kafka_timer_stop( &rk->rk_timers, &rko->rko_u.admin_request.tmr, rd_true); if (rko->rko_u.admin_request.eonce) { /* Remove the stopped timer's eonce reference since its * callback will not have fired if we stopped the timer. */ if (timer_was_stopped) rd_kafka_enq_once_del_source( rko->rko_u.admin_request.eonce, "timeout timer"); /* This is thread-safe to do even if there are outstanding * timers or wait-controller references to the eonce * since they only hold direct reference to the eonce, * not the rko (the eonce holds a reference to the rko but * it is cleared here). */ rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce); rko->rko_u.admin_request.eonce = NULL; } if (do_destroy) rd_kafka_op_destroy(rko); } /** * @brief Asynchronously look up a broker. * To be called repeatedly from each invocation of the worker * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until * a valid rkb is returned. * * @returns the broker rkb with refcount increased, or NULL if not yet * available. */ static rd_kafka_broker_t *rd_kafka_admin_common_get_broker(rd_kafka_t *rk, rd_kafka_op_t *rko, int32_t broker_id) { rd_kafka_broker_t *rkb; rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %" PRId32, rd_kafka_op2str(rko->rko_type), broker_id); /* Since we're iterating over this broker_async() call * (asynchronously) until a broker is availabe (or timeout) * we need to re-enable the eonce to be triggered again (which * is not necessary the first time we get here, but there * is no harm doing it then either). */ rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); /* Look up the broker asynchronously, if the broker * is not available the eonce is registered for broker * state changes which will cause our function to be called * again as soon as (any) broker state changes. * When we are called again we perform the broker lookup * again and hopefully get an rkb back, otherwise defer a new * async wait. Repeat until success or timeout. */ if (!(rkb = rd_kafka_broker_get_async( rk, broker_id, RD_KAFKA_BROKER_STATE_UP, rko->rko_u.admin_request.eonce))) { /* Broker not available, wait asynchronously * for broker metadata code to trigger eonce. */ return NULL; } rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %" PRId32 " is %s", rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name); return rkb; } /** * @brief Asynchronously look up the controller. * To be called repeatedly from each invocation of the worker * when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until * a valid rkb is returned. * * @returns the controller rkb with refcount increased, or NULL if not yet * available. */ static rd_kafka_broker_t * rd_kafka_admin_common_get_controller(rd_kafka_t *rk, rd_kafka_op_t *rko) { rd_kafka_broker_t *rkb; rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller", rd_kafka_op2str(rko->rko_type)); /* Since we're iterating over this controller_async() call * (asynchronously) until a controller is availabe (or timeout) * we need to re-enable the eonce to be triggered again (which * is not necessary the first time we get here, but there * is no harm doing it then either). */ rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); /* Look up the controller asynchronously, if the controller * is not available the eonce is registered for broker * state changes which will cause our function to be called * again as soon as (any) broker state changes. * When we are called again we perform the controller lookup * again and hopefully get an rkb back, otherwise defer a new * async wait. Repeat until success or timeout. */ if (!(rkb = rd_kafka_broker_controller_async( rk, RD_KAFKA_BROKER_STATE_UP, rko->rko_u.admin_request.eonce))) { /* Controller not available, wait asynchronously * for controller code to trigger eonce. */ return NULL; } rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s", rd_kafka_op2str(rko->rko_type), rkb->rkb_name); return rkb; } /** * @brief Asynchronously look up current list of broker ids until available. * Bootstrap and logical brokers are excluded from the list. * * To be called repeatedly from each invocation of the worker * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST until * a not-NULL rd_list_t * is returned. * * @param rk Client instance. * @param rko Op containing the admin request eonce to use for the * async callback. * @return List of int32_t with broker nodeids when ready, NULL when * the eonce callback will be called. */ static rd_list_t * rd_kafka_admin_common_brokers_get_nodeids(rd_kafka_t *rk, rd_kafka_op_t *rko) { rd_list_t *broker_ids; rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up brokers", rd_kafka_op2str(rko->rko_type)); /* Since we're iterating over this rd_kafka_brokers_get_nodeids_async() * call (asynchronously) until a nodeids list is available (or timeout), * we need to re-enable the eonce to be triggered again (which * is not necessary the first time we get here, but there * is no harm doing it then either). */ rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); /* Look up the nodeids list asynchronously, if it's * not available the eonce is registered for broker * state changes which will cause our function to be called * again as soon as (any) broker state changes. * When we are called again we perform the same lookup * again and hopefully get a list of nodeids again, * otherwise defer a new async wait. * Repeat until success or timeout. */ if (!(broker_ids = rd_kafka_brokers_get_nodeids_async( rk, rko->rko_u.admin_request.eonce))) { /* nodeids list not available, wait asynchronously * for the eonce to be triggered. */ return NULL; } rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: %d broker(s)", rd_kafka_op2str(rko->rko_type), rd_list_cnt(broker_ids)); return broker_ids; } /** * @brief Handle response from broker by triggering worker callback. * * @param opaque is the eonce from the worker protocol request call. */ static void rd_kafka_admin_handle_response(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *reply, rd_kafka_buf_t *request, void *opaque) { rd_kafka_enq_once_t *eonce = opaque; rd_kafka_op_t *rko; /* From ...add_source("send") */ rko = rd_kafka_enq_once_disable(eonce); if (!rko) { /* The operation timed out and the worker was * dismantled while we were waiting for broker response, * do nothing - everything has been cleaned up. */ rd_kafka_dbg( rk, ADMIN, "ADMIN", "Dropping outdated %sResponse with return code %s", request ? rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey) : "???", rd_kafka_err2str(err)); return; } /* Attach reply buffer to rko for parsing in the worker. */ rd_assert(!rko->rko_u.admin_request.reply_buf); rko->rko_u.admin_request.reply_buf = reply; rko->rko_err = err; if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED) rd_kafka_op_destroy(rko); } /** * @brief Generic handler for protocol responses, calls the admin ops' * Response_parse_cb and enqueues the result to the caller's queue. */ static void rd_kafka_admin_response_parse(rd_kafka_op_t *rko) { rd_kafka_resp_err_t err; rd_kafka_op_t *rko_result = NULL; char errstr[512]; if (rko->rko_err) { rd_kafka_admin_result_fail(rko, rko->rko_err, "%s worker request failed: %s", rd_kafka_op2str(rko->rko_type), rd_kafka_err2str(rko->rko_err)); return; } /* Response received. * Let callback parse response and provide result in rko_result * which is then enqueued on the reply queue. */ err = rko->rko_u.admin_request.cbs->parse( rko, &rko_result, rko->rko_u.admin_request.reply_buf, errstr, sizeof(errstr)); if (err) { rd_kafka_admin_result_fail( rko, err, "%s worker failed to parse response: %s", rd_kafka_op2str(rko->rko_type), errstr); return; } rd_assert(rko_result); /* Enqueue result on application queue, we're done. */ rd_kafka_admin_result_enq(rko, rko_result); } /** * @brief Generic handler for coord_req() responses. */ static void rd_kafka_admin_coord_response_parse(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_op_t *rko_result; rd_kafka_enq_once_t *eonce = opaque; rd_kafka_op_t *rko; char errstr[512]; rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator response"); if (!rko) /* Admin request has timed out and been destroyed */ return; if (err) { rd_kafka_admin_result_fail( rko, err, "%s worker coordinator request failed: %s", rd_kafka_op2str(rko->rko_type), rd_kafka_err2str(err)); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); return; } err = rko->rko_u.admin_request.cbs->parse(rko, &rko_result, rkbuf, errstr, sizeof(errstr)); if (err) { rd_kafka_admin_result_fail( rko, err, "%s worker failed to parse coordinator %sResponse: %s", rd_kafka_op2str(rko->rko_type), rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), errstr); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); return; } rd_assert(rko_result); /* Enqueue result on application queue, we're done. */ rd_kafka_admin_result_enq(rko, rko_result); } static void rd_kafka_admin_fanout_op_distribute(rd_kafka_t *rk, rd_kafka_op_t *rko, rd_list_t *nodeids); /** * @brief Common worker state machine handling regardless of request type. * * Tasks: * - Sets up timeout on first call. * - Checks for timeout. * - Checks for and fails on errors. * - Async Controller and broker lookups * - Calls the Request callback * - Calls the parse callback * - Result reply * - Destruction of rko * * rko->rko_err may be one of: * RD_KAFKA_RESP_ERR_NO_ERROR, or * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out, * or any other error code triggered by other parts of the code. * * @returns a hint to the op code whether the rko should be destroyed or not. */ static rd_kafka_op_res_t rd_kafka_admin_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { const char *name = rd_kafka_op2str(rko->rko_type); rd_ts_t timeout_in; rd_kafka_broker_t *rkb = NULL; rd_kafka_resp_err_t err; rd_list_t *nodeids = NULL; char errstr[512]; /* ADMIN_FANOUT handled by fanout_worker() */ rd_assert((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) != RD_KAFKA_OP_ADMIN_FANOUT); if (rd_kafka_terminating(rk)) { rd_kafka_dbg( rk, ADMIN, name, "%s worker called in state %s: " "handle is terminating: %s", name, rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], rd_kafka_err2str(rko->rko_err)); rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY, "Handle is terminating: %s", rd_kafka_err2str(rko->rko_err)); goto destroy; } if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) { rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY, "Destroyed"); goto destroy; /* rko being destroyed (silent) */ } rd_kafka_dbg(rk, ADMIN, name, "%s worker called in state %s: %s", name, rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], rd_kafka_err2str(rko->rko_err)); rd_assert(thrd_is_current(rko->rko_rk->rk_thread)); /* Check for errors raised asynchronously (e.g., by timer) */ if (rko->rko_err) { rd_kafka_admin_result_fail( rko, rko->rko_err, "Failed while %s: %s", rd_kafka_admin_state_desc[rko->rko_u.admin_request.state], rd_kafka_err2str(rko->rko_err)); goto destroy; } /* Check for timeout */ timeout_in = rd_kafka_admin_timeout_remains_us(rko); if (timeout_in <= 0) { rd_kafka_admin_result_fail( rko, RD_KAFKA_RESP_ERR__TIMED_OUT, "Timed out %s", rd_kafka_admin_state_desc[rko->rko_u.admin_request.state]); goto destroy; } redo: switch (rko->rko_u.admin_request.state) { case RD_KAFKA_ADMIN_STATE_INIT: { int32_t broker_id; /* First call. */ /* Set up timeout timer. */ rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce, "timeout timer"); rd_kafka_timer_start_oneshot( &rk->rk_timers, &rko->rko_u.admin_request.tmr, rd_true, timeout_in, rd_kafka_admin_eonce_timeout_cb, rko->rko_u.admin_request.eonce); /* Use explicitly specified broker_id, if available. */ broker_id = (int32_t)rd_kafka_confval_get_int( &rko->rko_u.admin_request.options.broker); if (broker_id != -1) { rd_kafka_dbg(rk, ADMIN, name, "%s using explicitly " "set broker id %" PRId32 " rather than %" PRId32, name, broker_id, rko->rko_u.admin_request.broker_id); rko->rko_u.admin_request.broker_id = broker_id; } else { /* Default to controller */ broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; } /* Resolve target broker(s) */ switch (rko->rko_u.admin_request.broker_id) { case RD_KAFKA_ADMIN_TARGET_CONTROLLER: /* Controller */ rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER; goto redo; /* Trigger next state immediately */ case RD_KAFKA_ADMIN_TARGET_COORDINATOR: /* Group (or other) coordinator */ rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE; rd_kafka_enq_once_add_source( rko->rko_u.admin_request.eonce, "coordinator request"); rd_kafka_coord_req( rk, rko->rko_u.admin_request.coordtype, rko->rko_u.admin_request.coordkey, rd_kafka_admin_coord_request, NULL, 0 /* no delay*/, rd_kafka_admin_timeout_remains(rko), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_admin_coord_response_parse, rko->rko_u.admin_request.eonce); /* Wait asynchronously for broker response, which will * trigger the eonce and worker to be called again. */ return RD_KAFKA_OP_RES_KEEP; case RD_KAFKA_ADMIN_TARGET_ALL: /* All brokers */ rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST; goto redo; /* Trigger next state immediately */ case RD_KAFKA_ADMIN_TARGET_FANOUT: /* Shouldn't come here, fanouts are handled by * fanout_worker() */ RD_NOTREACHED(); return RD_KAFKA_OP_RES_KEEP; default: /* Specific broker */ rd_assert(rko->rko_u.admin_request.broker_id >= 0); rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_BROKER; goto redo; /* Trigger next state immediately */ } } case RD_KAFKA_ADMIN_STATE_WAIT_BROKER: /* Broker lookup */ if (!(rkb = rd_kafka_admin_common_get_broker( rk, rko, rko->rko_u.admin_request.broker_id))) { /* Still waiting for broker to become available */ return RD_KAFKA_OP_RES_KEEP; } rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST; goto redo; case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER: if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) { /* Still waiting for controller to become available. */ return RD_KAFKA_OP_RES_KEEP; } rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST; goto redo; case RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST: /* Wait for a valid list of brokers to be available. */ if (!(nodeids = rd_kafka_admin_common_brokers_get_nodeids(rk, rko))) { /* Still waiting for brokers to become available. */ return RD_KAFKA_OP_RES_KEEP; } rd_kafka_admin_fanout_op_distribute(rk, rko, nodeids); rd_list_destroy(nodeids); rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS; goto redo; case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS: /* This op can be destroyed, as a new fanout op has been * sent, and the response will be enqueued there. */ goto destroy; case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST: /* Got broker, send protocol request. */ /* Make sure we're called from a 'goto redo' where * the rkb was set. */ rd_assert(rkb); /* Still need to use the eonce since this worker may * time out while waiting for response from broker, in which * case the broker response will hit an empty eonce (ok). */ rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce, "send"); /* Send request (async) */ err = rko->rko_u.admin_request.cbs->request( rkb, &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.options, errstr, sizeof(errstr), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_admin_handle_response, rko->rko_u.admin_request.eonce); /* Loose broker refcount from get_broker(), get_controller() */ rd_kafka_broker_destroy(rkb); if (err) { rd_kafka_enq_once_del_source( rko->rko_u.admin_request.eonce, "send"); rd_kafka_admin_result_fail(rko, err, "%s", errstr); goto destroy; } rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE; /* Wait asynchronously for broker response, which will * trigger the eonce and worker to be called again. */ return RD_KAFKA_OP_RES_KEEP; case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE: rd_kafka_admin_response_parse(rko); goto destroy; } return RD_KAFKA_OP_RES_KEEP; destroy: rd_kafka_admin_common_worker_destroy(rk, rko, rd_false /*don't destroy*/); return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */ } /** * @brief Create a new admin_fanout op of type \p req_type and sets up the * generic (type independent files). * * The caller shall then populate the \c admin_fanout.requests list, * initialize the \c admin_fanout.responses list, * set the initial \c admin_fanout.outstanding value, * and enqueue the op on rk_ops for further processing work. * * @param cbs Callbacks, must reside in .data segment. * @param options Optional options, may be NULL to use defaults. * @param rkq is the application reply queue. * * @locks none * @locality application thread */ static rd_kafka_op_t * rd_kafka_admin_fanout_op_new(rd_kafka_t *rk, rd_kafka_op_type_t req_type, rd_kafka_event_type_t reply_event_type, const struct rd_kafka_admin_fanout_worker_cbs *cbs, const rd_kafka_AdminOptions_t *options, rd_kafka_q_t *rkq) { rd_kafka_op_t *rko; rd_assert(rk); rd_assert(rkq); rd_assert(cbs); rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT); rko->rko_rk = rk; rko->rko_u.admin_request.reply_event_type = reply_event_type; rko->rko_u.admin_request.fanout.cbs = (struct rd_kafka_admin_fanout_worker_cbs *)cbs; /* Make a copy of the options */ if (options) rd_kafka_AdminOptions_copy_to(&rko->rko_u.admin_request.options, options); else rd_kafka_AdminOptions_init(rk, &rko->rko_u.admin_request.options); rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT; /* Calculate absolute timeout */ rko->rko_u.admin_request.abs_timeout = rd_timeout_init(rd_kafka_confval_get_int( &rko->rko_u.admin_request.options.request_timeout)); /* Set up replyq */ rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS; rko->rko_u.admin_request.fanout.reqtype = req_type; return rko; } /** * @brief Duplicate the fanout operation for each nodeid passed and * enqueue each new operation. Use the same fanout_parent as * the passed \p rko. * * @param rk Client instance. * @param rko Operation to distribute to each broker. * @param nodeids List of int32_t with the broker nodeids. * @param rkq * @return rd_kafka_op_t* */ static void rd_kafka_admin_fanout_op_distribute(rd_kafka_t *rk, rd_kafka_op_t *rko, rd_list_t *nodeids) { int i, nodeids_cnt, timeout_remains; rd_kafka_op_t *rko_fanout; rd_kafka_AdminOptions_t *options = &rko->rko_u.admin_request.options; timeout_remains = rd_kafka_admin_timeout_remains(rko); rd_kafka_AdminOptions_set_request_timeout(options, timeout_remains, NULL, 0); nodeids_cnt = rd_list_cnt(nodeids); rko_fanout = rko->rko_u.admin_request.fanout_parent; rko_fanout->rko_u.admin_request.fanout.outstanding = (int)nodeids_cnt; rko->rko_u.admin_request.fanout_parent = NULL; /* Create individual request ops for each node */ for (i = 0; i < nodeids_cnt; i++) { rd_kafka_op_t *rko_dup = rd_kafka_admin_request_op_new( rk, rko->rko_type, rko->rko_u.admin_request.reply_event_type, rko->rko_u.admin_request.cbs, options, rk->rk_ops); rko_dup->rko_u.admin_request.fanout_parent = rko_fanout; rko_dup->rko_u.admin_request.broker_id = rd_list_get_int32(nodeids, i); rd_list_init_copy(&rko_dup->rko_u.admin_request.args, &rko->rko_u.admin_request.args); rd_list_copy_to( &rko_dup->rko_u.admin_request.args, &rko->rko_u.admin_request.args, rko_fanout->rko_u.admin_request.fanout.cbs->copy_arg, NULL); rd_kafka_q_enq(rk->rk_ops, rko_dup); } } /** * @brief Common fanout worker state machine handling regardless of request type * * @param rko Result of a fanned out operation, e.g., DELETERECORDS result. * * Tasks: * - Checks for and responds to client termination * - Polls for fanned out responses * - Calls the partial response callback * - Calls the merge responses callback upon receipt of all partial responses * - Destruction of rko * * rko->rko_err may be one of: * RD_KAFKA_RESP_ERR_NO_ERROR, or * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup. * * @returns a hint to the op code whether the rko should be destroyed or not. */ static rd_kafka_op_res_t rd_kafka_admin_fanout_worker(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent; const char *name = rd_kafka_op2str(rko_fanout->rko_u.admin_request.fanout.reqtype); rd_kafka_op_t *rko_result; RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT); RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT); rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0); rko_fanout->rko_u.admin_request.fanout.outstanding--; rko->rko_u.admin_result.fanout_parent = NULL; if (rd_kafka_terminating(rk)) { rd_kafka_dbg(rk, ADMIN, name, "%s fanout worker called for fanned out op %s: " "handle is terminating: %s", name, rd_kafka_op2str(rko->rko_type), rd_kafka_err2str(rko_fanout->rko_err)); if (!rko->rko_err) rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY; } rd_kafka_dbg(rk, ADMIN, name, "%s fanout worker called for %s with %d request(s) " "outstanding: %s", name, rd_kafka_op2str(rko->rko_type), rko_fanout->rko_u.admin_request.fanout.outstanding, rd_kafka_err2str(rko_fanout->rko_err)); /* Add partial response to rko_fanout's result list. */ rko_fanout->rko_u.admin_request.fanout.cbs->partial_response(rko_fanout, rko); if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0) /* Wait for outstanding requests to finish */ return RD_KAFKA_OP_RES_HANDLED; rko_result = rd_kafka_admin_result_new(rko_fanout); rd_list_init_copy(&rko_result->rko_u.admin_result.results, &rko_fanout->rko_u.admin_request.fanout.results); rd_list_copy_to(&rko_result->rko_u.admin_result.results, &rko_fanout->rko_u.admin_request.fanout.results, rko_fanout->rko_u.admin_request.fanout.cbs->copy_result, NULL); /* Enqueue result on application queue, we're done. */ rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result, rko_fanout->rko_u.admin_request.replyq.version); /* FALLTHRU */ if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0) rd_kafka_op_destroy(rko_fanout); return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */ } /** * @brief Create a new operation that targets all the brokers. * The operation consists of a fanout parent that is reused and * fanout operation that is duplicated for each broker found. * * @param rk Client instance- * @param optype Operation type. * @param reply_event_type Reply event type. * @param cbs Fanned out op callbacks. * @param fanout_cbs Fanout parent out op callbacks. * @param result_free Callback for freeing the result list. * @param options Operation options. * @param rkq Result queue. * @return The newly created op targeting all the brokers. * * @sa Use rd_kafka_op_destroy() to release it. */ static rd_kafka_op_t *rd_kafka_admin_request_op_target_all_new( rd_kafka_t *rk, rd_kafka_op_type_t optype, rd_kafka_event_type_t reply_event_type, const struct rd_kafka_admin_worker_cbs *cbs, const struct rd_kafka_admin_fanout_worker_cbs *fanout_cbs, void (*result_free)(void *), const rd_kafka_AdminOptions_t *options, rd_kafka_q_t *rkq) { rd_kafka_op_t *rko, *rko_fanout; rko_fanout = rd_kafka_admin_fanout_op_new(rk, optype, reply_event_type, fanout_cbs, options, rkq); rko = rd_kafka_admin_request_op_new(rk, optype, reply_event_type, cbs, options, rk->rk_ops); rko_fanout->rko_u.admin_request.fanout.outstanding = 1; rko->rko_u.admin_request.fanout_parent = rko_fanout; rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_ALL; rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, (int)1, result_free); return rko; } /**@}*/ /** * @name Generic AdminOptions * @{ * * */ rd_kafka_resp_err_t rd_kafka_AdminOptions_set_request_timeout(rd_kafka_AdminOptions_t *options, int timeout_ms, char *errstr, size_t errstr_size) { return rd_kafka_confval_set_type(&options->request_timeout, RD_KAFKA_CONFVAL_INT, &timeout_ms, errstr, errstr_size); } rd_kafka_resp_err_t rd_kafka_AdminOptions_set_operation_timeout(rd_kafka_AdminOptions_t *options, int timeout_ms, char *errstr, size_t errstr_size) { return rd_kafka_confval_set_type(&options->operation_timeout, RD_KAFKA_CONFVAL_INT, &timeout_ms, errstr, errstr_size); } rd_kafka_resp_err_t rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t *options, int true_or_false, char *errstr, size_t errstr_size) { return rd_kafka_confval_set_type(&options->validate_only, RD_KAFKA_CONFVAL_INT, &true_or_false, errstr, errstr_size); } rd_kafka_resp_err_t rd_kafka_AdminOptions_set_incremental(rd_kafka_AdminOptions_t *options, int true_or_false, char *errstr, size_t errstr_size) { rd_snprintf(errstr, errstr_size, "Incremental updates currently not supported, see KIP-248"); return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; return rd_kafka_confval_set_type(&options->incremental, RD_KAFKA_CONFVAL_INT, &true_or_false, errstr, errstr_size); } rd_kafka_resp_err_t rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t *options, int32_t broker_id, char *errstr, size_t errstr_size) { int ibroker_id = (int)broker_id; return rd_kafka_confval_set_type(&options->broker, RD_KAFKA_CONFVAL_INT, &ibroker_id, errstr, errstr_size); } rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets( rd_kafka_AdminOptions_t *options, int true_or_false) { char errstr[512]; rd_kafka_resp_err_t err = rd_kafka_confval_set_type( &options->require_stable_offsets, RD_KAFKA_CONFVAL_INT, &true_or_false, errstr, sizeof(errstr)); return !err ? NULL : rd_kafka_error_new(err, "%s", errstr); } rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states( rd_kafka_AdminOptions_t *options, const rd_kafka_consumer_group_state_t *consumer_group_states, size_t consumer_group_states_cnt) { size_t i; char errstr[512]; rd_kafka_resp_err_t err; rd_list_t *states_list = rd_list_new(0, NULL); rd_list_init_int32(states_list, consumer_group_states_cnt); uint64_t states_bitmask = 0; if (RD_KAFKA_CONSUMER_GROUP_STATE__CNT >= 64) { rd_assert("BUG: cannot handle states with a bitmask anymore"); } for (i = 0; i < consumer_group_states_cnt; i++) { uint64_t state_bit; rd_kafka_consumer_group_state_t state = consumer_group_states[i]; if (state < 0 || state >= RD_KAFKA_CONSUMER_GROUP_STATE__CNT) { rd_list_destroy(states_list); return rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "Invalid group state value"); } state_bit = 1 << state; if (states_bitmask & state_bit) { rd_list_destroy(states_list); return rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate states not allowed"); } else { states_bitmask = states_bitmask | state_bit; rd_list_set_int32(states_list, (int32_t)i, state); } } err = rd_kafka_confval_set_type(&options->match_consumer_group_states, RD_KAFKA_CONFVAL_PTR, states_list, errstr, sizeof(errstr)); if (err) { rd_list_destroy(states_list); } return !err ? NULL : rd_kafka_error_new(err, "%s", errstr); } void rd_kafka_AdminOptions_set_opaque(rd_kafka_AdminOptions_t *options, void *opaque) { rd_kafka_confval_set_type(&options->opaque, RD_KAFKA_CONFVAL_PTR, opaque, NULL, 0); } /** * @brief Initialize and set up defaults for AdminOptions */ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, rd_kafka_AdminOptions_t *options) { rd_kafka_confval_init_int(&options->request_timeout, "request_timeout", 0, 3600 * 1000, rk->rk_conf.admin.request_timeout_ms); if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS) rd_kafka_confval_init_int(&options->operation_timeout, "operation_timeout", -1, 3600 * 1000, rk->rk_conf.admin.request_timeout_ms); else rd_kafka_confval_disable(&options->operation_timeout, "operation_timeout"); if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) rd_kafka_confval_init_int(&options->validate_only, "validate_only", 0, 1, 0); else rd_kafka_confval_disable(&options->validate_only, "validate_only"); if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) rd_kafka_confval_init_int(&options->incremental, "incremental", 0, 1, 0); else rd_kafka_confval_disable(&options->incremental, "incremental"); if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS) rd_kafka_confval_init_int(&options->require_stable_offsets, "require_stable_offsets", 0, 1, 0); else rd_kafka_confval_disable(&options->require_stable_offsets, "require_stable_offsets"); if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS) rd_kafka_confval_init_ptr(&options->match_consumer_group_states, "match_consumer_group_states"); else rd_kafka_confval_disable(&options->match_consumer_group_states, "match_consumer_group_states"); rd_kafka_confval_init_int(&options->broker, "broker", 0, INT32_MAX, -1); rd_kafka_confval_init_ptr(&options->opaque, "opaque"); } /** * @brief Copy contents of \p src to \p dst. * Deep copy every pointer confval. * * @param dst The destination AdminOptions. * @param src The source AdminOptions. */ static void rd_kafka_AdminOptions_copy_to(rd_kafka_AdminOptions_t *dst, const rd_kafka_AdminOptions_t *src) { *dst = *src; if (src->match_consumer_group_states.u.PTR) { char errstr[512]; rd_list_t *states_list_copy = rd_list_copy_preallocated( src->match_consumer_group_states.u.PTR, NULL); rd_kafka_resp_err_t err = rd_kafka_confval_set_type( &dst->match_consumer_group_states, RD_KAFKA_CONFVAL_PTR, states_list_copy, errstr, sizeof(errstr)); rd_assert(!err); } } rd_kafka_AdminOptions_t * rd_kafka_AdminOptions_new(rd_kafka_t *rk, rd_kafka_admin_op_t for_api) { rd_kafka_AdminOptions_t *options; if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT) return NULL; options = rd_calloc(1, sizeof(*options)); options->for_api = for_api; rd_kafka_AdminOptions_init(rk, options); return options; } void rd_kafka_AdminOptions_destroy(rd_kafka_AdminOptions_t *options) { if (options->match_consumer_group_states.u.PTR) { rd_list_destroy(options->match_consumer_group_states.u.PTR); } rd_free(options); } /**@}*/ /** * @name CreateTopics * @{ * * * */ rd_kafka_NewTopic_t *rd_kafka_NewTopic_new(const char *topic, int num_partitions, int replication_factor, char *errstr, size_t errstr_size) { rd_kafka_NewTopic_t *new_topic; if (!topic) { rd_snprintf(errstr, errstr_size, "Invalid topic name"); return NULL; } if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) { rd_snprintf(errstr, errstr_size, "num_partitions out of " "expected range %d..%d or -1 for broker default", 1, RD_KAFKAP_PARTITIONS_MAX); return NULL; } if (replication_factor < -1 || replication_factor > RD_KAFKAP_BROKERS_MAX) { rd_snprintf(errstr, errstr_size, "replication_factor out of expected range %d..%d", -1, RD_KAFKAP_BROKERS_MAX); return NULL; } new_topic = rd_calloc(1, sizeof(*new_topic)); new_topic->topic = rd_strdup(topic); new_topic->num_partitions = num_partitions; new_topic->replication_factor = replication_factor; /* List of int32 lists */ rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free); rd_list_prealloc_elems(&new_topic->replicas, 0, num_partitions == -1 ? 0 : num_partitions, 0 /*nozero*/); /* List of ConfigEntrys */ rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free); return new_topic; } /** * @brief Topic name comparator for NewTopic_t */ static int rd_kafka_NewTopic_cmp(const void *_a, const void *_b) { const rd_kafka_NewTopic_t *a = _a, *b = _b; return strcmp(a->topic, b->topic); } /** * @brief Allocate a new NewTopic and make a copy of \p src */ static rd_kafka_NewTopic_t * rd_kafka_NewTopic_copy(const rd_kafka_NewTopic_t *src) { rd_kafka_NewTopic_t *dst; dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions, src->replication_factor, NULL, 0); rd_assert(dst); rd_list_destroy(&dst->replicas); /* created in .._new() */ rd_list_init_copy(&dst->replicas, &src->replicas); rd_list_copy_to(&dst->replicas, &src->replicas, rd_list_copy_preallocated, NULL); rd_list_init_copy(&dst->config, &src->config); rd_list_copy_to(&dst->config, &src->config, rd_kafka_ConfigEntry_list_copy, NULL); return dst; } void rd_kafka_NewTopic_destroy(rd_kafka_NewTopic_t *new_topic) { rd_list_destroy(&new_topic->replicas); rd_list_destroy(&new_topic->config); rd_free(new_topic->topic); rd_free(new_topic); } static void rd_kafka_NewTopic_free(void *ptr) { rd_kafka_NewTopic_destroy(ptr); } void rd_kafka_NewTopic_destroy_array(rd_kafka_NewTopic_t **new_topics, size_t new_topic_cnt) { size_t i; for (i = 0; i < new_topic_cnt; i++) rd_kafka_NewTopic_destroy(new_topics[i]); } rd_kafka_resp_err_t rd_kafka_NewTopic_set_replica_assignment(rd_kafka_NewTopic_t *new_topic, int32_t partition, int32_t *broker_ids, size_t broker_id_cnt, char *errstr, size_t errstr_size) { rd_list_t *rl; int i; if (new_topic->replication_factor != -1) { rd_snprintf(errstr, errstr_size, "Specifying a replication factor and " "a replica assignment are mutually exclusive"); return RD_KAFKA_RESP_ERR__INVALID_ARG; } else if (new_topic->num_partitions == -1) { rd_snprintf(errstr, errstr_size, "Specifying a default partition count and a " "replica assignment are mutually exclusive"); return RD_KAFKA_RESP_ERR__INVALID_ARG; } /* Replica partitions must be added consecutively starting from 0. */ if (partition != rd_list_cnt(&new_topic->replicas)) { rd_snprintf(errstr, errstr_size, "Partitions must be added in order, " "starting at 0: expecting partition %d, " "not %" PRId32, rd_list_cnt(&new_topic->replicas), partition); return RD_KAFKA_RESP_ERR__INVALID_ARG; } if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) { rd_snprintf(errstr, errstr_size, "Too many brokers specified " "(RD_KAFKAP_BROKERS_MAX=%d)", RD_KAFKAP_BROKERS_MAX); return RD_KAFKA_RESP_ERR__INVALID_ARG; } rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt); for (i = 0; i < (int)broker_id_cnt; i++) rd_list_set_int32(rl, i, broker_ids[i]); rd_list_add(&new_topic->replicas, rl); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Generic constructor of ConfigEntry which is also added to \p rl */ static rd_kafka_resp_err_t rd_kafka_admin_add_config0(rd_list_t *rl, const char *name, const char *value, rd_kafka_AlterOperation_t operation) { rd_kafka_ConfigEntry_t *entry; if (!name) return RD_KAFKA_RESP_ERR__INVALID_ARG; entry = rd_calloc(1, sizeof(*entry)); entry->kv = rd_strtup_new(name, value); entry->a.operation = operation; rd_list_add(rl, entry); return RD_KAFKA_RESP_ERR_NO_ERROR; } rd_kafka_resp_err_t rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t *new_topic, const char *name, const char *value) { return rd_kafka_admin_add_config0(&new_topic->config, name, value, RD_KAFKA_ALTER_OP_ADD); } /** * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t topic_cnt; int i; if (rd_kafka_buf_ApiVersion(reply) >= 2) { int32_t Throttle_Time; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); } /* #topics */ rd_kafka_buf_read_i32(reply, &topic_cnt); if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " topics in response " "when only %d were requested", topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, rd_kafka_topic_result_free); for (i = 0; i < (int)topic_cnt; i++) { rd_kafkap_str_t ktopic; int16_t error_code; rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; char *this_errstr = NULL; rd_kafka_topic_result_t *terr; rd_kafka_NewTopic_t skel; int orig_pos; rd_kafka_buf_read_str(reply, &ktopic); rd_kafka_buf_read_i16(reply, &error_code); if (rd_kafka_buf_ApiVersion(reply) >= 1) rd_kafka_buf_read_str(reply, &error_msg); /* For non-blocking CreateTopicsRequests the broker * will returned REQUEST_TIMED_OUT for topics * that were triggered for creation - * we hide this error code from the application * since the topic creation is in fact in progress. */ if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && rd_kafka_confval_get_int(&rko_req->rko_u.admin_request .options.operation_timeout) <= 0) { error_code = RD_KAFKA_RESP_ERR_NO_ERROR; this_errstr = NULL; } if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) this_errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); } terr = rd_kafka_topic_result_new(ktopic.str, RD_KAFKAP_STR_LEN(&ktopic), error_code, this_errstr); /* As a convenience to the application we insert topic result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_NewTopic_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s that was not " "included in the original request", RD_KAFKAP_STR_PR(&ktopic)); } if (rd_list_elem(&rko_result->rko_u.admin_result.results, orig_pos) != NULL) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s multiple times", RD_KAFKAP_STR_PR(&ktopic)); } rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, terr); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "CreateTopics response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_CreateTopics(rd_kafka_t *rk, rd_kafka_NewTopic_t **new_topics, size_t new_topic_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_CreateTopicsRequest, rd_kafka_CreateTopicsResponse_parse, }; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_CREATETOPICS, RD_KAFKA_EVENT_CREATETOPICS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)new_topic_cnt, rd_kafka_NewTopic_free); for (i = 0; i < new_topic_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_NewTopic_copy(new_topics[i])); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of topic results from a CreateTopics result. * * The returned \p topics life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_topic_result_t **rd_kafka_CreateTopics_result_topics( const rd_kafka_CreateTopics_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name Delete topics * @{ * * * * */ rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new(const char *topic) { size_t tsize = strlen(topic) + 1; rd_kafka_DeleteTopic_t *del_topic; /* Single allocation */ del_topic = rd_malloc(sizeof(*del_topic) + tsize); del_topic->topic = del_topic->data; memcpy(del_topic->topic, topic, tsize); return del_topic; } void rd_kafka_DeleteTopic_destroy(rd_kafka_DeleteTopic_t *del_topic) { rd_free(del_topic); } static void rd_kafka_DeleteTopic_free(void *ptr) { rd_kafka_DeleteTopic_destroy(ptr); } void rd_kafka_DeleteTopic_destroy_array(rd_kafka_DeleteTopic_t **del_topics, size_t del_topic_cnt) { size_t i; for (i = 0; i < del_topic_cnt; i++) rd_kafka_DeleteTopic_destroy(del_topics[i]); } /** * @brief Topic name comparator for DeleteTopic_t */ static int rd_kafka_DeleteTopic_cmp(const void *_a, const void *_b) { const rd_kafka_DeleteTopic_t *a = _a, *b = _b; return strcmp(a->topic, b->topic); } /** * @brief Allocate a new DeleteTopic and make a copy of \p src */ static rd_kafka_DeleteTopic_t * rd_kafka_DeleteTopic_copy(const rd_kafka_DeleteTopic_t *src) { return rd_kafka_DeleteTopic_new(src->topic); } /** * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DeleteTopicsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t topic_cnt; int i; if (rd_kafka_buf_ApiVersion(reply) >= 1) { int32_t Throttle_Time; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); } /* #topics */ rd_kafka_buf_read_i32(reply, &topic_cnt); if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " topics in response " "when only %d were requested", topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, rd_kafka_topic_result_free); for (i = 0; i < (int)topic_cnt; i++) { rd_kafkap_str_t ktopic; int16_t error_code; rd_kafka_topic_result_t *terr; rd_kafka_NewTopic_t skel; int orig_pos; rd_kafka_buf_read_str(reply, &ktopic); rd_kafka_buf_read_i16(reply, &error_code); /* For non-blocking DeleteTopicsRequests the broker * will returned REQUEST_TIMED_OUT for topics * that were triggered for creation - * we hide this error code from the application * since the topic creation is in fact in progress. */ if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && rd_kafka_confval_get_int(&rko_req->rko_u.admin_request .options.operation_timeout) <= 0) { error_code = RD_KAFKA_RESP_ERR_NO_ERROR; } terr = rd_kafka_topic_result_new( ktopic.str, RD_KAFKAP_STR_LEN(&ktopic), error_code, error_code ? rd_kafka_err2str(error_code) : NULL); /* As a convenience to the application we insert topic result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_DeleteTopic_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s that was not " "included in the original request", RD_KAFKAP_STR_PR(&ktopic)); } if (rd_list_elem(&rko_result->rko_u.admin_result.results, orig_pos) != NULL) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s multiple times", RD_KAFKAP_STR_PR(&ktopic)); } rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, terr); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "DeleteTopics response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_DeleteTopics(rd_kafka_t *rk, rd_kafka_DeleteTopic_t **del_topics, size_t del_topic_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DeleteTopicsRequest, rd_kafka_DeleteTopicsResponse_parse, }; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DELETETOPICS, RD_KAFKA_EVENT_DELETETOPICS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)del_topic_cnt, rd_kafka_DeleteTopic_free); for (i = 0; i < del_topic_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_DeleteTopic_copy(del_topics[i])); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of topic results from a DeleteTopics result. * * The returned \p topics life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_topic_result_t **rd_kafka_DeleteTopics_result_topics( const rd_kafka_DeleteTopics_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, cntp); } /** * @name Create partitions * @{ * * * * */ rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new(const char *topic, size_t new_total_cnt, char *errstr, size_t errstr_size) { size_t tsize = strlen(topic) + 1; rd_kafka_NewPartitions_t *newps; if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) { rd_snprintf(errstr, errstr_size, "new_total_cnt out of " "expected range %d..%d", 1, RD_KAFKAP_PARTITIONS_MAX); return NULL; } /* Single allocation */ newps = rd_malloc(sizeof(*newps) + tsize); newps->total_cnt = new_total_cnt; newps->topic = newps->data; memcpy(newps->topic, topic, tsize); /* List of int32 lists */ rd_list_init(&newps->replicas, 0, rd_list_destroy_free); rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt, 0 /*nozero*/); return newps; } /** * @brief Topic name comparator for NewPartitions_t */ static int rd_kafka_NewPartitions_cmp(const void *_a, const void *_b) { const rd_kafka_NewPartitions_t *a = _a, *b = _b; return strcmp(a->topic, b->topic); } /** * @brief Allocate a new CreatePartitions and make a copy of \p src */ static rd_kafka_NewPartitions_t * rd_kafka_NewPartitions_copy(const rd_kafka_NewPartitions_t *src) { rd_kafka_NewPartitions_t *dst; dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0); rd_list_destroy(&dst->replicas); /* created in .._new() */ rd_list_init_copy(&dst->replicas, &src->replicas); rd_list_copy_to(&dst->replicas, &src->replicas, rd_list_copy_preallocated, NULL); return dst; } void rd_kafka_NewPartitions_destroy(rd_kafka_NewPartitions_t *newps) { rd_list_destroy(&newps->replicas); rd_free(newps); } static void rd_kafka_NewPartitions_free(void *ptr) { rd_kafka_NewPartitions_destroy(ptr); } void rd_kafka_NewPartitions_destroy_array(rd_kafka_NewPartitions_t **newps, size_t newps_cnt) { size_t i; for (i = 0; i < newps_cnt; i++) rd_kafka_NewPartitions_destroy(newps[i]); } rd_kafka_resp_err_t rd_kafka_NewPartitions_set_replica_assignment(rd_kafka_NewPartitions_t *newp, int32_t new_partition_idx, int32_t *broker_ids, size_t broker_id_cnt, char *errstr, size_t errstr_size) { rd_list_t *rl; int i; /* Replica partitions must be added consecutively starting from 0. */ if (new_partition_idx != rd_list_cnt(&newp->replicas)) { rd_snprintf(errstr, errstr_size, "Partitions must be added in order, " "starting at 0: expecting partition " "index %d, not %" PRId32, rd_list_cnt(&newp->replicas), new_partition_idx); return RD_KAFKA_RESP_ERR__INVALID_ARG; } if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) { rd_snprintf(errstr, errstr_size, "Too many brokers specified " "(RD_KAFKAP_BROKERS_MAX=%d)", RD_KAFKAP_BROKERS_MAX); return RD_KAFKA_RESP_ERR__INVALID_ARG; } rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt); for (i = 0; i < (int)broker_id_cnt; i++) rd_list_set_int32(rl, i, broker_ids[i]); rd_list_add(&newp->replicas, rl); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_CreatePartitionsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t topic_cnt; int i; int32_t Throttle_Time; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); /* #topics */ rd_kafka_buf_read_i32(reply, &topic_cnt); if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " topics in response " "when only %d were requested", topic_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, rd_kafka_topic_result_free); for (i = 0; i < (int)topic_cnt; i++) { rd_kafkap_str_t ktopic; int16_t error_code; char *this_errstr = NULL; rd_kafka_topic_result_t *terr; rd_kafka_NewTopic_t skel; rd_kafkap_str_t error_msg; int orig_pos; rd_kafka_buf_read_str(reply, &ktopic); rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); /* For non-blocking CreatePartitionsRequests the broker * will returned REQUEST_TIMED_OUT for topics * that were triggered for creation - * we hide this error code from the application * since the topic creation is in fact in progress. */ if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && rd_kafka_confval_get_int(&rko_req->rko_u.admin_request .options.operation_timeout) <= 0) { error_code = RD_KAFKA_RESP_ERR_NO_ERROR; } if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) this_errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); } terr = rd_kafka_topic_result_new( ktopic.str, RD_KAFKAP_STR_LEN(&ktopic), error_code, error_code ? this_errstr : NULL); /* As a convenience to the application we insert topic result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.topic = terr->topic; orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_NewPartitions_cmp); if (orig_pos == -1) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s that was not " "included in the original request", RD_KAFKAP_STR_PR(&ktopic)); } if (rd_list_elem(&rko_result->rko_u.admin_result.results, orig_pos) != NULL) { rd_kafka_topic_result_destroy(terr); rd_kafka_buf_parse_fail( reply, "Broker returned topic %.*s multiple times", RD_KAFKAP_STR_PR(&ktopic)); } rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, terr); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "CreatePartitions response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_CreatePartitions(rd_kafka_t *rk, rd_kafka_NewPartitions_t **newps, size_t newps_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_CreatePartitionsRequest, rd_kafka_CreatePartitionsResponse_parse, }; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_CREATEPARTITIONS, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt, rd_kafka_NewPartitions_free); for (i = 0; i < newps_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_NewPartitions_copy(newps[i])); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of topic results from a CreatePartitions result. * * The returned \p topics life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_topic_result_t **rd_kafka_CreatePartitions_result_topics( const rd_kafka_CreatePartitions_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name ConfigEntry * @{ * * * */ static void rd_kafka_ConfigEntry_destroy(rd_kafka_ConfigEntry_t *entry) { rd_strtup_destroy(entry->kv); rd_list_destroy(&entry->synonyms); rd_free(entry); } static void rd_kafka_ConfigEntry_free(void *ptr) { rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr); } /** * @brief Create new ConfigEntry * * @param name Config entry name * @param name_len Length of name, or -1 to use strlen() * @param value Config entry value, or NULL * @param value_len Length of value, or -1 to use strlen() */ static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name, size_t name_len, const char *value, size_t value_len) { rd_kafka_ConfigEntry_t *entry; if (!name) return NULL; entry = rd_calloc(1, sizeof(*entry)); entry->kv = rd_strtup_new0(name, name_len, value, value_len); rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free); entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG; return entry; } /** * @sa rd_kafka_ConfigEntry_new0 */ static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new(const char *name, const char *value) { return rd_kafka_ConfigEntry_new0(name, -1, value, -1); } /** * @brief Allocate a new AlterConfigs and make a copy of \p src */ static rd_kafka_ConfigEntry_t * rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t *src) { rd_kafka_ConfigEntry_t *dst; dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value); dst->a = src->a; rd_list_destroy(&dst->synonyms); /* created in .._new() */ rd_list_init_copy(&dst->synonyms, &src->synonyms); rd_list_copy_to(&dst->synonyms, &src->synonyms, rd_kafka_ConfigEntry_list_copy, NULL); return dst; } static void *rd_kafka_ConfigEntry_list_copy(const void *src, void *opaque) { return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src); } const char *rd_kafka_ConfigEntry_name(const rd_kafka_ConfigEntry_t *entry) { return entry->kv->name; } const char *rd_kafka_ConfigEntry_value(const rd_kafka_ConfigEntry_t *entry) { return entry->kv->value; } rd_kafka_ConfigSource_t rd_kafka_ConfigEntry_source(const rd_kafka_ConfigEntry_t *entry) { return entry->a.source; } int rd_kafka_ConfigEntry_is_read_only(const rd_kafka_ConfigEntry_t *entry) { return entry->a.is_readonly; } int rd_kafka_ConfigEntry_is_default(const rd_kafka_ConfigEntry_t *entry) { return entry->a.is_default; } int rd_kafka_ConfigEntry_is_sensitive(const rd_kafka_ConfigEntry_t *entry) { return entry->a.is_sensitive; } int rd_kafka_ConfigEntry_is_synonym(const rd_kafka_ConfigEntry_t *entry) { return entry->a.is_synonym; } const rd_kafka_ConfigEntry_t ** rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t *entry, size_t *cntp) { *cntp = rd_list_cnt(&entry->synonyms); if (!*cntp) return NULL; return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems; } /**@}*/ /** * @name ConfigSource * @{ * * * */ const char *rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource) { static const char *names[] = { "UNKNOWN_CONFIG", "DYNAMIC_TOPIC_CONFIG", "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG", "STATIC_BROKER_CONFIG", "DEFAULT_CONFIG", }; if ((unsigned int)confsource >= (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT) return "UNSUPPORTED"; return names[confsource]; } /**@}*/ /** * @name ConfigResource * @{ * * * */ const char *rd_kafka_ResourcePatternType_name( rd_kafka_ResourcePatternType_t resource_pattern_type) { static const char *names[] = {"UNKNOWN", "ANY", "MATCH", "LITERAL", "PREFIXED"}; if ((unsigned int)resource_pattern_type >= (unsigned int)RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) return "UNSUPPORTED"; return names[resource_pattern_type]; } const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) { static const char *names[] = { "UNKNOWN", "ANY", "TOPIC", "GROUP", "BROKER", }; if ((unsigned int)restype >= (unsigned int)RD_KAFKA_RESOURCE__CNT) return "UNSUPPORTED"; return names[restype]; } rd_kafka_ConfigResource_t * rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype, const char *resname) { rd_kafka_ConfigResource_t *config; size_t namesz = resname ? strlen(resname) : 0; if (!namesz || (int)restype < 0) return NULL; config = rd_calloc(1, sizeof(*config) + namesz + 1); config->name = config->data; memcpy(config->name, resname, namesz + 1); config->restype = restype; rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free); return config; } void rd_kafka_ConfigResource_destroy(rd_kafka_ConfigResource_t *config) { rd_list_destroy(&config->config); if (config->errstr) rd_free(config->errstr); rd_free(config); } static void rd_kafka_ConfigResource_free(void *ptr) { rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr); } void rd_kafka_ConfigResource_destroy_array(rd_kafka_ConfigResource_t **config, size_t config_cnt) { size_t i; for (i = 0; i < config_cnt; i++) rd_kafka_ConfigResource_destroy(config[i]); } /** * @brief Type and name comparator for ConfigResource_t */ static int rd_kafka_ConfigResource_cmp(const void *_a, const void *_b) { const rd_kafka_ConfigResource_t *a = _a, *b = _b; int r = RD_CMP(a->restype, b->restype); if (r) return r; return strcmp(a->name, b->name); } /** * @brief Allocate a new AlterConfigs and make a copy of \p src */ static rd_kafka_ConfigResource_t * rd_kafka_ConfigResource_copy(const rd_kafka_ConfigResource_t *src) { rd_kafka_ConfigResource_t *dst; dst = rd_kafka_ConfigResource_new(src->restype, src->name); rd_list_destroy(&dst->config); /* created in .._new() */ rd_list_init_copy(&dst->config, &src->config); rd_list_copy_to(&dst->config, &src->config, rd_kafka_ConfigEntry_list_copy, NULL); return dst; } static void rd_kafka_ConfigResource_add_ConfigEntry(rd_kafka_ConfigResource_t *config, rd_kafka_ConfigEntry_t *entry) { rd_list_add(&config->config, entry); } rd_kafka_resp_err_t rd_kafka_ConfigResource_add_config(rd_kafka_ConfigResource_t *config, const char *name, const char *value) { if (!name || !*name || !value) return RD_KAFKA_RESP_ERR__INVALID_ARG; return rd_kafka_admin_add_config0(&config->config, name, value, RD_KAFKA_ALTER_OP_ADD); } rd_kafka_resp_err_t rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t *config, const char *name, const char *value) { if (!name || !*name || !value) return RD_KAFKA_RESP_ERR__INVALID_ARG; return rd_kafka_admin_add_config0(&config->config, name, value, RD_KAFKA_ALTER_OP_SET); } rd_kafka_resp_err_t rd_kafka_ConfigResource_delete_config(rd_kafka_ConfigResource_t *config, const char *name) { if (!name || !*name) return RD_KAFKA_RESP_ERR__INVALID_ARG; return rd_kafka_admin_add_config0(&config->config, name, NULL, RD_KAFKA_ALTER_OP_DELETE); } const rd_kafka_ConfigEntry_t ** rd_kafka_ConfigResource_configs(const rd_kafka_ConfigResource_t *config, size_t *cntp) { *cntp = rd_list_cnt(&config->config); if (!*cntp) return NULL; return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems; } rd_kafka_ResourceType_t rd_kafka_ConfigResource_type(const rd_kafka_ConfigResource_t *config) { return config->restype; } const char * rd_kafka_ConfigResource_name(const rd_kafka_ConfigResource_t *config) { return config->name; } rd_kafka_resp_err_t rd_kafka_ConfigResource_error(const rd_kafka_ConfigResource_t *config) { return config->err; } const char * rd_kafka_ConfigResource_error_string(const rd_kafka_ConfigResource_t *config) { if (!config->err) return NULL; if (config->errstr) return config->errstr; return rd_kafka_err2str(config->err); } /** * @brief Look in the provided ConfigResource_t* list for a resource of * type BROKER and set its broker id in \p broker_id, returning * RD_KAFKA_RESP_ERR_NO_ERROR. * * If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT * is returned and an error string is written to errstr. * * If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR * is returned and \p broker_idp is set to use the coordinator. */ static rd_kafka_resp_err_t rd_kafka_ConfigResource_get_single_broker_id(const rd_list_t *configs, int32_t *broker_idp, char *errstr, size_t errstr_size) { const rd_kafka_ConfigResource_t *config; int i; int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default * value that we * can compare * to below */ RD_LIST_FOREACH(config, configs, i) { char *endptr; long int r; if (config->restype != RD_KAFKA_RESOURCE_BROKER) continue; if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) { rd_snprintf(errstr, errstr_size, "Only one ConfigResource of type BROKER " "is allowed per call"); return RD_KAFKA_RESP_ERR__CONFLICT; } /* Convert string broker-id to int32 */ r = (int32_t)strtol(config->name, &endptr, 10); if (r == LONG_MIN || r == LONG_MAX || config->name == endptr || r < 0) { rd_snprintf(errstr, errstr_size, "Expected an int32 broker_id for " "ConfigResource(type=BROKER, name=%s)", config->name); return RD_KAFKA_RESP_ERR__INVALID_ARG; } broker_id = r; /* Keep scanning to make sure there are no duplicate * BROKER resources. */ } *broker_idp = broker_id; return RD_KAFKA_RESP_ERR_NO_ERROR; } /**@}*/ /** * @name AlterConfigs * @{ * * * */ /** * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_AlterConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t res_cnt; int i; int32_t Throttle_Time; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); rd_kafka_buf_read_i32(reply, &res_cnt); if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) { rd_snprintf(errstr, errstr_size, "Received %" PRId32 " ConfigResources in response " "when only %d were requested", res_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); return RD_KAFKA_RESP_ERR__BAD_MSG; } rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, rd_kafka_ConfigResource_free); for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg; int8_t res_type; rd_kafkap_str_t kres_name; char *res_name; char *this_errstr = NULL; rd_kafka_ConfigResource_t *config; rd_kafka_ConfigResource_t skel; int orig_pos; rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); rd_kafka_buf_read_i8(reply, &res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) this_errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); } config = rd_kafka_ConfigResource_new(res_type, res_name); if (!config) { rd_kafka_log(rko_req->rko_rk, LOG_ERR, "ADMIN", "AlterConfigs returned " "unsupported ConfigResource #%d with " "type %d and name \"%s\": ignoring", i, res_type, res_name); continue; } config->err = error_code; if (this_errstr) config->errstr = rd_strdup(this_errstr); /* As a convenience to the application we insert result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.restype = config->restype; skel.name = config->name; orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_ConfigResource_cmp); if (orig_pos == -1) { rd_kafka_ConfigResource_destroy(config); rd_kafka_buf_parse_fail( reply, "Broker returned ConfigResource %d,%s " "that was not " "included in the original request", res_type, res_name); } if (rd_list_elem(&rko_result->rko_u.admin_result.results, orig_pos) != NULL) { rd_kafka_ConfigResource_destroy(config); rd_kafka_buf_parse_fail( reply, "Broker returned ConfigResource %d,%s " "multiple times", res_type, res_name); } rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, config); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "AlterConfigs response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_AlterConfigs(rd_kafka_t *rk, rd_kafka_ConfigResource_t **configs, size_t config_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; rd_kafka_resp_err_t err; char errstr[256]; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_AlterConfigsRequest, rd_kafka_AlterConfigsResponse_parse, }; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_ALTERCONFIGS, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, rd_kafka_ConfigResource_free); for (i = 0; i < config_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_ConfigResource_copy(configs[i])); /* If there's a BROKER resource in the list we need to * speak directly to that broker rather than the controller. * * Multiple BROKER resources are not allowed. */ err = rd_kafka_ConfigResource_get_single_broker_id( &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.broker_id, errstr, sizeof(errstr)); if (err) { rd_kafka_admin_result_fail(rko, err, "%s", errstr); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); return; } rd_kafka_q_enq(rk->rk_ops, rko); } const rd_kafka_ConfigResource_t **rd_kafka_AlterConfigs_result_resources( const rd_kafka_AlterConfigs_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_resources( (const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name DescribeConfigs * @{ * * * */ /** * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t res_cnt; int i; int32_t Throttle_Time; rd_kafka_ConfigResource_t *config = NULL; rd_kafka_ConfigEntry_t *entry = NULL; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); /* #resources */ rd_kafka_buf_read_i32(reply, &res_cnt); if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " ConfigResources in response " "when only %d were requested", res_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, rd_kafka_ConfigResource_free); for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg; int8_t res_type; rd_kafkap_str_t kres_name; char *res_name; char *this_errstr = NULL; rd_kafka_ConfigResource_t skel; int orig_pos; int32_t entry_cnt; int ci; rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); rd_kafka_buf_read_i8(reply, &res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) this_errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); } config = rd_kafka_ConfigResource_new(res_type, res_name); if (!config) { rd_kafka_log(rko_req->rko_rk, LOG_ERR, "ADMIN", "DescribeConfigs returned " "unsupported ConfigResource #%d with " "type %d and name \"%s\": ignoring", i, res_type, res_name); continue; } config->err = error_code; if (this_errstr) config->errstr = rd_strdup(this_errstr); /* #config_entries */ rd_kafka_buf_read_i32(reply, &entry_cnt); for (ci = 0; ci < (int)entry_cnt; ci++) { rd_kafkap_str_t config_name, config_value; int32_t syn_cnt; int si; rd_kafka_buf_read_str(reply, &config_name); rd_kafka_buf_read_str(reply, &config_value); entry = rd_kafka_ConfigEntry_new0( config_name.str, RD_KAFKAP_STR_LEN(&config_name), config_value.str, RD_KAFKAP_STR_LEN(&config_value)); rd_kafka_buf_read_bool(reply, &entry->a.is_readonly); /* ApiVersion 0 has is_default field, while * ApiVersion 1 has source field. * Convert between the two so they look the same * to the caller. */ if (rd_kafka_buf_ApiVersion(reply) == 0) { rd_kafka_buf_read_bool(reply, &entry->a.is_default); if (entry->a.is_default) entry->a.source = RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG; } else { int8_t config_source; rd_kafka_buf_read_i8(reply, &config_source); entry->a.source = config_source; if (entry->a.source == RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG) entry->a.is_default = 1; } rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive); if (rd_kafka_buf_ApiVersion(reply) == 1) { /* #config_synonyms (ApiVersion 1) */ rd_kafka_buf_read_i32(reply, &syn_cnt); if (syn_cnt > 100000) rd_kafka_buf_parse_fail( reply, "Broker returned %" PRId32 " config synonyms for " "ConfigResource %d,%s: " "limit is 100000", syn_cnt, config->restype, config->name); if (syn_cnt > 0) rd_list_grow(&entry->synonyms, syn_cnt); } else { /* No synonyms in ApiVersion 0 */ syn_cnt = 0; } /* Read synonyms (ApiVersion 1) */ for (si = 0; si < (int)syn_cnt; si++) { rd_kafkap_str_t syn_name, syn_value; int8_t syn_source; rd_kafka_ConfigEntry_t *syn_entry; rd_kafka_buf_read_str(reply, &syn_name); rd_kafka_buf_read_str(reply, &syn_value); rd_kafka_buf_read_i8(reply, &syn_source); syn_entry = rd_kafka_ConfigEntry_new0( syn_name.str, RD_KAFKAP_STR_LEN(&syn_name), syn_value.str, RD_KAFKAP_STR_LEN(&syn_value)); if (!syn_entry) rd_kafka_buf_parse_fail( reply, "Broker returned invalid " "synonym #%d " "for ConfigEntry #%d (%s) " "and ConfigResource %d,%s: " "syn_name.len %d, " "syn_value.len %d", si, ci, entry->kv->name, config->restype, config->name, (int)syn_name.len, (int)syn_value.len); syn_entry->a.source = syn_source; syn_entry->a.is_synonym = 1; rd_list_add(&entry->synonyms, syn_entry); } rd_kafka_ConfigResource_add_ConfigEntry(config, entry); entry = NULL; } /* As a convenience to the application we insert result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ skel.restype = config->restype; skel.name = config->name; orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, &skel, rd_kafka_ConfigResource_cmp); if (orig_pos == -1) rd_kafka_buf_parse_fail( reply, "Broker returned ConfigResource %d,%s " "that was not " "included in the original request", res_type, res_name); if (rd_list_elem(&rko_result->rko_u.admin_result.results, orig_pos) != NULL) rd_kafka_buf_parse_fail( reply, "Broker returned ConfigResource %d,%s " "multiple times", res_type, res_name); rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, config); config = NULL; } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (entry) rd_kafka_ConfigEntry_destroy(entry); if (config) rd_kafka_ConfigResource_destroy(config); if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "DescribeConfigs response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_DescribeConfigs(rd_kafka_t *rk, rd_kafka_ConfigResource_t **configs, size_t config_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; rd_kafka_resp_err_t err; char errstr[256]; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DescribeConfigsRequest, rd_kafka_DescribeConfigsResponse_parse, }; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_DESCRIBECONFIGS, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, rd_kafka_ConfigResource_free); for (i = 0; i < config_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_ConfigResource_copy(configs[i])); /* If there's a BROKER resource in the list we need to * speak directly to that broker rather than the controller. * * Multiple BROKER resources are not allowed. */ err = rd_kafka_ConfigResource_get_single_broker_id( &rko->rko_u.admin_request.args, &rko->rko_u.admin_request.broker_id, errstr, sizeof(errstr)); if (err) { rd_kafka_admin_result_fail(rko, err, "%s", errstr); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); return; } rd_kafka_q_enq(rk->rk_ops, rko); } const rd_kafka_ConfigResource_t **rd_kafka_DescribeConfigs_result_resources( const rd_kafka_DescribeConfigs_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_resources( (const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name Delete Records * @{ * * * * */ rd_kafka_DeleteRecords_t *rd_kafka_DeleteRecords_new( const rd_kafka_topic_partition_list_t *before_offsets) { rd_kafka_DeleteRecords_t *del_records; del_records = rd_calloc(1, sizeof(*del_records)); del_records->offsets = rd_kafka_topic_partition_list_copy(before_offsets); return del_records; } void rd_kafka_DeleteRecords_destroy(rd_kafka_DeleteRecords_t *del_records) { rd_kafka_topic_partition_list_destroy(del_records->offsets); rd_free(del_records); } void rd_kafka_DeleteRecords_destroy_array( rd_kafka_DeleteRecords_t **del_records, size_t del_record_cnt) { size_t i; for (i = 0; i < del_record_cnt; i++) rd_kafka_DeleteRecords_destroy(del_records[i]); } /** @brief Merge the DeleteRecords response from a single broker * into the user response list. */ static void rd_kafka_DeleteRecords_response_merge(rd_kafka_op_t *rko_fanout, const rd_kafka_op_t *rko_partial) { rd_kafka_t *rk = rko_fanout->rko_rk; const rd_kafka_topic_partition_list_t *partitions; rd_kafka_topic_partition_list_t *respartitions; const rd_kafka_topic_partition_t *partition; rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT); /* All partitions (offsets) from the DeleteRecords() call */ respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, 0); if (rko_partial->rko_err) { /* If there was a request-level error, set the error on * all requested partitions for this request. */ const rd_kafka_topic_partition_list_t *reqpartitions; rd_kafka_topic_partition_t *reqpartition; /* Partitions (offsets) from this DeleteRecordsRequest */ reqpartitions = rd_list_elem(&rko_partial->rko_u.admin_result.args, 0); RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) { rd_kafka_topic_partition_t *respart; /* Find result partition */ respart = rd_kafka_topic_partition_list_find( respartitions, reqpartition->topic, reqpartition->partition); rd_assert(respart || !*"respart not found"); respart->err = rko_partial->rko_err; } return; } /* Partitions from the DeleteRecordsResponse */ partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); RD_KAFKA_TPLIST_FOREACH(partition, partitions) { rd_kafka_topic_partition_t *respart; /* Find result partition */ respart = rd_kafka_topic_partition_list_find( respartitions, partition->topic, partition->partition); if (unlikely(!respart)) { rd_dassert(!*"partition not found"); rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS", "DeleteRecords response contains " "unexpected %s [%" PRId32 "] which " "was not in the request list: ignored", partition->topic, partition->partition); continue; } respart->offset = partition->offset; respart->err = partition->err; } } /** * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_op_t *rko_result; rd_kafka_topic_partition_list_t *offsets; rd_kafka_buf_read_throttle_time(reply); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; offsets = rd_kafka_buf_read_topic_partitions(reply, 0, fields); if (!offsets) rd_kafka_buf_parse_fail(reply, "Failed to parse topic partitions"); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_topic_partition_list_destroy_free); rd_list_add(&rko_result->rko_u.admin_result.results, offsets); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: rd_snprintf(errstr, errstr_size, "DeleteRecords response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } /** * @brief Call when leaders have been queried to progress the DeleteRecords * admin op to its next phase, sending DeleteRecords to partition * leaders. * * @param rko Reply op (RD_KAFKA_OP_LEADERS). */ static rd_kafka_op_res_t rd_kafka_DeleteRecords_leaders_queried_cb(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *reply) { rd_kafka_resp_err_t err = reply->rko_err; const rd_list_t *leaders = reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */ rd_kafka_topic_partition_list_t *partitions = reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */ rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque; rd_kafka_topic_partition_t *rktpar; rd_kafka_topic_partition_list_t *offsets; const struct rd_kafka_partition_leader *leader; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DeleteRecordsRequest, rd_kafka_DeleteRecordsResponse_parse, }; int i; rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) == RD_KAFKA_OP_ADMIN_FANOUT); if (err == RD_KAFKA_RESP_ERR__DESTROY) goto err; /* Requested offsets */ offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0); /* Update the error field of each partition from the * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART * and similar are propagated, since those partitions are not * included in the leaders list. */ RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) { rd_kafka_topic_partition_t *rktpar2; if (!rktpar->err) continue; rktpar2 = rd_kafka_topic_partition_list_find( offsets, rktpar->topic, rktpar->partition); rd_assert(rktpar2); rktpar2->err = rktpar->err; } if (err) { err: rd_kafka_admin_result_fail( rko_fanout, err, "Failed to query partition leaders: %s", err == RD_KAFKA_RESP_ERR__NOENT ? "No leaders found" : rd_kafka_err2str(err)); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return RD_KAFKA_OP_RES_HANDLED; } /* The response lists is one element deep and that element is a * rd_kafka_topic_partition_list_t with the results of the deletes. */ rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1, rd_kafka_topic_partition_list_destroy_free); rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results, rd_kafka_topic_partition_list_copy(offsets)); rko_fanout->rko_u.admin_request.fanout.outstanding = rd_list_cnt(leaders); rd_assert(rd_list_cnt(leaders) > 0); /* For each leader send a request for its partitions */ RD_LIST_FOREACH(leader, leaders, i) { rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_DELETERECORDS, RD_KAFKA_EVENT_DELETERECORDS_RESULT, &cbs, &rko_fanout->rko_u.admin_request.options, rk->rk_ops); rko->rko_u.admin_request.fanout_parent = rko_fanout; rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid; rd_kafka_topic_partition_list_sort_by_topic(leader->partitions); rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_topic_partition_list_destroy_free); rd_list_add( &rko->rko_u.admin_request.args, rd_kafka_topic_partition_list_copy(leader->partitions)); /* Enqueue op for admin_worker() to transition to next state */ rd_kafka_q_enq(rk->rk_ops, rko); } return RD_KAFKA_OP_RES_HANDLED; } void rd_kafka_DeleteRecords(rd_kafka_t *rk, rd_kafka_DeleteRecords_t **del_records, size_t del_record_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko_fanout; static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { rd_kafka_DeleteRecords_response_merge, rd_kafka_topic_partition_list_copy_opaque, }; const rd_kafka_topic_partition_list_t *offsets; rd_kafka_topic_partition_list_t *copied_offsets; rd_assert(rkqu); rko_fanout = rd_kafka_admin_fanout_op_new( rk, RD_KAFKA_OP_DELETERECORDS, RD_KAFKA_EVENT_DELETERECORDS_RESULT, &fanout_cbs, options, rkqu->rkqu_q); if (del_record_cnt != 1) { /* We only support one DeleteRecords per call since there * is no point in passing multiples, but the API still * needs to be extensible/future-proof. */ rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "Exactly one DeleteRecords must be " "passed"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } offsets = del_records[0]->offsets; if (offsets == NULL || offsets->cnt == 0) { rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "No records to delete"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } /* Copy offsets list and store it on the request op */ copied_offsets = rd_kafka_topic_partition_list_copy(offsets); if (rd_kafka_topic_partition_list_has_duplicates( copied_offsets, rd_false /*check partition*/)) { rd_kafka_topic_partition_list_destroy(copied_offsets); rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate partitions not allowed"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } /* Set default error on each partition so that if any of the partitions * never get a request sent we have an error to indicate it. */ rd_kafka_topic_partition_list_set_err(copied_offsets, RD_KAFKA_RESP_ERR__NOOP); rd_list_init(&rko_fanout->rko_u.admin_request.args, 1, rd_kafka_topic_partition_list_destroy_free); rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_offsets); /* Async query for partition leaders */ rd_kafka_topic_partition_list_query_leaders_async( rk, copied_offsets, rd_kafka_admin_timeout_remains(rko_fanout), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_DeleteRecords_leaders_queried_cb, rko_fanout); } /** * @brief Get the list of offsets from a DeleteRecords result. * * The returned \p offsets life-time is the same as the \p result object. */ const rd_kafka_topic_partition_list_t *rd_kafka_DeleteRecords_result_offsets( const rd_kafka_DeleteRecords_result_t *result) { const rd_kafka_topic_partition_list_t *offsets; const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; size_t cnt; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS); cnt = rd_list_cnt(&rko->rko_u.admin_result.results); rd_assert(cnt == 1); offsets = (const rd_kafka_topic_partition_list_t *)rd_list_elem( &rko->rko_u.admin_result.results, 0); rd_assert(offsets); return offsets; } /**@}*/ /** * @name Delete groups * @{ * * * * */ rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new(const char *group) { size_t tsize = strlen(group) + 1; rd_kafka_DeleteGroup_t *del_group; /* Single allocation */ del_group = rd_malloc(sizeof(*del_group) + tsize); del_group->group = del_group->data; memcpy(del_group->group, group, tsize); return del_group; } void rd_kafka_DeleteGroup_destroy(rd_kafka_DeleteGroup_t *del_group) { rd_free(del_group); } static void rd_kafka_DeleteGroup_free(void *ptr) { rd_kafka_DeleteGroup_destroy(ptr); } void rd_kafka_DeleteGroup_destroy_array(rd_kafka_DeleteGroup_t **del_groups, size_t del_group_cnt) { size_t i; for (i = 0; i < del_group_cnt; i++) rd_kafka_DeleteGroup_destroy(del_groups[i]); } /** * @brief Group name comparator for DeleteGroup_t */ static int rd_kafka_DeleteGroup_cmp(const void *_a, const void *_b) { const rd_kafka_DeleteGroup_t *a = _a, *b = _b; return strcmp(a->group, b->group); } /** * @brief Allocate a new DeleteGroup and make a copy of \p src */ static rd_kafka_DeleteGroup_t * rd_kafka_DeleteGroup_copy(const rd_kafka_DeleteGroup_t *src) { return rd_kafka_DeleteGroup_new(src->group); } /** * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DeleteGroupsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; int32_t group_cnt; int i; rd_kafka_op_t *rko_result = NULL; rd_kafka_buf_read_throttle_time(reply); /* #group_error_codes */ rd_kafka_buf_read_i32(reply, &group_cnt); if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " groups in response " "when only %d were requested", group_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, group_cnt, rd_kafka_group_result_free); for (i = 0; i < (int)group_cnt; i++) { rd_kafkap_str_t kgroup; int16_t error_code; rd_kafka_group_result_t *groupres; rd_kafka_buf_read_str(reply, &kgroup); rd_kafka_buf_read_i16(reply, &error_code); groupres = rd_kafka_group_result_new( kgroup.str, RD_KAFKAP_STR_LEN(&kgroup), NULL, error_code ? rd_kafka_error_new(error_code, NULL) : NULL); rd_list_add(&rko_result->rko_u.admin_result.results, groupres); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "DeleteGroups response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } /** @brief Merge the DeleteGroups response from a single broker * into the user response list. */ void rd_kafka_DeleteGroups_response_merge(rd_kafka_op_t *rko_fanout, const rd_kafka_op_t *rko_partial) { const rd_kafka_group_result_t *groupres = NULL; rd_kafka_group_result_t *newgroupres; const rd_kafka_DeleteGroup_t *grp = rko_partial->rko_u.admin_result.opaque; int orig_pos; rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DELETEGROUPS_RESULT); if (!rko_partial->rko_err) { /* Proper results. * We only send one group per request, make sure it matches */ groupres = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); rd_assert(groupres); rd_assert(!strcmp(groupres->group, grp->group)); newgroupres = rd_kafka_group_result_copy(groupres); } else { /* Op errored, e.g. timeout */ newgroupres = rd_kafka_group_result_new( grp->group, -1, NULL, rd_kafka_error_new(rko_partial->rko_err, NULL)); } /* As a convenience to the application we insert group result * in the same order as they were requested. */ orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, rd_kafka_DeleteGroup_cmp); rd_assert(orig_pos != -1); /* Make sure result is not already set */ rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos) == NULL); rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, newgroupres); } void rd_kafka_DeleteGroups(rd_kafka_t *rk, rd_kafka_DeleteGroup_t **del_groups, size_t del_group_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko_fanout; rd_list_t dup_list; size_t i; static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { rd_kafka_DeleteGroups_response_merge, rd_kafka_group_result_copy_opaque, }; rd_assert(rkqu); rko_fanout = rd_kafka_admin_fanout_op_new( rk, RD_KAFKA_OP_DELETEGROUPS, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, &fanout_cbs, options, rkqu->rkqu_q); if (del_group_cnt == 0) { rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "No groups to delete"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } /* Copy group list and store it on the request op. * Maintain original ordering. */ rd_list_init(&rko_fanout->rko_u.admin_request.args, (int)del_group_cnt, rd_kafka_DeleteGroup_free); for (i = 0; i < del_group_cnt; i++) rd_list_add(&rko_fanout->rko_u.admin_request.args, rd_kafka_DeleteGroup_copy(del_groups[i])); /* Check for duplicates. * Make a temporary copy of the group list and sort it to check for * duplicates, we don't want the original list sorted since we want * to maintain ordering. */ rd_list_init(&dup_list, rd_list_cnt(&rko_fanout->rko_u.admin_request.args), NULL); rd_list_copy_to(&dup_list, &rko_fanout->rko_u.admin_request.args, NULL, NULL); rd_list_sort(&dup_list, rd_kafka_DeleteGroup_cmp); if (rd_list_find_duplicate(&dup_list, rd_kafka_DeleteGroup_cmp)) { rd_list_destroy(&dup_list); rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate groups not allowed"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } rd_list_destroy(&dup_list); /* Prepare results list where fanned out op's results will be * accumulated. */ rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, (int)del_group_cnt, rd_kafka_group_result_free); rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt; /* Create individual request ops for each group. * FIXME: A future optimization is to coalesce all groups for a single * coordinator into one op. */ for (i = 0; i < del_group_cnt; i++) { static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DeleteGroupsRequest, rd_kafka_DeleteGroupsResponse_parse, }; rd_kafka_DeleteGroup_t *grp = rd_list_elem(&rko_fanout->rko_u.admin_request.args, (int)i); rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_DELETEGROUPS, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, &cbs, options, rk->rk_ops); rko->rko_u.admin_request.fanout_parent = rko_fanout; rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; rko->rko_u.admin_request.coordkey = rd_strdup(grp->group); /* Set the group name as the opaque so the fanout worker use it * to fill in errors. * References rko_fanout's memory, which will always outlive * the fanned out op. */ rd_kafka_AdminOptions_set_opaque( &rko->rko_u.admin_request.options, grp); rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_DeleteGroup_free); rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_DeleteGroup_copy(del_groups[i])); rd_kafka_q_enq(rk->rk_ops, rko); } } /** * @brief Get an array of group results from a DeleteGroups result. * * The returned \p groups life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_group_result_t **rd_kafka_DeleteGroups_result_groups( const rd_kafka_DeleteGroups_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name Delete consumer group offsets (committed offsets) * @{ * * * * */ rd_kafka_DeleteConsumerGroupOffsets_t *rd_kafka_DeleteConsumerGroupOffsets_new( const char *group, const rd_kafka_topic_partition_list_t *partitions) { size_t tsize = strlen(group) + 1; rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets; rd_assert(partitions); /* Single allocation */ del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize); del_grpoffsets->group = del_grpoffsets->data; memcpy(del_grpoffsets->group, group, tsize); del_grpoffsets->partitions = rd_kafka_topic_partition_list_copy(partitions); return del_grpoffsets; } void rd_kafka_DeleteConsumerGroupOffsets_destroy( rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) { rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions); rd_free(del_grpoffsets); } static void rd_kafka_DeleteConsumerGroupOffsets_free(void *ptr) { rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr); } void rd_kafka_DeleteConsumerGroupOffsets_destroy_array( rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, size_t del_grpoffsets_cnt) { size_t i; for (i = 0; i < del_grpoffsets_cnt; i++) rd_kafka_DeleteConsumerGroupOffsets_destroy(del_grpoffsets[i]); } /** * @brief Allocate a new DeleteGroup and make a copy of \p src */ static rd_kafka_DeleteConsumerGroupOffsets_t * rd_kafka_DeleteConsumerGroupOffsets_copy( const rd_kafka_DeleteConsumerGroupOffsets_t *src) { return rd_kafka_DeleteConsumerGroupOffsets_new(src->group, src->partitions); } /** * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_op_t *rko_result; int16_t ErrorCode; rd_kafka_topic_partition_list_t *partitions = NULL; const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets; rd_kafka_buf_read_i16(reply, &ErrorCode); if (ErrorCode) { rd_snprintf(errstr, errstr_size, "OffsetDelete response error: %s", rd_kafka_err2str(ErrorCode)); return ErrorCode; } rd_kafka_buf_read_throttle_time(reply); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; partitions = rd_kafka_buf_read_topic_partitions(reply, 16, fields); if (!partitions) { rd_snprintf(errstr, errstr_size, "Failed to parse OffsetDeleteResponse partitions"); return RD_KAFKA_RESP_ERR__BAD_MSG; } /* Create result op and group_result_t */ rko_result = rd_kafka_admin_result_new(rko_req); del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0); rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_group_result_free); rd_list_add(&rko_result->rko_u.admin_result.results, rd_kafka_group_result_new(del_grpoffsets->group, -1, partitions, NULL)); rd_kafka_topic_partition_list_destroy(partitions); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: rd_snprintf(errstr, errstr_size, "OffsetDelete response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_DeleteConsumerGroupOffsets( rd_kafka_t *rk, rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, size_t del_grpoffsets_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_OffsetDeleteRequest, rd_kafka_OffsetDeleteResponse_parse, }; rd_kafka_op_t *rko; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT, &cbs, options, rkqu->rkqu_q); if (del_grpoffsets_cnt != 1) { /* For simplicity we only support one single group for now */ rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Exactly one " "DeleteConsumerGroupOffsets must " "be passed"); rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); return; } rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; rko->rko_u.admin_request.coordkey = rd_strdup(del_grpoffsets[0]->group); /* Store copy of group on request so the group name can be reached * from the response parser. */ rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_DeleteConsumerGroupOffsets_free); rd_list_add( &rko->rko_u.admin_request.args, rd_kafka_DeleteConsumerGroupOffsets_copy(del_grpoffsets[0])); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of group results from a DeleteGroups result. * * The returned \p groups life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_group_result_t ** rd_kafka_DeleteConsumerGroupOffsets_result_groups( const rd_kafka_DeleteConsumerGroupOffsets_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, cntp); } void rd_kafka_DeleteConsumerGroupOffsets( rd_kafka_t *rk, rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, size_t del_grpoffsets_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu); /**@}*/ /** * @name CreateAcls * @{ * * * */ const char *rd_kafka_AclOperation_name(rd_kafka_AclOperation_t operation) { static const char *names[] = {"UNKNOWN", "ANY", "ALL", "READ", "WRITE", "CREATE", "DELETE", "ALTER", "DESCRIBE", "CLUSTER_ACTION", "DESCRIBE_CONFIGS", "ALTER_CONFIGS", "IDEMPOTENT_WRITE"}; if ((unsigned int)operation >= (unsigned int)RD_KAFKA_ACL_OPERATION__CNT) return "UNSUPPORTED"; return names[operation]; } const char * rd_kafka_AclPermissionType_name(rd_kafka_AclPermissionType_t permission_type) { static const char *names[] = {"UNKNOWN", "ANY", "DENY", "ALLOW"}; if ((unsigned int)permission_type >= (unsigned int)RD_KAFKA_ACL_PERMISSION_TYPE__CNT) return "UNSUPPORTED"; return names[permission_type]; } static rd_kafka_AclBinding_t * rd_kafka_AclBinding_new0(rd_kafka_ResourceType_t restype, const char *name, rd_kafka_ResourcePatternType_t resource_pattern_type, const char *principal, const char *host, rd_kafka_AclOperation_t operation, rd_kafka_AclPermissionType_t permission_type, rd_kafka_resp_err_t err, const char *errstr) { rd_kafka_AclBinding_t *acl_binding; acl_binding = rd_calloc(1, sizeof(*acl_binding)); acl_binding->name = name != NULL ? rd_strdup(name) : NULL; acl_binding->principal = principal != NULL ? rd_strdup(principal) : NULL; acl_binding->host = host != NULL ? rd_strdup(host) : NULL; acl_binding->restype = restype; acl_binding->resource_pattern_type = resource_pattern_type; acl_binding->operation = operation; acl_binding->permission_type = permission_type; if (err) acl_binding->error = rd_kafka_error_new(err, "%s", errstr); return acl_binding; } rd_kafka_AclBinding_t * rd_kafka_AclBinding_new(rd_kafka_ResourceType_t restype, const char *name, rd_kafka_ResourcePatternType_t resource_pattern_type, const char *principal, const char *host, rd_kafka_AclOperation_t operation, rd_kafka_AclPermissionType_t permission_type, char *errstr, size_t errstr_size) { if (!name) { rd_snprintf(errstr, errstr_size, "Invalid resource name"); return NULL; } if (!principal) { rd_snprintf(errstr, errstr_size, "Invalid principal"); return NULL; } if (!host) { rd_snprintf(errstr, errstr_size, "Invalid host"); return NULL; } if (restype == RD_KAFKA_RESOURCE_ANY || restype <= RD_KAFKA_RESOURCE_UNKNOWN || restype >= RD_KAFKA_RESOURCE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid resource type"); return NULL; } if (resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_ANY || resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_MATCH || resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid resource pattern type"); return NULL; } if (operation == RD_KAFKA_ACL_OPERATION_ANY || operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || operation >= RD_KAFKA_ACL_OPERATION__CNT) { rd_snprintf(errstr, errstr_size, "Invalid operation"); return NULL; } if (permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ANY || permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid permission type"); return NULL; } return rd_kafka_AclBinding_new0( restype, name, resource_pattern_type, principal, host, operation, permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL); } rd_kafka_AclBindingFilter_t *rd_kafka_AclBindingFilter_new( rd_kafka_ResourceType_t restype, const char *name, rd_kafka_ResourcePatternType_t resource_pattern_type, const char *principal, const char *host, rd_kafka_AclOperation_t operation, rd_kafka_AclPermissionType_t permission_type, char *errstr, size_t errstr_size) { if (restype <= RD_KAFKA_RESOURCE_UNKNOWN || restype >= RD_KAFKA_RESOURCE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid resource type"); return NULL; } if (resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid resource pattern type"); return NULL; } if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || operation >= RD_KAFKA_ACL_OPERATION__CNT) { rd_snprintf(errstr, errstr_size, "Invalid operation"); return NULL; } if (permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { rd_snprintf(errstr, errstr_size, "Invalid permission type"); return NULL; } return rd_kafka_AclBinding_new0( restype, name, resource_pattern_type, principal, host, operation, permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL); } rd_kafka_ResourceType_t rd_kafka_AclBinding_restype(const rd_kafka_AclBinding_t *acl) { return acl->restype; } const char *rd_kafka_AclBinding_name(const rd_kafka_AclBinding_t *acl) { return acl->name; } const char *rd_kafka_AclBinding_principal(const rd_kafka_AclBinding_t *acl) { return acl->principal; } const char *rd_kafka_AclBinding_host(const rd_kafka_AclBinding_t *acl) { return acl->host; } rd_kafka_AclOperation_t rd_kafka_AclBinding_operation(const rd_kafka_AclBinding_t *acl) { return acl->operation; } rd_kafka_AclPermissionType_t rd_kafka_AclBinding_permission_type(const rd_kafka_AclBinding_t *acl) { return acl->permission_type; } rd_kafka_ResourcePatternType_t rd_kafka_AclBinding_resource_pattern_type(const rd_kafka_AclBinding_t *acl) { return acl->resource_pattern_type; } const rd_kafka_error_t * rd_kafka_AclBinding_error(const rd_kafka_AclBinding_t *acl) { return acl->error; } /** * @brief Allocate a new AclBinding and make a copy of \p src */ static rd_kafka_AclBinding_t * rd_kafka_AclBinding_copy(const rd_kafka_AclBinding_t *src) { rd_kafka_AclBinding_t *dst; dst = rd_kafka_AclBinding_new( src->restype, src->name, src->resource_pattern_type, src->principal, src->host, src->operation, src->permission_type, NULL, 0); rd_assert(dst); return dst; } /** * @brief Allocate a new AclBindingFilter and make a copy of \p src */ static rd_kafka_AclBindingFilter_t * rd_kafka_AclBindingFilter_copy(const rd_kafka_AclBindingFilter_t *src) { rd_kafka_AclBindingFilter_t *dst; dst = rd_kafka_AclBindingFilter_new( src->restype, src->name, src->resource_pattern_type, src->principal, src->host, src->operation, src->permission_type, NULL, 0); rd_assert(dst); return dst; } void rd_kafka_AclBinding_destroy(rd_kafka_AclBinding_t *acl_binding) { if (acl_binding->name) rd_free(acl_binding->name); if (acl_binding->principal) rd_free(acl_binding->principal); if (acl_binding->host) rd_free(acl_binding->host); if (acl_binding->error) rd_kafka_error_destroy(acl_binding->error); rd_free(acl_binding); } static void rd_kafka_AclBinding_free(void *ptr) { rd_kafka_AclBinding_destroy(ptr); } void rd_kafka_AclBinding_destroy_array(rd_kafka_AclBinding_t **acl_bindings, size_t acl_bindings_cnt) { size_t i; for (i = 0; i < acl_bindings_cnt; i++) rd_kafka_AclBinding_destroy(acl_bindings[i]); } /** * @brief Parse CreateAclsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_CreateAclsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_op_t *rko_result = NULL; int32_t acl_cnt; int i; rd_kafka_buf_read_throttle_time(reply); rd_kafka_buf_read_arraycnt(reply, &acl_cnt, 100000); if (acl_cnt != rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( reply, "Received %" PRId32 " acls in response, but %d were requested", acl_cnt, rd_list_cnt(&rko_req->rko_u.admin_request.args)); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, acl_cnt, rd_kafka_acl_result_free); for (i = 0; i < (int)acl_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; rd_kafka_acl_result_t *acl_res; char *errstr = NULL; rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); if (error_code) { if (RD_KAFKAP_STR_LEN(&error_msg) == 0) errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&errstr, &error_msg); } acl_res = rd_kafka_acl_result_new( error_code ? rd_kafka_error_new(error_code, "%s", errstr) : NULL); rd_list_set(&rko_result->rko_u.admin_result.results, i, acl_res); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "CreateAcls response protocol parse failure: %s", rd_kafka_err2str(err)); return err; } void rd_kafka_CreateAcls(rd_kafka_t *rk, rd_kafka_AclBinding_t **new_acls, size_t new_acls_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_CreateAclsRequest, rd_kafka_CreateAclsResponse_parse}; rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_CREATEACLS, RD_KAFKA_EVENT_CREATEACLS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)new_acls_cnt, rd_kafka_AclBinding_free); for (i = 0; i < new_acls_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_AclBinding_copy(new_acls[i])); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of rd_kafka_acl_result_t from a CreateAcls result. * * The returned \p rd_kafka_acl_result_t life-time is the same as the \p result * object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_acl_result_t ** rd_kafka_CreateAcls_result_acls(const rd_kafka_CreateAcls_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_acl_results( (const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name DescribeAcls * @{ * * * */ /** * @brief Parse DescribeAclsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_op_t *rko_result = NULL; int32_t res_cnt; int i; int j; rd_kafka_AclBinding_t *acl = NULL; int16_t error_code; rd_kafkap_str_t error_msg; rd_kafka_buf_read_throttle_time(reply); rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); if (error_code) { if (RD_KAFKAP_STR_LEN(&error_msg) == 0) errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&errstr, &error_msg); } /* #resources */ rd_kafka_buf_read_arraycnt(reply, &res_cnt, 100000); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, rd_kafka_AclBinding_free); for (i = 0; i < (int)res_cnt; i++) { int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN; rd_kafkap_str_t kres_name; char *res_name; int8_t resource_pattern_type = RD_KAFKA_RESOURCE_PATTERN_LITERAL; int32_t acl_cnt; rd_kafka_buf_read_i8(reply, &res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); if (rd_kafka_buf_ApiVersion(reply) >= 1) { rd_kafka_buf_read_i8(reply, &resource_pattern_type); } if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN || res_type >= RD_KAFKA_RESOURCE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", "DescribeAclsResponse returned unknown " "resource type %d", res_type); res_type = RD_KAFKA_RESOURCE_UNKNOWN; } if (resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", "DescribeAclsResponse returned unknown " "resource pattern type %d", resource_pattern_type); resource_pattern_type = RD_KAFKA_RESOURCE_PATTERN_UNKNOWN; } /* #resources */ rd_kafka_buf_read_arraycnt(reply, &acl_cnt, 100000); for (j = 0; j < (int)acl_cnt; j++) { rd_kafkap_str_t kprincipal; rd_kafkap_str_t khost; int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; int8_t permission_type = RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; char *principal; char *host; rd_kafka_buf_read_str(reply, &kprincipal); rd_kafka_buf_read_str(reply, &khost); rd_kafka_buf_read_i8(reply, &operation); rd_kafka_buf_read_i8(reply, &permission_type); RD_KAFKAP_STR_DUPA(&principal, &kprincipal); RD_KAFKAP_STR_DUPA(&host, &khost); if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || operation >= RD_KAFKA_ACL_OPERATION__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", "DescribeAclsResponse returned " "unknown acl operation %d", operation); operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; } if (permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE", "DescribeAclsResponse returned " "unknown acl permission type %d", permission_type); permission_type = RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; } acl = rd_kafka_AclBinding_new0( res_type, res_name, resource_pattern_type, principal, host, operation, permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL); rd_list_add(&rko_result->rko_u.admin_result.results, acl); } } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "DescribeAcls response protocol parse failure: %s", rd_kafka_err2str(err)); return err; } void rd_kafka_DescribeAcls(rd_kafka_t *rk, rd_kafka_AclBindingFilter_t *acl_filter, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DescribeAclsRequest, rd_kafka_DescribeAclsResponse_parse, }; rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DESCRIBEACLS, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_AclBinding_free); rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_AclBindingFilter_copy(acl_filter)); rd_kafka_q_enq(rk->rk_ops, rko); } /** * @brief Get an array of rd_kafka_AclBinding_t from a DescribeAcls result. * * The returned \p rd_kafka_AclBinding_t life-time is the same as the \p result * object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_AclBinding_t ** rd_kafka_DescribeAcls_result_acls(const rd_kafka_DescribeAcls_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_acl_bindings( (const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name DeleteAcls * @{ * * * */ /** * @brief Allocate a new DeleteAcls result response with the given * \p err error code and \p errstr error message. */ const rd_kafka_DeleteAcls_result_response_t * rd_kafka_DeleteAcls_result_response_new(rd_kafka_resp_err_t err, char *errstr) { rd_kafka_DeleteAcls_result_response_t *result_response; result_response = rd_calloc(1, sizeof(*result_response)); if (err) result_response->error = rd_kafka_error_new( err, "%s", errstr ? errstr : rd_kafka_err2str(err)); /* List of int32 lists */ rd_list_init(&result_response->matching_acls, 0, rd_kafka_AclBinding_free); return result_response; } static void rd_kafka_DeleteAcls_result_response_destroy( rd_kafka_DeleteAcls_result_response_t *resp) { if (resp->error) rd_kafka_error_destroy(resp->error); rd_list_destroy(&resp->matching_acls); rd_free(resp); } static void rd_kafka_DeleteAcls_result_response_free(void *ptr) { rd_kafka_DeleteAcls_result_response_destroy( (rd_kafka_DeleteAcls_result_response_t *)ptr); } /** * @brief Get an array of rd_kafka_AclBinding_t from a DescribeAcls result. * * The returned \p rd_kafka_AclBinding_t life-time is the same as the \p result * object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_DeleteAcls_result_response_t ** rd_kafka_DeleteAcls_result_responses(const rd_kafka_DeleteAcls_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_delete_acl_result_responses( (const rd_kafka_op_t *)result, cntp); } const rd_kafka_error_t *rd_kafka_DeleteAcls_result_response_error( const rd_kafka_DeleteAcls_result_response_t *result_response) { return result_response->error; } const rd_kafka_AclBinding_t **rd_kafka_DeleteAcls_result_response_matching_acls( const rd_kafka_DeleteAcls_result_response_t *result_response, size_t *matching_acls_cntp) { *matching_acls_cntp = result_response->matching_acls.rl_cnt; return (const rd_kafka_AclBinding_t **) result_response->matching_acls.rl_elems; } /** * @brief Parse DeleteAclsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_op_t *rko_result = NULL; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int32_t res_cnt; int i; int j; rd_kafka_buf_read_throttle_time(reply); /* #responses */ rd_kafka_buf_read_arraycnt(reply, &res_cnt, 100000); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, rd_kafka_DeleteAcls_result_response_free); for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; char *errstr = NULL; const rd_kafka_DeleteAcls_result_response_t *result_response; int32_t matching_acls_cnt; rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) errstr = (char *)rd_kafka_err2str(error_code); else RD_KAFKAP_STR_DUPA(&errstr, &error_msg); } result_response = rd_kafka_DeleteAcls_result_response_new(error_code, errstr); /* #maching_acls */ rd_kafka_buf_read_arraycnt(reply, &matching_acls_cnt, 100000); for (j = 0; j < (int)matching_acls_cnt; j++) { int16_t acl_error_code; int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN; rd_kafkap_str_t acl_error_msg = RD_KAFKAP_STR_INITIALIZER; rd_kafkap_str_t kres_name; rd_kafkap_str_t khost; rd_kafkap_str_t kprincipal; int8_t resource_pattern_type = RD_KAFKA_RESOURCE_PATTERN_LITERAL; int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; int8_t permission_type = RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; rd_kafka_AclBinding_t *matching_acl; char *acl_errstr = NULL; char *res_name; char *principal; char *host; rd_kafka_buf_read_i16(reply, &acl_error_code); rd_kafka_buf_read_str(reply, &acl_error_msg); if (acl_error_code) { if (RD_KAFKAP_STR_IS_NULL(&acl_error_msg) || RD_KAFKAP_STR_LEN(&acl_error_msg) == 0) acl_errstr = (char *)rd_kafka_err2str( acl_error_code); else RD_KAFKAP_STR_DUPA(&acl_errstr, &acl_error_msg); } rd_kafka_buf_read_i8(reply, &res_type); rd_kafka_buf_read_str(reply, &kres_name); if (rd_kafka_buf_ApiVersion(reply) >= 1) { rd_kafka_buf_read_i8(reply, &resource_pattern_type); } rd_kafka_buf_read_str(reply, &kprincipal); rd_kafka_buf_read_str(reply, &khost); rd_kafka_buf_read_i8(reply, &operation); rd_kafka_buf_read_i8(reply, &permission_type); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); RD_KAFKAP_STR_DUPA(&principal, &kprincipal); RD_KAFKAP_STR_DUPA(&host, &khost); if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN || res_type >= RD_KAFKA_RESOURCE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DELETEACLSRESPONSE", "DeleteAclsResponse returned " "unknown resource type %d", res_type); res_type = RD_KAFKA_RESOURCE_UNKNOWN; } if (resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN || resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DELETEACLSRESPONSE", "DeleteAclsResponse returned " "unknown resource pattern type %d", resource_pattern_type); resource_pattern_type = RD_KAFKA_RESOURCE_PATTERN_UNKNOWN; } if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN || operation >= RD_KAFKA_ACL_OPERATION__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DELETEACLSRESPONSE", "DeleteAclsResponse returned " "unknown acl operation %d", operation); operation = RD_KAFKA_ACL_OPERATION_UNKNOWN; } if (permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN || permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) { rd_rkb_log(rkb, LOG_WARNING, "DELETEACLSRESPONSE", "DeleteAclsResponse returned " "unknown acl permission type %d", permission_type); permission_type = RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN; } matching_acl = rd_kafka_AclBinding_new0( res_type, res_name, resource_pattern_type, principal, host, operation, permission_type, acl_error_code, acl_errstr); rd_list_add( (rd_list_t *)&result_response->matching_acls, (void *)matching_acl); } rd_list_add(&rko_result->rko_u.admin_result.results, (void *)result_response); } *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf(errstr, errstr_size, "DeleteAcls response protocol parse failure: %s", rd_kafka_err2str(err)); return err; } void rd_kafka_DeleteAcls(rd_kafka_t *rk, rd_kafka_AclBindingFilter_t **del_acls, size_t del_acls_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; size_t i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_DeleteAclsRequest, rd_kafka_DeleteAclsResponse_parse}; rko = rd_kafka_admin_request_op_new(rk, RD_KAFKA_OP_DELETEACLS, RD_KAFKA_EVENT_DELETEACLS_RESULT, &cbs, options, rkqu->rkqu_q); rd_list_init(&rko->rko_u.admin_request.args, (int)del_acls_cnt, rd_kafka_AclBinding_free); for (i = 0; i < del_acls_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_AclBindingFilter_copy(del_acls[i])); rd_kafka_q_enq(rk->rk_ops, rko); } /**@}*/ /** * @name Alter consumer group offsets (committed offsets) * @{ * * * * */ rd_kafka_AlterConsumerGroupOffsets_t *rd_kafka_AlterConsumerGroupOffsets_new( const char *group_id, const rd_kafka_topic_partition_list_t *partitions) { rd_assert(group_id && partitions); size_t tsize = strlen(group_id) + 1; rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets; /* Single allocation */ alter_grpoffsets = rd_malloc(sizeof(*alter_grpoffsets) + tsize); alter_grpoffsets->group_id = alter_grpoffsets->data; memcpy(alter_grpoffsets->group_id, group_id, tsize); alter_grpoffsets->partitions = rd_kafka_topic_partition_list_copy(partitions); return alter_grpoffsets; } void rd_kafka_AlterConsumerGroupOffsets_destroy( rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets) { rd_kafka_topic_partition_list_destroy(alter_grpoffsets->partitions); rd_free(alter_grpoffsets); } static void rd_kafka_AlterConsumerGroupOffsets_free(void *ptr) { rd_kafka_AlterConsumerGroupOffsets_destroy(ptr); } void rd_kafka_AlterConsumerGroupOffsets_destroy_array( rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets, size_t alter_grpoffsets_cnt) { size_t i; for (i = 0; i < alter_grpoffsets_cnt; i++) rd_kafka_AlterConsumerGroupOffsets_destroy(alter_grpoffsets[i]); } /** * @brief Allocate a new AlterGroup and make a copy of \p src */ static rd_kafka_AlterConsumerGroupOffsets_t * rd_kafka_AlterConsumerGroupOffsets_copy( const rd_kafka_AlterConsumerGroupOffsets_t *src) { return rd_kafka_AlterConsumerGroupOffsets_new(src->group_id, src->partitions); } /** * @brief Send a OffsetCommitRequest to \p rkb with the partitions * in alter_grpoffsets (AlterConsumerGroupOffsets_t*) using * \p options. * */ static rd_kafka_resp_err_t rd_kafka_AlterConsumerGroupOffsetsRequest( rd_kafka_broker_t *rkb, /* (rd_kafka_AlterConsumerGroupOffsets_t*) */ const rd_list_t *alter_grpoffsets, rd_kafka_AdminOptions_t *options, char *errstr, size_t errstr_size, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { const rd_kafka_AlterConsumerGroupOffsets_t *grpoffsets = rd_list_elem(alter_grpoffsets, 0); rd_assert(rd_list_cnt(alter_grpoffsets) == 1); rd_kafka_topic_partition_list_t *offsets = grpoffsets->partitions; rd_kafka_consumer_group_metadata_t *cgmetadata = rd_kafka_consumer_group_metadata_new(grpoffsets->group_id); int ret = rd_kafka_OffsetCommitRequest( rkb, cgmetadata, offsets, replyq, resp_cb, opaque, "rd_kafka_AlterConsumerGroupOffsetsRequest"); rd_kafka_consumer_group_metadata_destroy(cgmetadata); if (ret == 0) { rd_snprintf(errstr, errstr_size, "At least one topic-partition offset must " "be >= 0"); return RD_KAFKA_RESP_ERR__NO_OFFSET; } return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Parse OffsetCommitResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_AlterConsumerGroupOffsetsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { rd_kafka_t *rk; rd_kafka_broker_t *rkb; rd_kafka_op_t *rko_result; rd_kafka_topic_partition_list_t *partitions = NULL; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; const rd_kafka_AlterConsumerGroupOffsets_t *alter_grpoffsets = rd_list_elem(&rko_req->rko_u.admin_request.args, 0); partitions = rd_kafka_topic_partition_list_copy(alter_grpoffsets->partitions); rk = rko_req->rko_rk; rkb = reply->rkbuf_rkb; err = rd_kafka_handle_OffsetCommit(rk, rkb, err, reply, NULL, partitions, rd_true); /* Create result op and group_result_t */ rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_group_result_free); rd_list_add(&rko_result->rko_u.admin_result.results, rd_kafka_group_result_new(alter_grpoffsets->group_id, -1, partitions, NULL)); rd_kafka_topic_partition_list_destroy(partitions); *rko_resultp = rko_result; if (reply->rkbuf_err) rd_snprintf( errstr, errstr_size, "AlterConsumerGroupOffset response parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_AlterConsumerGroupOffsets( rd_kafka_t *rk, rd_kafka_AlterConsumerGroupOffsets_t **alter_grpoffsets, size_t alter_grpoffsets_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { int i; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_AlterConsumerGroupOffsetsRequest, rd_kafka_AlterConsumerGroupOffsetsResponse_parse, }; rd_kafka_op_t *rko; rd_kafka_topic_partition_list_t *copied_offsets; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS, RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT, &cbs, options, rkqu->rkqu_q); if (alter_grpoffsets_cnt != 1) { /* For simplicity we only support one single group for now */ rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Exactly one " "AlterConsumerGroupOffsets must " "be passed"); goto fail; } if (alter_grpoffsets[0]->partitions->cnt == 0) { rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Non-empty topic partition list " "must be present"); goto fail; } for (i = 0; i < alter_grpoffsets[0]->partitions->cnt; i++) { if (alter_grpoffsets[0]->partitions->elems[i].offset < 0) { rd_kafka_admin_result_fail( rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "All topic-partition offsets " "must be >= 0"); goto fail; } } /* TODO: add group id duplication check if in future more than one * AlterConsumerGroupOffsets can be passed */ /* Copy offsets list for checking duplicated */ copied_offsets = rd_kafka_topic_partition_list_copy(alter_grpoffsets[0]->partitions); if (rd_kafka_topic_partition_list_has_duplicates( copied_offsets, rd_false /*check partition*/)) { rd_kafka_topic_partition_list_destroy(copied_offsets); rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate partitions not allowed"); goto fail; } rd_kafka_topic_partition_list_destroy(copied_offsets); rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; rko->rko_u.admin_request.coordkey = rd_strdup(alter_grpoffsets[0]->group_id); /* Store copy of group on request so the group name can be reached * from the response parser. */ rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_AlterConsumerGroupOffsets_free); rd_list_add(&rko->rko_u.admin_request.args, (void *)rd_kafka_AlterConsumerGroupOffsets_copy( alter_grpoffsets[0])); rd_kafka_q_enq(rk->rk_ops, rko); return; fail: rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); } /** * @brief Get an array of group results from a AlterGroups result. * * The returned \p groups life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_group_result_t ** rd_kafka_AlterConsumerGroupOffsets_result_groups( const rd_kafka_AlterConsumerGroupOffsets_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, cntp); } /**@}*/ /**@}*/ /** * @name List consumer group offsets (committed offsets) * @{ * * * * */ rd_kafka_ListConsumerGroupOffsets_t *rd_kafka_ListConsumerGroupOffsets_new( const char *group_id, const rd_kafka_topic_partition_list_t *partitions) { size_t tsize = strlen(group_id) + 1; rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets; rd_assert(group_id); /* Single allocation */ list_grpoffsets = rd_calloc(1, sizeof(*list_grpoffsets) + tsize); list_grpoffsets->group_id = list_grpoffsets->data; memcpy(list_grpoffsets->group_id, group_id, tsize); if (partitions) { list_grpoffsets->partitions = rd_kafka_topic_partition_list_copy(partitions); } return list_grpoffsets; } void rd_kafka_ListConsumerGroupOffsets_destroy( rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets) { if (list_grpoffsets->partitions != NULL) { rd_kafka_topic_partition_list_destroy( list_grpoffsets->partitions); } rd_free(list_grpoffsets); } static void rd_kafka_ListConsumerGroupOffsets_free(void *ptr) { rd_kafka_ListConsumerGroupOffsets_destroy(ptr); } void rd_kafka_ListConsumerGroupOffsets_destroy_array( rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets, size_t list_grpoffsets_cnt) { size_t i; for (i = 0; i < list_grpoffsets_cnt; i++) rd_kafka_ListConsumerGroupOffsets_destroy(list_grpoffsets[i]); } /** * @brief Allocate a new ListGroup and make a copy of \p src */ static rd_kafka_ListConsumerGroupOffsets_t * rd_kafka_ListConsumerGroupOffsets_copy( const rd_kafka_ListConsumerGroupOffsets_t *src) { return rd_kafka_ListConsumerGroupOffsets_new(src->group_id, src->partitions); } /** * @brief Send a OffsetFetchRequest to \p rkb with the partitions * in list_grpoffsets (ListConsumerGroupOffsets_t*) using * \p options. * */ static rd_kafka_resp_err_t rd_kafka_ListConsumerGroupOffsetsRequest( rd_kafka_broker_t *rkb, /* (rd_kafka_ListConsumerGroupOffsets_t*) */ const rd_list_t *list_grpoffsets, rd_kafka_AdminOptions_t *options, char *errstr, size_t errstr_size, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { int op_timeout; rd_bool_t require_stable_offsets; const rd_kafka_ListConsumerGroupOffsets_t *grpoffsets = rd_list_elem(list_grpoffsets, 0); rd_assert(rd_list_cnt(list_grpoffsets) == 1); op_timeout = rd_kafka_confval_get_int(&options->request_timeout); require_stable_offsets = rd_kafka_confval_get_int(&options->require_stable_offsets); rd_kafka_OffsetFetchRequest( rkb, grpoffsets->group_id, grpoffsets->partitions, require_stable_offsets, op_timeout, replyq, resp_cb, opaque); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Parse OffsetFetchResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_ListConsumerGroupOffsetsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const rd_kafka_ListConsumerGroupOffsets_t *list_grpoffsets = rd_list_elem(&rko_req->rko_u.admin_request.args, 0); rd_kafka_t *rk; rd_kafka_broker_t *rkb; rd_kafka_topic_partition_list_t *offsets = NULL; rd_kafka_op_t *rko_result; rd_kafka_resp_err_t err; rk = rko_req->rko_rk; rkb = reply->rkbuf_rkb; err = rd_kafka_handle_OffsetFetch(rk, rkb, RD_KAFKA_RESP_ERR_NO_ERROR, reply, NULL, &offsets, rd_false, rd_true, rd_false); if (unlikely(err != RD_KAFKA_RESP_ERR_NO_ERROR)) { reply->rkbuf_err = err; goto err; } /* Create result op and group_result_t */ rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_group_result_free); rd_list_add(&rko_result->rko_u.admin_result.results, rd_kafka_group_result_new(list_grpoffsets->group_id, -1, offsets, NULL)); if (likely(offsets != NULL)) rd_kafka_topic_partition_list_destroy(offsets); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err: if (likely(offsets != NULL)) rd_kafka_topic_partition_list_destroy(offsets); rd_snprintf(errstr, errstr_size, "ListConsumerGroupOffsetsResponse response failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_ListConsumerGroupOffsets( rd_kafka_t *rk, rd_kafka_ListConsumerGroupOffsets_t **list_grpoffsets, size_t list_grpoffsets_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_ListConsumerGroupOffsetsRequest, rd_kafka_ListConsumerGroupOffsetsResponse_parse, }; rd_kafka_op_t *rko; rd_kafka_topic_partition_list_t *copied_offsets; rd_assert(rkqu); rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS, RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT, &cbs, options, rkqu->rkqu_q); if (list_grpoffsets_cnt != 1) { /* For simplicity we only support one single group for now */ rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Exactly one " "ListConsumerGroupOffsets must " "be passed"); goto fail; } if (list_grpoffsets[0]->partitions != NULL && list_grpoffsets[0]->partitions->cnt == 0) { /* Either pass NULL for all the partitions or a non-empty list */ rd_kafka_admin_result_fail( rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "NULL or " "non-empty topic partition list must " "be passed"); goto fail; } /* TODO: add group id duplication check when implementing KIP-709 */ if (list_grpoffsets[0]->partitions != NULL) { /* Copy offsets list for checking duplicated */ copied_offsets = rd_kafka_topic_partition_list_copy( list_grpoffsets[0]->partitions); if (rd_kafka_topic_partition_list_has_duplicates( copied_offsets, rd_false /*check partition*/)) { rd_kafka_topic_partition_list_destroy(copied_offsets); rd_kafka_admin_result_fail( rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate partitions not allowed"); goto fail; } rd_kafka_topic_partition_list_destroy(copied_offsets); } rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; rko->rko_u.admin_request.coordkey = rd_strdup(list_grpoffsets[0]->group_id); /* Store copy of group on request so the group name can be reached * from the response parser. */ rd_list_init(&rko->rko_u.admin_request.args, 1, rd_kafka_ListConsumerGroupOffsets_free); rd_list_add(&rko->rko_u.admin_request.args, rd_kafka_ListConsumerGroupOffsets_copy(list_grpoffsets[0])); rd_kafka_q_enq(rk->rk_ops, rko); return; fail: rd_kafka_admin_common_worker_destroy(rk, rko, rd_true /*destroy*/); } /** * @brief Get an array of group results from a ListConsumerGroups result. * * The returned \p groups life-time is the same as the \p result object. * @param cntp is updated to the number of elements in the array. */ const rd_kafka_group_result_t **rd_kafka_ListConsumerGroupOffsets_result_groups( const rd_kafka_ListConsumerGroupOffsets_result_t *result, size_t *cntp) { return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, cntp); } /**@}*/ /** * @name List consumer groups * @{ * * * * */ #define CONSUMER_PROTOCOL_TYPE "consumer" /** * @brief Create a new ConsumerGroupListing object. * * @param group_id The group id. * @param is_simple_consumer_group Is the group simple? * @param state Group state. */ static rd_kafka_ConsumerGroupListing_t * rd_kafka_ConsumerGroupListing_new(const char *group_id, rd_bool_t is_simple_consumer_group, rd_kafka_consumer_group_state_t state) { rd_kafka_ConsumerGroupListing_t *grplist; grplist = rd_calloc(1, sizeof(*grplist)); grplist->group_id = rd_strdup(group_id); grplist->is_simple_consumer_group = is_simple_consumer_group; grplist->state = state; return grplist; } /** * @brief Copy \p grplist ConsumerGroupListing. * * @param grplist The group listing to copy. * @return A new allocated copy of the passed ConsumerGroupListing. */ static rd_kafka_ConsumerGroupListing_t *rd_kafka_ConsumerGroupListing_copy( const rd_kafka_ConsumerGroupListing_t *grplist) { return rd_kafka_ConsumerGroupListing_new( grplist->group_id, grplist->is_simple_consumer_group, grplist->state); } /** * @brief Same as rd_kafka_ConsumerGroupListing_copy() but suitable for * rd_list_copy(). The \p opaque is ignored. */ static void *rd_kafka_ConsumerGroupListing_copy_opaque(const void *grplist, void *opaque) { return rd_kafka_ConsumerGroupListing_copy(grplist); } static void rd_kafka_ConsumerGroupListing_destroy( rd_kafka_ConsumerGroupListing_t *grplist) { RD_IF_FREE(grplist->group_id, rd_free); rd_free(grplist); } static void rd_kafka_ConsumerGroupListing_free(void *ptr) { rd_kafka_ConsumerGroupListing_destroy(ptr); } const char *rd_kafka_ConsumerGroupListing_group_id( const rd_kafka_ConsumerGroupListing_t *grplist) { return grplist->group_id; } int rd_kafka_ConsumerGroupListing_is_simple_consumer_group( const rd_kafka_ConsumerGroupListing_t *grplist) { return grplist->is_simple_consumer_group; } rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupListing_state( const rd_kafka_ConsumerGroupListing_t *grplist) { return grplist->state; } /** * @brief Create a new ListConsumerGroupsResult object. * * @param valid * @param errors */ static rd_kafka_ListConsumerGroupsResult_t * rd_kafka_ListConsumerGroupsResult_new(const rd_list_t *valid, const rd_list_t *errors) { rd_kafka_ListConsumerGroupsResult_t *res; res = rd_calloc(1, sizeof(*res)); rd_list_init_copy(&res->valid, valid); rd_list_copy_to(&res->valid, valid, rd_kafka_ConsumerGroupListing_copy_opaque, NULL); rd_list_init_copy(&res->errors, errors); rd_list_copy_to(&res->errors, errors, rd_kafka_error_copy_opaque, NULL); return res; } static void rd_kafka_ListConsumerGroupsResult_destroy( rd_kafka_ListConsumerGroupsResult_t *res) { rd_list_destroy(&res->valid); rd_list_destroy(&res->errors); rd_free(res); } static void rd_kafka_ListConsumerGroupsResult_free(void *ptr) { rd_kafka_ListConsumerGroupsResult_destroy(ptr); } /** * @brief Copy the passed ListConsumerGroupsResult. * * @param res the ListConsumerGroupsResult to copy * @return a newly allocated ListConsumerGroupsResult object. * * @sa Release the object with rd_kafka_ListConsumerGroupsResult_destroy(). */ static rd_kafka_ListConsumerGroupsResult_t * rd_kafka_ListConsumerGroupsResult_copy( const rd_kafka_ListConsumerGroupsResult_t *res) { return rd_kafka_ListConsumerGroupsResult_new(&res->valid, &res->errors); } /** * @brief Same as rd_kafka_ListConsumerGroupsResult_copy() but suitable for * rd_list_copy(). The \p opaque is ignored. */ static void *rd_kafka_ListConsumerGroupsResult_copy_opaque(const void *list, void *opaque) { return rd_kafka_ListConsumerGroupsResult_copy(list); } /** * @brief Send ListConsumerGroupsRequest. Admin worker compatible callback. */ static rd_kafka_resp_err_t rd_kafka_admin_ListConsumerGroupsRequest(rd_kafka_broker_t *rkb, const rd_list_t *groups /*(char*)*/, rd_kafka_AdminOptions_t *options, char *errstr, size_t errstr_size, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { int i; rd_kafka_resp_err_t err; rd_kafka_error_t *error; const char **states_str = NULL; int states_str_cnt = 0; rd_list_t *states = rd_kafka_confval_get_ptr(&options->match_consumer_group_states); /* Prepare list_options */ if (states && rd_list_cnt(states) > 0) { states_str_cnt = rd_list_cnt(states); states_str = rd_calloc(states_str_cnt, sizeof(*states_str)); for (i = 0; i < states_str_cnt; i++) { states_str[i] = rd_kafka_consumer_group_state_name( rd_list_get_int32(states, i)); } } error = rd_kafka_ListGroupsRequest(rkb, -1, states_str, states_str_cnt, replyq, resp_cb, opaque); if (states_str) { rd_free(states_str); } if (error) { rd_snprintf(errstr, errstr_size, "%s", rd_kafka_error_string(error)); err = rd_kafka_error_code(error); rd_kafka_error_destroy(error); return err; } return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Parse ListConsumerGroupsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_ListConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; int i, cnt; int16_t error_code, api_version; rd_kafka_op_t *rko_result = NULL; rd_kafka_error_t *error = NULL; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_list_t valid, errors; rd_kafka_ListConsumerGroupsResult_t *list_result; char *group_id = NULL, *group_state = NULL, *proto_type = NULL; api_version = rd_kafka_buf_ApiVersion(reply); if (api_version >= 1) { rd_kafka_buf_read_throttle_time(reply); } rd_kafka_buf_read_i16(reply, &error_code); if (error_code) { error = rd_kafka_error_new(error_code, "Broker [%d" "] " "ListConsumerGroups: %s", rd_kafka_broker_id(rkb), rd_kafka_err2str(error_code)); } rd_kafka_buf_read_arraycnt(reply, &cnt, RD_KAFKAP_GROUPS_MAX); rd_list_init(&valid, cnt, rd_kafka_ConsumerGroupListing_free); rd_list_init(&errors, 8, rd_free); if (error) rd_list_add(&errors, error); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, 1, rd_kafka_ListConsumerGroupsResult_free); for (i = 0; i < cnt; i++) { rd_kafkap_str_t GroupId, ProtocolType, GroupState = RD_ZERO_INIT; rd_kafka_ConsumerGroupListing_t *group_listing; rd_bool_t is_simple_consumer_group, is_consumer_protocol_type; rd_kafka_consumer_group_state_t state = RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN; rd_kafka_buf_read_str(reply, &GroupId); rd_kafka_buf_read_str(reply, &ProtocolType); if (api_version >= 4) { rd_kafka_buf_read_str(reply, &GroupState); } rd_kafka_buf_skip_tags(reply); group_id = RD_KAFKAP_STR_DUP(&GroupId); proto_type = RD_KAFKAP_STR_DUP(&ProtocolType); if (api_version >= 4) { group_state = RD_KAFKAP_STR_DUP(&GroupState); state = rd_kafka_consumer_group_state_code(group_state); } is_simple_consumer_group = *proto_type == '\0'; is_consumer_protocol_type = !strcmp(proto_type, CONSUMER_PROTOCOL_TYPE); if (is_simple_consumer_group || is_consumer_protocol_type) { group_listing = rd_kafka_ConsumerGroupListing_new( group_id, is_simple_consumer_group, state); rd_list_add(&valid, group_listing); } rd_free(group_id); rd_free(group_state); rd_free(proto_type); group_id = NULL; group_state = NULL; proto_type = NULL; } rd_kafka_buf_skip_tags(reply); err_parse: if (group_id) rd_free(group_id); if (group_state) rd_free(group_state); if (proto_type) rd_free(proto_type); if (reply->rkbuf_err) { error_code = reply->rkbuf_err; error = rd_kafka_error_new( error_code, "Broker [%d" "] " "ListConsumerGroups response protocol parse failure: %s", rd_kafka_broker_id(rkb), rd_kafka_err2str(error_code)); rd_list_add(&errors, error); } list_result = rd_kafka_ListConsumerGroupsResult_new(&valid, &errors); rd_list_add(&rko_result->rko_u.admin_result.results, list_result); *rko_resultp = rko_result; rd_list_destroy(&valid); rd_list_destroy(&errors); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** @brief Merge the ListConsumerGroups response from a single broker * into the user response list. */ static void rd_kafka_ListConsumerGroups_response_merge(rd_kafka_op_t *rko_fanout, const rd_kafka_op_t *rko_partial) { int cnt; rd_kafka_ListConsumerGroupsResult_t *res = NULL; rd_kafka_ListConsumerGroupsResult_t *newres; rd_list_t new_valid, new_errors; rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT); cnt = rd_list_cnt(&rko_fanout->rko_u.admin_request.fanout.results); if (cnt) { res = rd_list_elem( &rko_fanout->rko_u.admin_request.fanout.results, 0); } else { rd_list_init(&new_valid, 0, rd_kafka_ConsumerGroupListing_free); rd_list_init(&new_errors, 0, rd_free); res = rd_kafka_ListConsumerGroupsResult_new(&new_valid, &new_errors); rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, 0, res); rd_list_destroy(&new_valid); rd_list_destroy(&new_errors); } if (!rko_partial->rko_err) { int new_valid_count, new_errors_count; const rd_list_t *new_valid_list, *new_errors_list; /* Read the partial result and merge the valid groups * and the errors into the fanout parent result. */ newres = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); rd_assert(newres); new_valid_count = rd_list_cnt(&newres->valid); new_errors_count = rd_list_cnt(&newres->errors); if (new_valid_count) { new_valid_list = &newres->valid; rd_list_grow(&res->valid, new_valid_count); rd_list_copy_to( &res->valid, new_valid_list, rd_kafka_ConsumerGroupListing_copy_opaque, NULL); } if (new_errors_count) { new_errors_list = &newres->errors; rd_list_grow(&res->errors, new_errors_count); rd_list_copy_to(&res->errors, new_errors_list, rd_kafka_error_copy_opaque, NULL); } } else { /* Op errored, e.g. timeout */ rd_list_add(&res->errors, rd_kafka_error_new(rko_partial->rko_err, NULL)); } } void rd_kafka_ListConsumerGroups(rd_kafka_t *rk, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_admin_ListConsumerGroupsRequest, rd_kafka_ListConsumerGroupsResponse_parse}; static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { rd_kafka_ListConsumerGroups_response_merge, rd_kafka_ListConsumerGroupsResult_copy_opaque, }; rko = rd_kafka_admin_request_op_target_all_new( rk, RD_KAFKA_OP_LISTCONSUMERGROUPS, RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, &cbs, &fanout_cbs, rd_kafka_ListConsumerGroupsResult_free, options, rkqu->rkqu_q); rd_kafka_q_enq(rk->rk_ops, rko); } const rd_kafka_ConsumerGroupListing_t ** rd_kafka_ListConsumerGroups_result_valid( const rd_kafka_ListConsumerGroups_result_t *result, size_t *cntp) { int list_result_cnt; const rd_kafka_ListConsumerGroupsResult_t *list_result; const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPS); list_result_cnt = rd_list_cnt(&rko->rko_u.admin_result.results); rd_assert(list_result_cnt == 1); list_result = rd_list_elem(&rko->rko_u.admin_result.results, 0); *cntp = rd_list_cnt(&list_result->valid); return (const rd_kafka_ConsumerGroupListing_t **) list_result->valid.rl_elems; } const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors( const rd_kafka_ListConsumerGroups_result_t *result, size_t *cntp) { int list_result_cnt, error_cnt; const rd_kafka_ListConsumerGroupsResult_t *list_result; const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_LISTCONSUMERGROUPS); list_result_cnt = rd_list_cnt(&rko->rko_u.admin_result.results); rd_assert(list_result_cnt == 1); list_result = rko->rko_u.admin_result.results.rl_elems[0]; error_cnt = rd_list_cnt(&list_result->errors); if (error_cnt == 0) { *cntp = 0; return NULL; } *cntp = error_cnt; return (const rd_kafka_error_t **)list_result->errors.rl_elems; } /**@}*/ /** * @name Describe consumer groups * @{ * * * * */ /** * @brief Create a new MemberDescription object. This object is used for * creating a ConsumerGroupDescription. * * @param client_id The client id. * @param consumer_id The consumer id (or member id). * @param group_instance_id (optional) The group instance id * for static membership. * @param host The consumer host. * @param assignment The member's assigned partitions, or NULL if none. * * @return A new allocated MemberDescription object. * Use rd_kafka_MemberDescription_destroy() to free when done. */ static rd_kafka_MemberDescription_t *rd_kafka_MemberDescription_new( const char *client_id, const char *consumer_id, const char *group_instance_id, const char *host, const rd_kafka_topic_partition_list_t *assignment) { rd_kafka_MemberDescription_t *member; member = rd_calloc(1, sizeof(*member)); member->client_id = rd_strdup(client_id); member->consumer_id = rd_strdup(consumer_id); if (group_instance_id) member->group_instance_id = rd_strdup(group_instance_id); member->host = rd_strdup(host); if (assignment) member->assignment.partitions = rd_kafka_topic_partition_list_copy(assignment); else member->assignment.partitions = rd_kafka_topic_partition_list_new(0); return member; } /** * @brief Allocate a new MemberDescription, copy of \p src * and return it. * * @param src The MemberDescription to copy. * @return A new allocated MemberDescription object, * Use rd_kafka_MemberDescription_destroy() to free when done. */ static rd_kafka_MemberDescription_t * rd_kafka_MemberDescription_copy(const rd_kafka_MemberDescription_t *src) { return rd_kafka_MemberDescription_new(src->client_id, src->consumer_id, src->group_instance_id, src->host, src->assignment.partitions); } /** * @brief MemberDescription copy, compatible with rd_list_copy_to. * * @param elem The MemberDescription to copy- * @param opaque Not used. */ static void *rd_kafka_MemberDescription_list_copy(const void *elem, void *opaque) { return rd_kafka_MemberDescription_copy(elem); } static void rd_kafka_MemberDescription_destroy(rd_kafka_MemberDescription_t *member) { rd_free(member->client_id); rd_free(member->consumer_id); rd_free(member->host); if (member->group_instance_id != NULL) rd_free(member->group_instance_id); if (member->assignment.partitions) rd_kafka_topic_partition_list_destroy( member->assignment.partitions); rd_free(member); } static void rd_kafka_MemberDescription_free(void *member) { rd_kafka_MemberDescription_destroy(member); } const char *rd_kafka_MemberDescription_client_id( const rd_kafka_MemberDescription_t *member) { return member->client_id; } const char *rd_kafka_MemberDescription_group_instance_id( const rd_kafka_MemberDescription_t *member) { return member->group_instance_id; } const char *rd_kafka_MemberDescription_consumer_id( const rd_kafka_MemberDescription_t *member) { return member->consumer_id; } const char * rd_kafka_MemberDescription_host(const rd_kafka_MemberDescription_t *member) { return member->host; } const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_assignment( const rd_kafka_MemberDescription_t *member) { return &member->assignment; } const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions( const rd_kafka_MemberAssignment_t *assignment) { return assignment->partitions; } /** * @brief Create a new ConsumerGroupDescription object. * * @param group_id The group id. * @param is_simple_consumer_group Is the group simple? * @param members List of members (rd_kafka_MemberDescription_t) of this * group. * @param partition_assignor (optional) Chosen assignor. * @param state Group state. * @param coordinator (optional) Group coordinator. * @param error (optional) Error received for this group. * @return A new allocated ConsumerGroupDescription object. * Use rd_kafka_ConsumerGroupDescription_destroy() to free when done. */ static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_new(const char *group_id, rd_bool_t is_simple_consumer_group, const rd_list_t *members, const char *partition_assignor, rd_kafka_consumer_group_state_t state, const rd_kafka_Node_t *coordinator, rd_kafka_error_t *error) { rd_kafka_ConsumerGroupDescription_t *grpdesc; grpdesc = rd_calloc(1, sizeof(*grpdesc)); grpdesc->group_id = rd_strdup(group_id); grpdesc->is_simple_consumer_group = is_simple_consumer_group; if (members == NULL) { rd_list_init(&grpdesc->members, 0, rd_kafka_MemberDescription_free); } else { rd_list_init_copy(&grpdesc->members, members); rd_list_copy_to(&grpdesc->members, members, rd_kafka_MemberDescription_list_copy, NULL); } grpdesc->partition_assignor = !partition_assignor ? (char *)partition_assignor : rd_strdup(partition_assignor); grpdesc->state = state; if (coordinator != NULL) grpdesc->coordinator = rd_kafka_Node_copy(coordinator); grpdesc->error = error != NULL ? rd_kafka_error_new(rd_kafka_error_code(error), "%s", rd_kafka_error_string(error)) : NULL; return grpdesc; } /** * @brief New instance of ConsumerGroupDescription from an error. * * @param group_id The group id. * @param error The error. * @return A new allocated ConsumerGroupDescription with the passed error. */ static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, rd_kafka_error_t *error) { return rd_kafka_ConsumerGroupDescription_new( group_id, rd_false, NULL, NULL, RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error); } /** * @brief Copy \p desc ConsumerGroupDescription. * * @param desc The group description to copy. * @return A new allocated copy of the passed ConsumerGroupDescription. */ static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_copy( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return rd_kafka_ConsumerGroupDescription_new( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, grpdesc->state, grpdesc->coordinator, grpdesc->error); } /** * @brief Same as rd_kafka_ConsumerGroupDescription_copy() but suitable for * rd_list_copy(). The \p opaque is ignored. */ static void *rd_kafka_ConsumerGroupDescription_copy_opaque(const void *grpdesc, void *opaque) { return rd_kafka_ConsumerGroupDescription_copy(grpdesc); } static void rd_kafka_ConsumerGroupDescription_destroy( rd_kafka_ConsumerGroupDescription_t *grpdesc) { if (likely(grpdesc->group_id != NULL)) rd_free(grpdesc->group_id); rd_list_destroy(&grpdesc->members); if (likely(grpdesc->partition_assignor != NULL)) rd_free(grpdesc->partition_assignor); if (likely(grpdesc->error != NULL)) rd_kafka_error_destroy(grpdesc->error); if (grpdesc->coordinator) rd_kafka_Node_destroy(grpdesc->coordinator); rd_free(grpdesc); } static void rd_kafka_ConsumerGroupDescription_free(void *ptr) { rd_kafka_ConsumerGroupDescription_destroy(ptr); } const char *rd_kafka_ConsumerGroupDescription_group_id( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->group_id; } const rd_kafka_error_t *rd_kafka_ConsumerGroupDescription_error( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->error; } int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->is_simple_consumer_group; } const char *rd_kafka_ConsumerGroupDescription_partition_assignor( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->partition_assignor; } rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->state; } const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return grpdesc->coordinator; } size_t rd_kafka_ConsumerGroupDescription_member_count( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return rd_list_cnt(&grpdesc->members); } const rd_kafka_MemberDescription_t *rd_kafka_ConsumerGroupDescription_member( const rd_kafka_ConsumerGroupDescription_t *grpdesc, size_t idx) { return (rd_kafka_MemberDescription_t *)rd_list_elem(&grpdesc->members, idx); } /** * @brief Group arguments comparator for DescribeConsumerGroups args */ static int rd_kafka_DescribeConsumerGroups_cmp(const void *a, const void *b) { return strcmp(a, b); } /** @brief Merge the DescribeConsumerGroups response from a single broker * into the user response list. */ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_op_t *rko_fanout, const rd_kafka_op_t *rko_partial) { rd_kafka_ConsumerGroupDescription_t *groupres = NULL; rd_kafka_ConsumerGroupDescription_t *newgroupres; const char *grp = rko_partial->rko_u.admin_result.opaque; int orig_pos; rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT); if (!rko_partial->rko_err) { /* Proper results. * We only send one group per request, make sure it matches */ groupres = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); rd_assert(groupres); rd_assert(!strcmp(groupres->group_id, grp)); newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres); } else { /* Op errored, e.g. timeout */ rd_kafka_error_t *error = rd_kafka_error_new(rko_partial->rko_err, NULL); newgroupres = rd_kafka_ConsumerGroupDescription_new_error(grp, error); rd_kafka_error_destroy(error); } /* As a convenience to the application we insert group result * in the same order as they were requested. */ orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, rd_kafka_DescribeConsumerGroups_cmp); rd_assert(orig_pos != -1); /* Make sure result is not already set */ rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos) == NULL); rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, newgroupres); } /** * @brief Construct and send DescribeConsumerGroupsRequest to \p rkb * with the groups (char *) in \p groups, using * \p options. * * The response (unparsed) will be enqueued on \p replyq * for handling by \p resp_cb (with \p opaque passed). * * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for * transmission, otherwise an error code and errstr will be * updated with a human readable error string. */ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest( rd_kafka_broker_t *rkb, const rd_list_t *groups /*(char*)*/, rd_kafka_AdminOptions_t *options, char *errstr, size_t errstr_size, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { int i; char *group; rd_kafka_resp_err_t err; int groups_cnt = rd_list_cnt(groups); rd_kafka_error_t *error = NULL; char **groups_arr = rd_calloc(groups_cnt, sizeof(*groups_arr)); RD_LIST_FOREACH(group, groups, i) { groups_arr[i] = rd_list_elem(groups, i); } error = rd_kafka_DescribeGroupsRequest(rkb, -1, groups_arr, groups_cnt, replyq, resp_cb, opaque); rd_free(groups_arr); if (error) { rd_snprintf(errstr, errstr_size, "%s", rd_kafka_error_string(error)); err = rd_kafka_error_code(error); rd_kafka_error_destroy(error); return err; } return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Parse DescribeConsumerGroupsResponse and create ADMIN_RESULT op. */ static rd_kafka_resp_err_t rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_op_t **rko_resultp, rd_kafka_buf_t *reply, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; int nodeid; uint16_t port; int16_t api_version; int32_t cnt; rd_kafka_op_t *rko_result = NULL; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_Node_t *node = NULL; rd_kafka_error_t *error = NULL; char *group_id = NULL, *group_state = NULL, *proto_type = NULL, *proto = NULL, *host = NULL; api_version = rd_kafka_buf_ApiVersion(reply); if (api_version >= 1) { rd_kafka_buf_read_throttle_time(reply); } rd_kafka_buf_read_arraycnt(reply, &cnt, 100000); rko_result = rd_kafka_admin_result_new(rko_req); rd_list_init(&rko_result->rko_u.admin_result.results, cnt, rd_kafka_ConsumerGroupDescription_free); rd_kafka_broker_lock(rkb); nodeid = rkb->rkb_nodeid; host = rd_strdup(rkb->rkb_origname); port = rkb->rkb_port; rd_kafka_broker_unlock(rkb); node = rd_kafka_Node_new(nodeid, host, port, NULL); while (cnt-- > 0) { int16_t error_code; rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData; rd_bool_t is_simple_consumer_group, is_consumer_protocol_type; int32_t member_cnt; rd_list_t members; rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &GroupId); rd_kafka_buf_read_str(reply, &GroupState); rd_kafka_buf_read_str(reply, &ProtocolType); rd_kafka_buf_read_str(reply, &ProtocolData); rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000); group_id = RD_KAFKAP_STR_DUP(&GroupId); group_state = RD_KAFKAP_STR_DUP(&GroupState); proto_type = RD_KAFKAP_STR_DUP(&ProtocolType); proto = RD_KAFKAP_STR_DUP(&ProtocolData); if (error_code) { error = rd_kafka_error_new( error_code, "DescribeConsumerGroups: %s", rd_kafka_err2str(error_code)); } is_simple_consumer_group = *proto_type == '\0'; is_consumer_protocol_type = !strcmp(proto_type, CONSUMER_PROTOCOL_TYPE); if (error == NULL && !is_simple_consumer_group && !is_consumer_protocol_type) { error = rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "GroupId %s is not a consumer group (%s).", group_id, proto_type); } rd_list_init(&members, 0, rd_kafka_MemberDescription_free); while (member_cnt-- > 0) { rd_kafkap_str_t MemberId, ClientId, ClientHost, GroupInstanceId = RD_KAFKAP_STR_INITIALIZER; char *member_id, *client_id, *client_host, *group_instance_id = NULL; rd_kafkap_bytes_t MemberMetadata, MemberAssignment; rd_kafka_MemberDescription_t *member; rd_kafka_topic_partition_list_t *partitions = NULL; rd_kafka_buf_t *rkbuf; rd_kafka_buf_read_str(reply, &MemberId); if (api_version >= 4) { rd_kafka_buf_read_str(reply, &GroupInstanceId); } rd_kafka_buf_read_str(reply, &ClientId); rd_kafka_buf_read_str(reply, &ClientHost); rd_kafka_buf_read_bytes(reply, &MemberMetadata); rd_kafka_buf_read_bytes(reply, &MemberAssignment); if (error != NULL) continue; if (RD_KAFKAP_BYTES_LEN(&MemberAssignment) != 0) { int16_t version; /* Parse assignment */ rkbuf = rd_kafka_buf_new_shadow( MemberAssignment.data, RD_KAFKAP_BYTES_LEN(&MemberAssignment), NULL); /* Protocol parser needs a broker handle * to log errors on. */ rkbuf->rkbuf_rkb = rkb; /* Decreased in rd_kafka_buf_destroy */ rd_kafka_broker_keep(rkb); rd_kafka_buf_read_i16(rkbuf, &version); const rd_kafka_topic_partition_field_t fields[] = {RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; partitions = rd_kafka_buf_read_topic_partitions( rkbuf, 0, fields); rd_kafka_buf_destroy(rkbuf); if (!partitions) rd_kafka_buf_parse_fail( reply, "Error reading topic partitions"); } member_id = RD_KAFKAP_STR_DUP(&MemberId); if (!RD_KAFKAP_STR_IS_NULL(&GroupInstanceId)) { group_instance_id = RD_KAFKAP_STR_DUP(&GroupInstanceId); } client_id = RD_KAFKAP_STR_DUP(&ClientId); client_host = RD_KAFKAP_STR_DUP(&ClientHost); member = rd_kafka_MemberDescription_new( client_id, member_id, group_instance_id, client_host, partitions); if (partitions) rd_kafka_topic_partition_list_destroy( partitions); rd_list_add(&members, member); rd_free(member_id); rd_free(group_instance_id); rd_free(client_id); rd_free(client_host); member_id = NULL; group_instance_id = NULL; client_id = NULL; client_host = NULL; } if (api_version >= 3) { /* TODO: implement KIP-430 */ int32_t authorized_operations; rd_kafka_buf_read_i32(reply, &authorized_operations); } if (error == NULL) { grpdesc = rd_kafka_ConsumerGroupDescription_new( group_id, is_simple_consumer_group, &members, proto, rd_kafka_consumer_group_state_code(group_state), node, error); } else { grpdesc = rd_kafka_ConsumerGroupDescription_new_error( group_id, error); } rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); if (error) rd_kafka_error_destroy(error); rd_list_destroy(&members); rd_free(group_id); rd_free(group_state); rd_free(proto_type); rd_free(proto); error = NULL; group_id = NULL; group_state = NULL; proto_type = NULL; proto = NULL; } if (host) rd_free(host); if (node) rd_kafka_Node_destroy(node); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if (group_id) rd_free(group_id); if (group_state) rd_free(group_state); if (proto_type) rd_free(proto_type); if (proto) rd_free(proto); if (error) rd_kafka_error_destroy(error); if (host) rd_free(host); if (node) rd_kafka_Node_destroy(node); if (rko_result) rd_kafka_op_destroy(rko_result); rd_snprintf( errstr, errstr_size, "DescribeConsumerGroups response protocol parse failure: %s", rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; } void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, const char **groups, size_t groups_cnt, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko_fanout; rd_list_t dup_list; size_t i; static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { rd_kafka_DescribeConsumerGroups_response_merge, rd_kafka_ConsumerGroupDescription_copy_opaque}; rd_assert(rkqu); rko_fanout = rd_kafka_admin_fanout_op_new( rk, RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, &fanout_cbs, options, rkqu->rkqu_q); if (groups_cnt == 0) { rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "No groups to describe"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } /* Copy group list and store it on the request op. * Maintain original ordering. */ rd_list_init(&rko_fanout->rko_u.admin_request.args, (int)groups_cnt, rd_free); for (i = 0; i < groups_cnt; i++) rd_list_add(&rko_fanout->rko_u.admin_request.args, rd_strdup(groups[i])); /* Check for duplicates. * Make a temporary copy of the group list and sort it to check for * duplicates, we don't want the original list sorted since we want * to maintain ordering. */ rd_list_init(&dup_list, rd_list_cnt(&rko_fanout->rko_u.admin_request.args), NULL); rd_list_copy_to(&dup_list, &rko_fanout->rko_u.admin_request.args, NULL, NULL); rd_list_sort(&dup_list, rd_kafka_DescribeConsumerGroups_cmp); if (rd_list_find_duplicate(&dup_list, rd_kafka_DescribeConsumerGroups_cmp)) { rd_list_destroy(&dup_list); rd_kafka_admin_result_fail(rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG, "Duplicate groups not allowed"); rd_kafka_admin_common_worker_destroy(rk, rko_fanout, rd_true /*destroy*/); return; } rd_list_destroy(&dup_list); /* Prepare results list where fanned out op's results will be * accumulated. */ rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, (int)groups_cnt, rd_kafka_ConsumerGroupDescription_free); rko_fanout->rko_u.admin_request.fanout.outstanding = (int)groups_cnt; /* Create individual request ops for each group. * FIXME: A future optimization is to coalesce all groups for a single * coordinator into one op. */ for (i = 0; i < groups_cnt; i++) { static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_admin_DescribeConsumerGroupsRequest, rd_kafka_DescribeConsumerGroupsResponse_parse, }; char *grp = rd_list_elem(&rko_fanout->rko_u.admin_request.args, (int)i); rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( rk, RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, &cbs, options, rk->rk_ops); rko->rko_u.admin_request.fanout_parent = rko_fanout; rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_COORDINATOR; rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; rko->rko_u.admin_request.coordkey = rd_strdup(grp); /* Set the group name as the opaque so the fanout worker use it * to fill in errors. * References rko_fanout's memory, which will always outlive * the fanned out op. */ rd_kafka_AdminOptions_set_opaque( &rko->rko_u.admin_request.options, grp); rd_list_init(&rko->rko_u.admin_request.args, 1, rd_free); rd_list_add(&rko->rko_u.admin_request.args, rd_strdup(groups[i])); rd_kafka_q_enq(rk->rk_ops, rko); } } const rd_kafka_ConsumerGroupDescription_t ** rd_kafka_DescribeConsumerGroups_result_groups( const rd_kafka_DescribeConsumerGroups_result_t *result, size_t *cntp) { const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_ConsumerGroupDescription_t **) rko->rko_u.admin_result.results.rl_elems; } /**@}*/