summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c928
1 files changed, 928 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c
new file mode 100644
index 000000000..128b8bb40
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.c
@@ -0,0 +1,928 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdarg.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_op.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_proto.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_error.h"
+
+/* Current number of rd_kafka_op_t */
+rd_atomic32_t rd_kafka_op_cnt;
+
+
+const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
+ int skiplen = 6;
+ static const char *names[RD_KAFKA_OP__END] = {
+ [RD_KAFKA_OP_NONE] = "REPLY:NONE",
+ [RD_KAFKA_OP_FETCH] = "REPLY:FETCH",
+ [RD_KAFKA_OP_ERR] = "REPLY:ERR",
+ [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR",
+ [RD_KAFKA_OP_DR] = "REPLY:DR",
+ [RD_KAFKA_OP_STATS] = "REPLY:STATS",
+ [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT",
+ [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE",
+ [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF",
+ [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF",
+ [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY",
+ [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START",
+ [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP",
+ [RD_KAFKA_OP_SEEK] = "REPLY:SEEK",
+ [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE",
+ [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH",
+ [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN",
+ [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE",
+ [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE",
+ [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE",
+ [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY",
+ [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE",
+ [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN",
+ [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION",
+ [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT",
+ [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE",
+ [RD_KAFKA_OP_NAME] = "REPLY:NAME",
+ [RD_KAFKA_OP_CG_METADATA] = "REPLY:CG_METADATA",
+ [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET",
+ [RD_KAFKA_OP_METADATA] = "REPLY:METADATA",
+ [RD_KAFKA_OP_LOG] = "REPLY:LOG",
+ [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP",
+ [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS",
+ [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS",
+ [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS",
+ [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS",
+ [RD_KAFKA_OP_DESCRIBECONFIGS] = "REPLY:DESCRIBECONFIGS",
+ [RD_KAFKA_OP_DELETERECORDS] = "REPLY:DELETERECORDS",
+ [RD_KAFKA_OP_LISTCONSUMERGROUPS] = "REPLY:LISTCONSUMERGROUPS",
+ [RD_KAFKA_OP_DESCRIBECONSUMERGROUPS] =
+ "REPLY:DESCRIBECONSUMERGROUPS",
+ [RD_KAFKA_OP_DELETEGROUPS] = "REPLY:DELETEGROUPS",
+ [RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS] =
+ "REPLY:DELETECONSUMERGROUPOFFSETS",
+ [RD_KAFKA_OP_CREATEACLS] = "REPLY:CREATEACLS",
+ [RD_KAFKA_OP_DESCRIBEACLS] = "REPLY:DESCRIBEACLS",
+ [RD_KAFKA_OP_DELETEACLS] = "REPLY:DELETEACLS",
+ [RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS] =
+ "REPLY:ALTERCONSUMERGROUPOFFSETS",
+ [RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS] =
+ "REPLY:LISTCONSUMERGROUPOFFSETS",
+ [RD_KAFKA_OP_ADMIN_FANOUT] = "REPLY:ADMIN_FANOUT",
+ [RD_KAFKA_OP_ADMIN_RESULT] = "REPLY:ADMIN_RESULT",
+ [RD_KAFKA_OP_PURGE] = "REPLY:PURGE",
+ [RD_KAFKA_OP_CONNECT] = "REPLY:CONNECT",
+ [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = "REPLY:OAUTHBEARER_REFRESH",
+ [RD_KAFKA_OP_MOCK] = "REPLY:MOCK",
+ [RD_KAFKA_OP_BROKER_MONITOR] = "REPLY:BROKER_MONITOR",
+ [RD_KAFKA_OP_TXN] = "REPLY:TXN",
+ [RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
+ "REPLY:GET_REBALANCE_PROTOCOL",
+ [RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
+ [RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
+ };
+
+ if (type & RD_KAFKA_OP_REPLY)
+ skiplen = 0;
+
+ rd_assert((names[type & ~RD_KAFKA_OP_FLAGMASK] != NULL) ||
+ !*"add OP type to rd_kafka_op2str()");
+ return names[type & ~RD_KAFKA_OP_FLAGMASK] + skiplen;
+}
+
+
+void rd_kafka_op_print(FILE *fp, const char *prefix, rd_kafka_op_t *rko) {
+ fprintf(fp,
+ "%s((rd_kafka_op_t*)%p)\n"
+ "%s Type: %s (0x%x), Version: %" PRId32 "\n",
+ prefix, rko, prefix, rd_kafka_op2str(rko->rko_type),
+ rko->rko_type, rko->rko_version);
+ if (rko->rko_err)
+ fprintf(fp, "%s Error: %s\n", prefix,
+ rd_kafka_err2str(rko->rko_err));
+ if (rko->rko_replyq.q)
+ fprintf(fp, "%s Replyq %p v%d (%s)\n", prefix,
+ rko->rko_replyq.q, rko->rko_replyq.version,
+#if ENABLE_DEVEL
+ rko->rko_replyq._id
+#else
+ ""
+#endif
+ );
+ if (rko->rko_rktp) {
+ fprintf(fp,
+ "%s ((rd_kafka_toppar_t*)%p) "
+ "%s [%" PRId32 "] v%d\n",
+ prefix, rko->rko_rktp,
+ rko->rko_rktp->rktp_rkt->rkt_topic->str,
+ rko->rko_rktp->rktp_partition,
+ rd_atomic32_get(&rko->rko_rktp->rktp_version));
+ }
+
+ switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) {
+ case RD_KAFKA_OP_FETCH:
+ fprintf(fp, "%s Offset: %" PRId64 "\n", prefix,
+ rko->rko_u.fetch.rkm.rkm_offset);
+ break;
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ fprintf(fp, "%s Offset: %" PRId64 "\n", prefix,
+ rko->rko_u.err.offset);
+ /* FALLTHRU */
+ case RD_KAFKA_OP_ERR:
+ fprintf(fp, "%s Reason: %s\n", prefix, rko->rko_u.err.errstr);
+ break;
+ case RD_KAFKA_OP_DR:
+ fprintf(fp, "%s %" PRId32 " messages on %s\n", prefix,
+ rko->rko_u.dr.msgq.rkmq_msg_cnt,
+ rko->rko_u.dr.rkt ? rko->rko_u.dr.rkt->rkt_topic->str
+ : "(n/a)");
+ break;
+ case RD_KAFKA_OP_OFFSET_COMMIT:
+ fprintf(fp, "%s Callback: %p (opaque %p)\n", prefix,
+ rko->rko_u.offset_commit.cb,
+ rko->rko_u.offset_commit.opaque);
+ fprintf(fp, "%s %d partitions\n", prefix,
+ rko->rko_u.offset_commit.partitions
+ ? rko->rko_u.offset_commit.partitions->cnt
+ : 0);
+ break;
+
+ case RD_KAFKA_OP_LOG:
+ fprintf(fp, "%s Log: %%%d %s: %s\n", prefix,
+ rko->rko_u.log.level, rko->rko_u.log.fac,
+ rko->rko_u.log.str);
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
+ rd_kafka_op_t *rko;
+#define _RD_KAFKA_OP_EMPTY \
+ 1234567 /* Special value to be able to assert \
+ * on default-initialized (0) sizes \
+ * if we forgot to add an op type to \
+ * this list. */
+ static const size_t op2size[RD_KAFKA_OP__END] = {
+ [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch),
+ [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err),
+ [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err),
+ [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr),
+ [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats),
+ [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit),
+ [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node),
+ [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf),
+ [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf),
+ [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf),
+ [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start),
+ [RD_KAFKA_OP_FETCH_STOP] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start),
+ [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause),
+ [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch),
+ [RD_KAFKA_OP_PARTITION_JOIN] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_PARTITION_LEAVE] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance),
+ [RD_KAFKA_OP_TERMINATE] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_COORD_QUERY] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe),
+ [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign),
+ [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe),
+ [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign),
+ [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle),
+ [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name),
+ [RD_KAFKA_OP_CG_METADATA] = sizeof(rko->rko_u.cg_metadata),
+ [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset),
+ [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata),
+ [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log),
+ [RD_KAFKA_OP_WAKEUP] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_CREATEPARTITIONS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DESCRIBECONFIGS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DELETERECORDS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_LISTCONSUMERGROUPS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DESCRIBECONSUMERGROUPS] =
+ sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DELETEGROUPS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS] =
+ sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_CREATEACLS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DESCRIBEACLS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_DELETEACLS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS] =
+ sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS] =
+ sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_ADMIN_FANOUT] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_ADMIN_RESULT] = sizeof(rko->rko_u.admin_result),
+ [RD_KAFKA_OP_PURGE] = sizeof(rko->rko_u.purge),
+ [RD_KAFKA_OP_CONNECT] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = _RD_KAFKA_OP_EMPTY,
+ [RD_KAFKA_OP_MOCK] = sizeof(rko->rko_u.mock),
+ [RD_KAFKA_OP_BROKER_MONITOR] = sizeof(rko->rko_u.broker_monitor),
+ [RD_KAFKA_OP_TXN] = sizeof(rko->rko_u.txn),
+ [RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
+ sizeof(rko->rko_u.rebalance_protocol),
+ [RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
+ [RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
+ };
+ size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];
+
+ rd_assert(tsize > 0 || !*"add OP type to rd_kafka_op_new0()");
+ if (tsize == _RD_KAFKA_OP_EMPTY)
+ tsize = 0;
+
+ rko = rd_calloc(1, sizeof(*rko) - sizeof(rko->rko_u) + tsize);
+ rko->rko_type = type;
+
+#if ENABLE_DEVEL
+ rko->rko_source = source;
+ rd_atomic32_add(&rd_kafka_op_cnt, 1);
+#endif
+ return rko;
+}
+
+
+void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
+
+ /* Call ops callback with ERR__DESTROY to let it
+ * clean up its resources. */
+ if ((rko->rko_type & RD_KAFKA_OP_CB) && rko->rko_op_cb) {
+ rd_kafka_op_res_t res;
+ rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
+ res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
+ rd_assert(res != RD_KAFKA_OP_RES_YIELD);
+ rd_assert(res != RD_KAFKA_OP_RES_KEEP);
+ }
+
+
+ switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) {
+ case RD_KAFKA_OP_FETCH:
+ rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm);
+ /* Decrease refcount on rkbuf to eventually rd_free shared buf*/
+ if (rko->rko_u.fetch.rkbuf)
+ rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
+
+ break;
+
+ case RD_KAFKA_OP_OFFSET_FETCH:
+ if (rko->rko_u.offset_fetch.partitions &&
+ rko->rko_u.offset_fetch.do_free)
+ rd_kafka_topic_partition_list_destroy(
+ rko->rko_u.offset_fetch.partitions);
+ break;
+
+ case RD_KAFKA_OP_OFFSET_COMMIT:
+ RD_IF_FREE(rko->rko_u.offset_commit.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
+ break;
+
+ case RD_KAFKA_OP_SUBSCRIBE:
+ case RD_KAFKA_OP_GET_SUBSCRIPTION:
+ RD_IF_FREE(rko->rko_u.subscribe.topics,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_ASSIGN:
+ case RD_KAFKA_OP_GET_ASSIGNMENT:
+ RD_IF_FREE(rko->rko_u.assign.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_REBALANCE:
+ RD_IF_FREE(rko->rko_u.rebalance.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ case RD_KAFKA_OP_NAME:
+ RD_IF_FREE(rko->rko_u.name.str, rd_free);
+ break;
+
+ case RD_KAFKA_OP_CG_METADATA:
+ RD_IF_FREE(rko->rko_u.cg_metadata,
+ rd_kafka_consumer_group_metadata_destroy);
+ break;
+
+ case RD_KAFKA_OP_ERR:
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
+ rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
+ break;
+
+ break;
+
+ case RD_KAFKA_OP_THROTTLE:
+ RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
+ break;
+
+ case RD_KAFKA_OP_STATS:
+ RD_IF_FREE(rko->rko_u.stats.json, rd_free);
+ break;
+
+ case RD_KAFKA_OP_XMIT_RETRY:
+ case RD_KAFKA_OP_XMIT_BUF:
+ case RD_KAFKA_OP_RECV_BUF:
+ if (rko->rko_u.xbuf.rkbuf)
+ rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
+
+ RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
+ break;
+
+ case RD_KAFKA_OP_DR:
+ rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
+ if (rko->rko_u.dr.do_purge2)
+ rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
+
+ if (rko->rko_u.dr.rkt)
+ rd_kafka_topic_destroy0(rko->rko_u.dr.rkt);
+ break;
+
+ case RD_KAFKA_OP_OFFSET_RESET:
+ RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
+ break;
+
+ case RD_KAFKA_OP_METADATA:
+ RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
+ break;
+
+ case RD_KAFKA_OP_LOG:
+ rd_free(rko->rko_u.log.str);
+ break;
+
+ case RD_KAFKA_OP_ADMIN_FANOUT:
+ rd_assert(rko->rko_u.admin_request.fanout.outstanding == 0);
+ rd_list_destroy(&rko->rko_u.admin_request.fanout.results);
+ case RD_KAFKA_OP_CREATETOPICS:
+ case RD_KAFKA_OP_DELETETOPICS:
+ case RD_KAFKA_OP_CREATEPARTITIONS:
+ case RD_KAFKA_OP_ALTERCONFIGS:
+ case RD_KAFKA_OP_DESCRIBECONFIGS:
+ case RD_KAFKA_OP_DELETERECORDS:
+ case RD_KAFKA_OP_LISTCONSUMERGROUPS:
+ case RD_KAFKA_OP_DESCRIBECONSUMERGROUPS:
+ case RD_KAFKA_OP_DELETEGROUPS:
+ case RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS:
+ case RD_KAFKA_OP_CREATEACLS:
+ case RD_KAFKA_OP_DESCRIBEACLS:
+ case RD_KAFKA_OP_DELETEACLS:
+ case RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS:
+ case RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS:
+ rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq);
+ rd_list_destroy(&rko->rko_u.admin_request.args);
+ if (rko->rko_u.admin_request.options.match_consumer_group_states
+ .u.PTR) {
+ rd_list_destroy(rko->rko_u.admin_request.options
+ .match_consumer_group_states.u.PTR);
+ }
+ rd_assert(!rko->rko_u.admin_request.fanout_parent);
+ RD_IF_FREE(rko->rko_u.admin_request.coordkey, rd_free);
+ break;
+
+ case RD_KAFKA_OP_ADMIN_RESULT:
+ rd_list_destroy(&rko->rko_u.admin_result.args);
+ rd_list_destroy(&rko->rko_u.admin_result.results);
+ RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free);
+ rd_assert(!rko->rko_u.admin_result.fanout_parent);
+ ;
+ break;
+
+ case RD_KAFKA_OP_MOCK:
+ RD_IF_FREE(rko->rko_u.mock.name, rd_free);
+ RD_IF_FREE(rko->rko_u.mock.str, rd_free);
+ break;
+
+ case RD_KAFKA_OP_BROKER_MONITOR:
+ rd_kafka_broker_destroy(rko->rko_u.broker_monitor.rkb);
+ break;
+
+ case RD_KAFKA_OP_TXN:
+ RD_IF_FREE(rko->rko_u.txn.group_id, rd_free);
+ RD_IF_FREE(rko->rko_u.txn.offsets,
+ rd_kafka_topic_partition_list_destroy);
+ RD_IF_FREE(rko->rko_u.txn.cgmetadata,
+ rd_kafka_consumer_group_metadata_destroy);
+ break;
+
+ case RD_KAFKA_OP_LEADERS:
+ rd_assert(!rko->rko_u.leaders.eonce);
+ rd_assert(!rko->rko_u.leaders.replyq.q);
+ RD_IF_FREE(rko->rko_u.leaders.leaders, rd_list_destroy);
+ RD_IF_FREE(rko->rko_u.leaders.partitions,
+ rd_kafka_topic_partition_list_destroy);
+ break;
+
+ default:
+ break;
+ }
+
+ RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy);
+
+ RD_IF_FREE(rko->rko_error, rd_kafka_error_destroy);
+
+ rd_kafka_replyq_destroy(&rko->rko_replyq);
+
+#if ENABLE_DEVEL
+ if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
+ rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
+#endif
+
+ rd_free(rko);
+}
+
+
+
+/**
+ * Propagate an error event to the application on a specific queue.
+ */
+void rd_kafka_q_op_err(rd_kafka_q_t *rkq,
+ rd_kafka_resp_err_t err,
+ const char *fmt,
+ ...) {
+ va_list ap;
+ char buf[2048];
+ rd_kafka_op_t *rko;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
+ rko->rko_err = err;
+ rko->rko_u.err.errstr = rd_strdup(buf);
+
+ rd_kafka_q_enq(rkq, rko);
+}
+
+
+
+/**
+ * @brief Enqueue RD_KAFKA_OP_CONSUMER_ERR on \p rkq.
+ *
+ * @param broker_id Is the relevant broker id, or RD_KAFKA_NODEID_UA (-1)
+ * if not applicable.
+ * @param err Error code.
+ * @param version Queue version barrier, or 0 if not applicable.
+ * @param topic May be NULL.
+ * @param rktp May be NULL. Takes precedence over \p topic.
+ * @param offset RD_KAFKA_OFFSET_INVALID if not applicable.
+ *
+ * @sa rd_kafka_q_op_err()
+ */
+void rd_kafka_consumer_err(rd_kafka_q_t *rkq,
+ int32_t broker_id,
+ rd_kafka_resp_err_t err,
+ int32_t version,
+ const char *topic,
+ rd_kafka_toppar_t *rktp,
+ int64_t offset,
+ const char *fmt,
+ ...) {
+ va_list ap;
+ char buf[2048];
+ rd_kafka_op_t *rko;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
+ rko->rko_version = version;
+ rko->rko_err = err;
+ rko->rko_u.err.offset = offset;
+ rko->rko_u.err.errstr = rd_strdup(buf);
+ rko->rko_u.err.rkm.rkm_broker_id = broker_id;
+
+ if (rktp)
+ rko->rko_rktp = rd_kafka_toppar_keep(rktp);
+ else if (topic)
+ rko->rko_u.err.rkm.rkm_rkmessage.rkt =
+ (rd_kafka_topic_t *)rd_kafka_lwtopic_new(rkq->rkq_rk,
+ topic);
+
+
+ rd_kafka_q_enq(rkq, rko);
+}
+
+
+/**
+ * Creates a reply op based on 'rko_orig'.
+ * If 'rko_orig' has rko_op_cb set the reply op will be OR:ed with
+ * RD_KAFKA_OP_CB, else the reply type will be the original rko_type OR:ed
+ * with RD_KAFKA_OP_REPLY.
+ */
+rd_kafka_op_t *rd_kafka_op_new_reply(rd_kafka_op_t *rko_orig,
+ rd_kafka_resp_err_t err) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new(rko_orig->rko_type | RD_KAFKA_OP_REPLY);
+ rd_kafka_op_get_reply_version(rko, rko_orig);
+ rko->rko_err = err;
+ if (rko_orig->rko_rktp)
+ rko->rko_rktp = rd_kafka_toppar_keep(rko_orig->rko_rktp);
+
+ return rko;
+}
+
+
+/**
+ * @brief Create new callback op for type \p type
+ */
+rd_kafka_op_t *rd_kafka_op_new_cb(rd_kafka_t *rk,
+ rd_kafka_op_type_t type,
+ rd_kafka_op_cb_t *cb) {
+ rd_kafka_op_t *rko;
+ rko = rd_kafka_op_new(type | RD_KAFKA_OP_CB);
+ rko->rko_op_cb = cb;
+ rko->rko_rk = rk;
+ return rko;
+}
+
+
+/**
+ * @brief Reply to 'rko' re-using the same rko with rko_err
+ * specified by \p err. rko_error is set to NULL.
+ *
+ * If there is no replyq the rko is destroyed.
+ *
+ * @returns 1 if op was enqueued, else 0 and rko is destroyed.
+ */
+int rd_kafka_op_reply(rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
+
+ if (!rko->rko_replyq.q) {
+ rd_kafka_op_destroy(rko);
+ return 0;
+ }
+
+ rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY);
+ rko->rko_err = err;
+ rko->rko_error = NULL;
+
+ return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
+}
+
+
+/**
+ * @brief Reply to 'rko' re-using the same rko with rko_error specified
+ * by \p error (may be NULL) and rko_err set to the corresponding
+ * error code. Assumes ownership of \p error.
+ *
+ * If there is no replyq the rko is destroyed.
+ *
+ * @returns 1 if op was enqueued, else 0 and rko is destroyed.
+ */
+int rd_kafka_op_error_reply(rd_kafka_op_t *rko, rd_kafka_error_t *error) {
+
+ if (!rko->rko_replyq.q) {
+ RD_IF_FREE(error, rd_kafka_error_destroy);
+ rd_kafka_op_destroy(rko);
+ return 0;
+ }
+
+ rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY);
+ rko->rko_err =
+ error ? rd_kafka_error_code(error) : RD_KAFKA_RESP_ERR_NO_ERROR;
+ rko->rko_error = error;
+
+ return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
+}
+
+
+/**
+ * @brief Send request to queue, wait for response.
+ *
+ * @returns response on success or NULL if destq is disabled.
+ */
+rd_kafka_op_t *rd_kafka_op_req0(rd_kafka_q_t *destq,
+ rd_kafka_q_t *recvq,
+ rd_kafka_op_t *rko,
+ int timeout_ms) {
+ rd_kafka_op_t *reply;
+
+ /* Indicate to destination where to send reply. */
+ rd_kafka_op_set_replyq(rko, recvq, NULL);
+
+ /* Enqueue op */
+ if (!rd_kafka_q_enq(destq, rko))
+ return NULL;
+
+ /* Wait for reply */
+ reply = rd_kafka_q_pop(recvq, rd_timeout_us(timeout_ms), 0);
+
+ /* May be NULL for timeout */
+ return reply;
+}
+
+/**
+ * Send request to queue, wait for response.
+ * Creates a temporary reply queue.
+ */
+rd_kafka_op_t *
+rd_kafka_op_req(rd_kafka_q_t *destq, rd_kafka_op_t *rko, int timeout_ms) {
+ rd_kafka_q_t *recvq;
+ rd_kafka_op_t *reply;
+
+ recvq = rd_kafka_q_new(destq->rkq_rk);
+
+ reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms);
+
+ rd_kafka_q_destroy_owner(recvq);
+
+ return reply;
+}
+
+
+/**
+ * Send simple type-only request to queue, wait for response.
+ */
+rd_kafka_op_t *rd_kafka_op_req2(rd_kafka_q_t *destq, rd_kafka_op_type_t type) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new(type);
+ return rd_kafka_op_req(destq, rko, RD_POLL_INFINITE);
+}
+
+
+/**
+ * Destroys the rko and returns its err.
+ */
+rd_kafka_resp_err_t rd_kafka_op_err_destroy(rd_kafka_op_t *rko) {
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ if (rko) {
+ err = rko->rko_err;
+ rd_kafka_op_destroy(rko);
+ }
+ return err;
+}
+
+
+/**
+ * Destroys the rko and returns its error object or NULL if no error.
+ */
+rd_kafka_error_t *rd_kafka_op_error_destroy(rd_kafka_op_t *rko) {
+ if (rko) {
+ rd_kafka_error_t *error = rko->rko_error;
+ rko->rko_error = NULL;
+ rd_kafka_op_destroy(rko);
+ return error;
+ }
+
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Operation timed out");
+}
+
+
+/**
+ * Call op callback
+ */
+rd_kafka_op_res_t
+rd_kafka_op_call(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
+ rd_kafka_op_res_t res;
+ rd_assert(rko->rko_op_cb);
+ res = rko->rko_op_cb(rk, rkq, rko);
+ if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread))
+ return RD_KAFKA_OP_RES_YIELD;
+ if (res != RD_KAFKA_OP_RES_KEEP)
+ rko->rko_op_cb = NULL;
+ return res;
+}
+
+
+/**
+ * @brief Creates a new RD_KAFKA_OP_FETCH op representing a
+ * control message. The rkm_flags property is set to
+ * RD_KAFKA_MSG_F_CONTROL.
+ */
+rd_kafka_op_t *rd_kafka_op_new_ctrl_msg(rd_kafka_toppar_t *rktp,
+ int32_t version,
+ rd_kafka_buf_t *rkbuf,
+ int64_t offset) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new_fetch_msg(&rkm, rktp, version, rkbuf, offset, 0,
+ NULL, 0, NULL);
+
+ rkm->rkm_flags |= RD_KAFKA_MSG_F_CONTROL;
+
+ return rko;
+}
+
+/**
+ * @brief Creates a new RD_KAFKA_OP_FETCH op and sets up the
+ * embedded message according to the parameters.
+ *
+ * @param rkmp will be set to the embedded rkm in the rko (for convenience)
+ * @param offset may be updated later if relative offset.
+ */
+rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp,
+ rd_kafka_toppar_t *rktp,
+ int32_t version,
+ rd_kafka_buf_t *rkbuf,
+ int64_t offset,
+ size_t key_len,
+ const void *key,
+ size_t val_len,
+ const void *val) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
+ rko->rko_rktp = rd_kafka_toppar_keep(rktp);
+ rko->rko_version = version;
+ rkm = &rko->rko_u.fetch.rkm;
+ *rkmp = rkm;
+
+ /* Since all the ops share the same payload buffer
+ * a refcnt is used on the rkbuf that makes sure all
+ * consume_cb() will have been
+ * called for each of these ops before the rkbuf
+ * and its memory backing buffers are freed. */
+ rko->rko_u.fetch.rkbuf = rkbuf;
+ rd_kafka_buf_keep(rkbuf);
+
+ rkm->rkm_offset = offset;
+
+ rkm->rkm_key = (void *)key;
+ rkm->rkm_key_len = key_len;
+
+ rkm->rkm_payload = (void *)val;
+ rkm->rkm_len = val_len;
+ rko->rko_len = (int32_t)rkm->rkm_len;
+
+ rkm->rkm_partition = rktp->rktp_partition;
+
+ /* Persistence status is always PERSISTED for consumed messages
+ * since we managed to read the message. */
+ rkm->rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+ return rko;
+}
+
+
+/**
+ * Enqueue ERR__THROTTLE op, if desired.
+ */
+void rd_kafka_op_throttle_time(rd_kafka_broker_t *rkb,
+ rd_kafka_q_t *rkq,
+ int throttle_time) {
+ rd_kafka_op_t *rko;
+
+ if (unlikely(throttle_time > 0))
+ rd_avg_add(&rkb->rkb_avg_throttle, throttle_time);
+
+ /* We send throttle events when:
+ * - throttle_time > 0
+ * - throttle_time == 0 and last throttle_time > 0
+ */
+ if (!rkb->rkb_rk->rk_conf.throttle_cb ||
+ (!throttle_time &&
+ !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle)))
+ return;
+
+ rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
+ rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename);
+ rko->rko_u.throttle.nodeid = rkb->rkb_nodeid;
+ rko->rko_u.throttle.throttle_time = throttle_time;
+ rd_kafka_q_enq(rkq, rko);
+}
+
+
+/**
+ * @brief Handle standard op types.
+ */
+rd_kafka_op_res_t rd_kafka_op_handle_std(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ int cb_type) {
+ if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
+ return RD_KAFKA_OP_RES_PASS;
+ else if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
+ /* Control messages must not be exposed to the application
+ * but we need to store their offsets. */
+ rd_kafka_fetch_op_app_prepare(rk, rko);
+ return RD_KAFKA_OP_RES_HANDLED;
+ } else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
+ rko->rko_type & RD_KAFKA_OP_CB)
+ return rd_kafka_op_call(rk, rkq, rko);
+ else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) /* Handle Response */
+ rd_kafka_buf_handle_op(rko, rko->rko_err);
+ else if (cb_type != RD_KAFKA_Q_CB_RETURN &&
+ rko->rko_type & RD_KAFKA_OP_REPLY &&
+ rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+ return RD_KAFKA_OP_RES_HANDLED; /* dest queue was
+ * probably disabled. */
+ else
+ return RD_KAFKA_OP_RES_PASS;
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @brief Attempt to handle op using its queue's serve callback,
+ * or the passed callback, or op_handle_std(), else do nothing.
+ *
+ * @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock
+ * being held. Callback may re-enqueue the op on this queue
+ * and return YIELD.
+ *
+ * @returns HANDLED if op was handled (and destroyed), PASS if not,
+ * or YIELD if op was handled (maybe destroyed or re-enqueued)
+ * and caller must propagate yield upwards (cancel and return).
+ */
+rd_kafka_op_res_t rd_kafka_op_handle(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque,
+ rd_kafka_q_serve_cb_t *callback) {
+ rd_kafka_op_res_t res;
+
+ if (rko->rko_serve) {
+ callback = rko->rko_serve;
+ opaque = rko->rko_serve_opaque;
+ rko->rko_serve = NULL;
+ rko->rko_serve_opaque = NULL;
+ }
+
+ res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
+ if (res == RD_KAFKA_OP_RES_KEEP) {
+ /* Op was handled but must not be destroyed. */
+ return res;
+ }
+ if (res == RD_KAFKA_OP_RES_HANDLED) {
+ rd_kafka_op_destroy(rko);
+ return res;
+ } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD))
+ return res;
+
+ if (callback)
+ res = callback(rk, rkq, rko, cb_type, opaque);
+
+ return res;
+}
+
+
+/**
+ * @brief Prepare passing message to application.
+ * This must be called just prior to passing/returning a consumed
+ * message to the application.
+ *
+ * Performs:
+ * - Store offset for fetched message + 1.
+ * - Updates the application offset (rktp_app_offset).
+ *
+ * @locks rktp_lock and rk_lock MUST NOT be held
+ */
+void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_fetch_pos_t pos;
+
+ if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH || rko->rko_err))
+ return;
+
+ rktp = rko->rko_rktp;
+
+ if (unlikely(!rk))
+ rk = rktp->rktp_rkt->rkt_rk;
+
+ pos.offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;
+ pos.leader_epoch = rko->rko_u.fetch.rkm.rkm_u.consumer.leader_epoch;
+
+ rd_kafka_update_app_pos(rk, rktp, pos, RD_DO_LOCK);
+}