diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h | 778 |
1 files changed, 778 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h new file mode 100644 index 00000000..57c07491 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_op.h @@ -0,0 +1,778 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _RDKAFKA_OP_H_ +#define _RDKAFKA_OP_H_ + + +#include "rdkafka_msg.h" +#include "rdkafka_timer.h" +#include "rdkafka_admin.h" + + +/* Forward declarations */ +typedef struct rd_kafka_q_s rd_kafka_q_t; +typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; +typedef struct rd_kafka_op_s rd_kafka_op_t; + +/* One-off reply queue + reply version. + * All APIs that take a rd_kafka_replyq_t makes a copy of the + * struct as-is and grabs hold of the existing .q refcount. + * Think of replyq as a (Q,VERSION) tuple. */ +typedef struct rd_kafka_replyq_s { + rd_kafka_q_t *q; + int32_t version; +#if ENABLE_DEVEL + char *_id; /* Devel id used for debugging reference leaks. + * Is a strdup() of the caller's function name, + * which makes for easy debugging with valgrind. */ +#endif +} rd_kafka_replyq_t; + + + +/** + * Flags used by: + * - rd_kafka_op_t.rko_flags + * - rd_kafka_buf_t.rkbuf_flags + */ +#define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */ +#define RD_KAFKA_OP_F_NO_RESPONSE 0x2 /* rkbuf: Not expecting a response */ +#define RD_KAFKA_OP_F_CRC 0x4 /* rkbuf: Perform CRC calculation */ +#define RD_KAFKA_OP_F_BLOCKING 0x8 /* rkbuf: blocking protocol request */ +#define RD_KAFKA_OP_F_REPROCESS 0x10 /* cgrp: Reprocess at a later time. */ +#define RD_KAFKA_OP_F_SENT 0x20 /* rkbuf: request sent on wire */ +#define RD_KAFKA_OP_F_FLEXVER \ + 0x40 /* rkbuf: flexible protocol version \ + * (KIP-482) */ +#define RD_KAFKA_OP_F_NEED_MAKE \ + 0x80 /* rkbuf: request content has not \ + * been made yet, the make \ + * callback will be triggered \ + * to construct the request \ + * right before it is sent. */ +#define RD_KAFKA_OP_F_FORCE_CB \ + 0x100 /* rko: force callback even if \ + * op type is eventable. */ + +typedef enum { + RD_KAFKA_OP_NONE, /* No specific type, use OP_CB */ + RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */ + RD_KAFKA_OP_ERR, /* Kafka thread -> Application */ + RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */ + RD_KAFKA_OP_DR, /* Kafka thread -> Application + * Produce message delivery report */ + RD_KAFKA_OP_STATS, /* Kafka thread -> Application */ + + RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */ + RD_KAFKA_OP_NODE_UPDATE, /* any -> Broker thread: node update */ + + RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */ + RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */ + RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */ + RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */ + RD_KAFKA_OP_FETCH_STOP, /* Application -> toppar's handler thread */ + RD_KAFKA_OP_SEEK, /* Application -> toppar's handler thread */ + RD_KAFKA_OP_PAUSE, /* Application -> toppar's handler thread */ + RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets + * for topic. */ + + RD_KAFKA_OP_PARTITION_JOIN, /* * -> cgrp op: add toppar to cgrp + * * -> broker op: add toppar to broker */ + RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op: remove toppar from cgrp + * * -> broker op: remove toppar from rkb*/ + RD_KAFKA_OP_REBALANCE, /* broker thread -> app: + * group rebalance */ + RD_KAFKA_OP_TERMINATE, /* For generic use */ + RD_KAFKA_OP_COORD_QUERY, /* Query for coordinator */ + RD_KAFKA_OP_SUBSCRIBE, /* New subscription */ + RD_KAFKA_OP_ASSIGN, /* New assignment */ + RD_KAFKA_OP_GET_SUBSCRIPTION, /* Get current subscription. + * Reuses u.subscribe */ + RD_KAFKA_OP_GET_ASSIGNMENT, /* Get current assignment. + * Reuses u.assign */ + RD_KAFKA_OP_THROTTLE, /* Throttle info */ + RD_KAFKA_OP_NAME, /* Request name */ + RD_KAFKA_OP_CG_METADATA, /**< Request consumer metadata */ + RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */ + RD_KAFKA_OP_METADATA, /* Metadata response */ + RD_KAFKA_OP_LOG, /* Log */ + RD_KAFKA_OP_WAKEUP, /* Wake-up signaling */ + RD_KAFKA_OP_CREATETOPICS, /**< Admin: CreateTopics: u.admin_request*/ + RD_KAFKA_OP_DELETETOPICS, /**< Admin: DeleteTopics: u.admin_request*/ + RD_KAFKA_OP_CREATEPARTITIONS, /**< Admin: CreatePartitions: + * u.admin_request*/ + RD_KAFKA_OP_ALTERCONFIGS, /**< Admin: AlterConfigs: u.admin_request*/ + RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs: + * u.admin_request*/ + RD_KAFKA_OP_DELETERECORDS, /**< Admin: DeleteRecords: + * u.admin_request*/ + RD_KAFKA_OP_LISTCONSUMERGROUPS, /**< Admin: + * ListConsumerGroups + * u.admin_request */ + RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, /**< Admin: + * DescribeConsumerGroups + * u.admin_request */ + RD_KAFKA_OP_DELETEGROUPS, /**< Admin: DeleteGroups: u.admin_request*/ + RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, /**< Admin: + * DeleteConsumerGroupOffsets + * u.admin_request */ + RD_KAFKA_OP_CREATEACLS, /**< Admin: CreateAcls: u.admin_request*/ + RD_KAFKA_OP_DESCRIBEACLS, /**< Admin: DescribeAcls: u.admin_request*/ + RD_KAFKA_OP_DELETEACLS, /**< Admin: DeleteAcls: u.admin_request*/ + RD_KAFKA_OP_ALTERCONSUMERGROUPOFFSETS, /**< Admin: + * AlterConsumerGroupOffsets + * u.admin_request */ + RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS, /**< Admin: + * ListConsumerGroupOffsets + * u.admin_request */ + RD_KAFKA_OP_ADMIN_FANOUT, /**< Admin: fanout request */ + RD_KAFKA_OP_ADMIN_RESULT, /**< Admin API .._result_t */ + RD_KAFKA_OP_PURGE, /**< Purge queues */ + RD_KAFKA_OP_CONNECT, /**< Connect (to broker) */ + RD_KAFKA_OP_OAUTHBEARER_REFRESH, /**< Refresh OAUTHBEARER token */ + RD_KAFKA_OP_MOCK, /**< Mock cluster command */ + RD_KAFKA_OP_BROKER_MONITOR, /**< Broker state change */ + RD_KAFKA_OP_TXN, /**< Transaction command */ + RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */ + RD_KAFKA_OP_LEADERS, /**< Partition leader query */ + RD_KAFKA_OP_BARRIER, /**< Version barrier bump */ + RD_KAFKA_OP__END +} rd_kafka_op_type_t; + +/* Flags used with op_type_t */ +#define RD_KAFKA_OP_CB (int)(1 << 29) /* Callback op. */ +#define RD_KAFKA_OP_REPLY (int)(1 << 30) /* Reply op. */ +#define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY) + + +/** + * @brief Op/queue priority levels. + * @remark Since priority levels alter the FIFO order, pay extra attention + * to preserve ordering as deemed necessary. + * @remark Priority should only be set on ops destined for application + * facing queues (rk_rep, rkcg_q, etc). + */ +typedef enum { + RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */ + RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk, + * still at some scale. e.g. logs, .. */ + RD_KAFKA_PRIO_HIGH, /* Small scale high priority */ + RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */ +} rd_kafka_prio_t; + + +/** + * @brief Op handler result + * + * @remark When returning YIELD from a handler the handler will + * need to have made sure to either re-enqueue the op or destroy it + * since the caller will not touch the op anymore. + */ +typedef enum { + RD_KAFKA_OP_RES_PASS, /* Not handled, pass to caller */ + RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */ + RD_KAFKA_OP_RES_KEEP, /* Op was handled (through callbacks) + * but must not be destroyed by op_handle(). + * It is NOT PERMITTED to return RES_KEEP + * from a callback handling a ERR__DESTROY + * event. */ + RD_KAFKA_OP_RES_YIELD /* Callback called yield */ +} rd_kafka_op_res_t; + + +/** + * @brief Queue serve callback call type + */ +typedef enum { + RD_KAFKA_Q_CB_INVALID, /* dont use */ + RD_KAFKA_Q_CB_CALLBACK, /* trigger callback based on op */ + RD_KAFKA_Q_CB_RETURN, /* return op rather than trigger callback + * (if possible)*/ + RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */ + RD_KAFKA_Q_CB_EVENT /* like _Q_CB_RETURN but return event_t:ed op */ +} rd_kafka_q_cb_type_t; + +/** + * @brief Queue serve callback + * @remark See rd_kafka_op_res_t docs for return semantics. + */ +typedef rd_kafka_op_res_t(rd_kafka_q_serve_cb_t)(rd_kafka_t *rk, + struct rd_kafka_q_s *rkq, + struct rd_kafka_op_s *rko, + rd_kafka_q_cb_type_t cb_type, + void *opaque) + RD_WARN_UNUSED_RESULT; + +/** + * @brief Enumerates the assign op sub-types. + */ +typedef enum { + RD_KAFKA_ASSIGN_METHOD_ASSIGN, /**< Absolute assign/unassign */ + RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN, /**< Incremental assign */ + RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN /**< Incremental unassign */ +} rd_kafka_assign_method_t; + +/** + * @brief Op callback type + */ +typedef rd_kafka_op_res_t(rd_kafka_op_cb_t)(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + struct rd_kafka_op_s *rko) + RD_WARN_UNUSED_RESULT; + +/* Forward declaration */ +struct rd_kafka_admin_worker_cbs; +struct rd_kafka_admin_fanout_worker_cbs; + + +#define RD_KAFKA_OP_TYPE_ASSERT(rko, type) \ + rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type)) + +struct rd_kafka_op_s { + TAILQ_ENTRY(rd_kafka_op_s) rko_link; + + rd_kafka_op_type_t rko_type; /* Internal op type */ + rd_kafka_event_type_t rko_evtype; + int rko_flags; /* See RD_KAFKA_OP_F_... above */ + int32_t rko_version; + rd_kafka_resp_err_t rko_err; + rd_kafka_error_t *rko_error; + int32_t rko_len; /* Depends on type, typically the + * message length. */ + rd_kafka_prio_t rko_prio; /**< In-queue priority. + * Higher value means higher prio*/ + + rd_kafka_toppar_t *rko_rktp; + + /* + * Generic fields + */ + + /* Indicates request: enqueue reply on rko_replyq.q with .version. + * .q is refcounted. */ + rd_kafka_replyq_t rko_replyq; + + /* Original queue's op serve callback and opaque, if any. + * Mainly used for forwarded queues to use the original queue's + * serve function from the forwarded position. */ + rd_kafka_q_serve_cb_t *rko_serve; + void *rko_serve_opaque; + + rd_kafka_t *rko_rk; + +#if ENABLE_DEVEL + const char *rko_source; /**< Where op was created */ +#endif + + /* RD_KAFKA_OP_CB */ + rd_kafka_op_cb_t *rko_op_cb; + + union { + struct { + rd_kafka_buf_t *rkbuf; + rd_kafka_msg_t rkm; + int evidx; + } fetch; + + struct { + rd_kafka_topic_partition_list_t *partitions; + /** Require stable (txn-commited) offsets */ + rd_bool_t require_stable_offsets; + int do_free; /* free .partitions on destroy() */ + } offset_fetch; + + struct { + rd_kafka_topic_partition_list_t *partitions; + void (*cb)(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, + void *opaque); + void *opaque; + int silent_empty; /**< Fail silently if there are no + * offsets to commit. */ + rd_ts_t ts_timeout; + char *reason; + } offset_commit; + + struct { + rd_kafka_topic_partition_list_t *topics; + } subscribe; /* also used for GET_SUBSCRIPTION */ + + struct { + rd_kafka_topic_partition_list_t *partitions; + rd_kafka_assign_method_t method; + } assign; /* also used for GET_ASSIGNMENT */ + + struct { + rd_kafka_topic_partition_list_t *partitions; + } rebalance; + + struct { + const char *str; + } rebalance_protocol; + + struct { + char *str; + } name; + + rd_kafka_consumer_group_metadata_t *cg_metadata; + + struct { + int64_t offset; + char *errstr; + rd_kafka_msg_t rkm; + rd_kafka_topic_t *rkt; + int fatal; /**< This was a ERR__FATAL error that has + * been translated to the fatal error + * code. */ + } err; /* used for ERR and CONSUMER_ERR */ + + struct { + int throttle_time; + int32_t nodeid; + char *nodename; + } throttle; + + struct { + char *json; + size_t json_len; + } stats; + + struct { + rd_kafka_buf_t *rkbuf; + } xbuf; /* XMIT_BUF and RECV_BUF */ + + /* RD_KAFKA_OP_METADATA */ + struct { + rd_kafka_metadata_t *md; + int force; /* force request regardless of outstanding + * metadata requests. */ + } metadata; + + struct { + rd_kafka_topic_t *rkt; + rd_kafka_msgq_t msgq; + rd_kafka_msgq_t msgq2; + int do_purge2; + } dr; + + struct { + int32_t nodeid; + char nodename[RD_KAFKA_NODENAME_SIZE]; + } node; + + struct { + rd_kafka_fetch_pos_t pos; + int32_t broker_id; /**< Originating broker, or -1 */ + char *reason; + } offset_reset; + + struct { + rd_kafka_fetch_pos_t pos; + struct rd_kafka_cgrp_s *rkcg; + } fetch_start; /* reused for SEEK */ + + struct { + int pause; + int flag; + } pause; + + struct { + char fac[64]; + int level; + char *str; + int ctx; + } log; + + struct { + rd_kafka_AdminOptions_t options; /**< Copy of user's + * options */ + rd_ts_t abs_timeout; /**< Absolute timeout + * for this request. */ + rd_kafka_timer_t tmr; /**< Timeout timer */ + struct rd_kafka_enq_once_s *eonce; /**< Enqueue op + * only once, + * used to + * (re)trigger + * the request op + * upon broker state + * changes while + * waiting for the + * controller, or + * due to .tmr + * timeout. */ + rd_list_t + args; /**< Type depends on request, e.g. + * rd_kafka_NewTopic_t for CreateTopics + */ + + rd_kafka_buf_t *reply_buf; /**< Protocol reply, + * temporary reference not + * owned by this rko */ + + /**< Worker callbacks, see rdkafka_admin.c */ + struct rd_kafka_admin_worker_cbs *cbs; + + /** Worker state */ + enum { RD_KAFKA_ADMIN_STATE_INIT, + RD_KAFKA_ADMIN_STATE_WAIT_BROKER, + RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER, + RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS, + RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST, + RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE, + RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST, + } state; + + int32_t broker_id; /**< Requested broker id to + * communicate with. + * Used for AlterConfigs, et.al, + * that needs to speak to a + * specific broker rather than + * the controller. + * See RD_KAFKA_ADMIN_TARGET_.. + * for special values (coordinator, + * fanout, etc). + */ + /** The type of coordinator to look up */ + rd_kafka_coordtype_t coordtype; + /** Which coordinator to look up */ + char *coordkey; + + /** Application's reply queue */ + rd_kafka_replyq_t replyq; + rd_kafka_event_type_t reply_event_type; + + /** A collection of fanout child ops. */ + struct { + /** The type of request being fanned out. + * This is used for the ADMIN_RESULT. */ + rd_kafka_op_type_t reqtype; + + /** Worker callbacks, see rdkafka_admin.c */ + struct rd_kafka_admin_fanout_worker_cbs *cbs; + + /** Number of outstanding requests remaining to + * wait for. */ + int outstanding; + + /** Incremental results from fanouts. + * This list is pre-allocated to the number + * of input objects and can thus be set + * by index to retain original ordering. */ + rd_list_t results; + + /** Reply event type */ + rd_kafka_event_type_t reply_event_type; + + } fanout; + + /** A reference to the parent ADMIN_FANOUT op that + * spawned this op, if applicable. NULL otherwise. */ + struct rd_kafka_op_s *fanout_parent; + + } admin_request; + + struct { + rd_kafka_op_type_t reqtype; /**< Request op type, + * used for logging. */ + + rd_list_t args; /**< Args moved from the request op + * when the result op is created. + * + * Type depends on request. + */ + + char *errstr; /**< Error string, if rko_err + * is set, else NULL. */ + + rd_list_t results; /**< Type depends on request type: + * + * (rd_kafka_topic_result_t *): + * CreateTopics, DeleteTopics, + * CreatePartitions. + * + * (rd_kafka_ConfigResource_t *): + * AlterConfigs, DescribeConfigs + */ + + void *opaque; /**< Application's opaque as set by + * rd_kafka_AdminOptions_set_opaque + */ + + /** A reference to the parent ADMIN_FANOUT op that + * spawned this op, if applicable. NULL otherwise. */ + struct rd_kafka_op_s *fanout_parent; + } admin_result; + + struct { + int flags; /**< purge_flags from rd_kafka_purge() */ + } purge; + + /**< Mock cluster command */ + struct { + enum { RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR, + RD_KAFKA_MOCK_CMD_TOPIC_CREATE, + RD_KAFKA_MOCK_CMD_PART_SET_LEADER, + RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER, + RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS, + RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN, + RD_KAFKA_MOCK_CMD_BROKER_SET_RTT, + RD_KAFKA_MOCK_CMD_BROKER_SET_RACK, + RD_KAFKA_MOCK_CMD_COORD_SET, + RD_KAFKA_MOCK_CMD_APIVERSION_SET, + } cmd; + + rd_kafka_resp_err_t err; /**< Error for: + * TOPIC_SET_ERROR */ + char *name; /**< For: + * TOPIC_SET_ERROR + * TOPIC_CREATE + * PART_SET_FOLLOWER + * PART_SET_FOLLOWER_WMARKS + * BROKER_SET_RACK + * COORD_SET (key_type) */ + char *str; /**< For: + * COORD_SET (key) */ + int32_t partition; /**< For: + * PART_SET_FOLLOWER + * PART_SET_FOLLOWER_WMARKS + * PART_SET_LEADER + * APIVERSION_SET (ApiKey) + */ + int32_t broker_id; /**< For: + * PART_SET_FOLLOWER + * PART_SET_LEADER + * BROKER_SET_UPDOWN + * BROKER_SET_RACK + * COORD_SET */ + int64_t lo; /**< Low offset, for: + * TOPIC_CREATE (part cnt) + * PART_SET_FOLLOWER_WMARKS + * BROKER_SET_UPDOWN + * APIVERSION_SET (minver) + * BROKER_SET_RTT + */ + int64_t hi; /**< High offset, for: + * TOPIC_CREATE (repl fact) + * PART_SET_FOLLOWER_WMARKS + * APIVERSION_SET (maxver) + */ + } mock; + + struct { + struct rd_kafka_broker_s *rkb; /**< Broker who's state + * changed. */ + /**< Callback to trigger on the op handler's thread. */ + void (*cb)(struct rd_kafka_broker_s *rkb); + } broker_monitor; + + struct { + /** Consumer group metadata for send_offsets_to.. */ + rd_kafka_consumer_group_metadata_t *cgmetadata; + /** Consumer group id for AddOffsetsTo.. */ + char *group_id; + int timeout_ms; /**< Operation timeout */ + rd_ts_t abs_timeout; /**< Absolute time */ + /**< Offsets to commit */ + rd_kafka_topic_partition_list_t *offsets; + } txn; + + struct { + /* This struct serves two purposes, the fields + * with "Request:" are used for the async workers state + * while the "Reply:" fields is a separate reply + * rko that is enqueued for the caller upon + * completion or failure. */ + + /** Request: Partitions to query. + * Reply: Queried partitions with .err field set. */ + rd_kafka_topic_partition_list_t *partitions; + + /** Request: Absolute timeout */ + rd_ts_t ts_timeout; + + /** Request: Metadata query timer */ + rd_kafka_timer_t query_tmr; + + /** Request: Timeout timer */ + rd_kafka_timer_t timeout_tmr; + + /** Request: Enqueue op only once, used to (re)trigger + * metadata cache lookups, topic refresh, timeout. */ + struct rd_kafka_enq_once_s *eonce; + + /** Request: Caller's replyq */ + rd_kafka_replyq_t replyq; + + /** Request: Number of metadata queries made. */ + int query_cnt; + + /** Reply: Leaders (result) + * (rd_kafka_partition_leader*) */ + rd_list_t *leaders; + + /** Reply: Callback on completion (or failure) */ + rd_kafka_op_cb_t *cb; + + /** Reply: Callback opaque */ + void *opaque; + + } leaders; + + } rko_u; +}; + +TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s); + + + +const char *rd_kafka_op2str(rd_kafka_op_type_t type); +void rd_kafka_op_destroy(rd_kafka_op_t *rko); +rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type); +#if ENABLE_DEVEL +#define _STRINGIFYX(A) #A +#define _STRINGIFY(A) _STRINGIFYX(A) +#define rd_kafka_op_new(type) \ + rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type) +#else +#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type) +#endif +rd_kafka_op_t *rd_kafka_op_new_reply(rd_kafka_op_t *rko_orig, + rd_kafka_resp_err_t err); +rd_kafka_op_t *rd_kafka_op_new_cb(rd_kafka_t *rk, + rd_kafka_op_type_t type, + rd_kafka_op_cb_t *cb); +int rd_kafka_op_reply(rd_kafka_op_t *rko, rd_kafka_resp_err_t err); +int rd_kafka_op_error_reply(rd_kafka_op_t *rko, rd_kafka_error_t *error); + +#define rd_kafka_op_set_prio(rko, prio) ((rko)->rko_prio = prio) + +#define rd_kafka_op_err(rk, err, ...) \ + do { \ + if (!((rk)->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)) { \ + rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \ + break; \ + } \ + rd_kafka_q_op_err((rk)->rk_rep, err, __VA_ARGS__); \ + } while (0) + +void rd_kafka_q_op_err(rd_kafka_q_t *rkq, + rd_kafka_resp_err_t err, + const char *fmt, + ...) RD_FORMAT(printf, 3, 4); +void rd_kafka_consumer_err(rd_kafka_q_t *rkq, + int32_t broker_id, + rd_kafka_resp_err_t err, + int32_t version, + const char *topic, + rd_kafka_toppar_t *rktp, + int64_t offset, + const char *fmt, + ...) RD_FORMAT(printf, 8, 9); +rd_kafka_op_t *rd_kafka_op_req0(rd_kafka_q_t *destq, + rd_kafka_q_t *recvq, + rd_kafka_op_t *rko, + int timeout_ms); +rd_kafka_op_t * +rd_kafka_op_req(rd_kafka_q_t *destq, rd_kafka_op_t *rko, int timeout_ms); +rd_kafka_op_t *rd_kafka_op_req2(rd_kafka_q_t *destq, rd_kafka_op_type_t type); +rd_kafka_resp_err_t rd_kafka_op_err_destroy(rd_kafka_op_t *rko); +rd_kafka_error_t *rd_kafka_op_error_destroy(rd_kafka_op_t *rko); + +rd_kafka_op_res_t rd_kafka_op_call(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) RD_WARN_UNUSED_RESULT; + +rd_kafka_op_t *rd_kafka_op_new_fetch_msg(rd_kafka_msg_t **rkmp, + rd_kafka_toppar_t *rktp, + int32_t version, + rd_kafka_buf_t *rkbuf, + int64_t offset, + size_t key_len, + const void *key, + size_t val_len, + const void *val); + +rd_kafka_op_t *rd_kafka_op_new_ctrl_msg(rd_kafka_toppar_t *rktp, + int32_t version, + rd_kafka_buf_t *rkbuf, + int64_t offset); + +void rd_kafka_op_throttle_time(struct rd_kafka_broker_s *rkb, + rd_kafka_q_t *rkq, + int throttle_time); + + +rd_kafka_op_res_t +rd_kafka_op_handle(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + rd_kafka_q_cb_type_t cb_type, + void *opaque, + rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT; + + +extern rd_atomic32_t rd_kafka_op_cnt; + +void rd_kafka_op_print(FILE *fp, const char *prefix, rd_kafka_op_t *rko); + +void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko); + + +#define rd_kafka_op_is_ctrl_msg(rko) \ + ((rko)->rko_type == RD_KAFKA_OP_FETCH && !(rko)->rko_err && \ + ((rko)->rko_u.fetch.rkm.rkm_flags & RD_KAFKA_MSG_F_CONTROL)) + + + +/** + * @returns true if the rko's replyq is valid and the + * rko's rktp version (if any) is not outdated. + */ +#define rd_kafka_op_replyq_is_valid(RKO) \ + (rd_kafka_replyq_is_valid(&(RKO)->rko_replyq) && \ + !rd_kafka_op_version_outdated((RKO), 0)) + + + +/** + * @returns the rko for a consumer message (RD_KAFKA_OP_FETCH). + */ +static RD_UNUSED rd_kafka_op_t * +rd_kafka_message2rko(rd_kafka_message_t *rkmessage) { + rd_kafka_op_t *rko = rkmessage->_private; + + if (!rko || rko->rko_type != RD_KAFKA_OP_FETCH) + return NULL; + + return rko; +} + + + +#endif /* _RDKAFKA_OP_H_ */ |