diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock_int.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock_int.h | 538 |
1 files changed, 538 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock_int.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock_int.h new file mode 100644 index 00000000..ea3b6cab --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock_int.h @@ -0,0 +1,538 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _RDKAFKA_MOCK_INT_H_ +#define _RDKAFKA_MOCK_INT_H_ + +/** + * @name Mock cluster - internal data types + * + */ + + +/** + * @struct Response error and/or RTT-delay to return to client. + */ +typedef struct rd_kafka_mock_error_rtt_s { + rd_kafka_resp_err_t err; /**< Error response (or 0) */ + rd_ts_t rtt; /**< RTT/delay in microseconds (or 0) */ +} rd_kafka_mock_error_rtt_t; + +/** + * @struct A stack of errors or rtt latencies to return to the client, + * one by one until the stack is depleted. + */ +typedef struct rd_kafka_mock_error_stack_s { + TAILQ_ENTRY(rd_kafka_mock_error_stack_s) link; + int16_t ApiKey; /**< Optional ApiKey for which this stack + * applies to, else -1. */ + size_t cnt; /**< Current number of errors in .errs */ + size_t size; /**< Current allocated size for .errs (in elements) */ + rd_kafka_mock_error_rtt_t *errs; /**< Array of errors/rtts */ +} rd_kafka_mock_error_stack_t; + +typedef TAILQ_HEAD(rd_kafka_mock_error_stack_head_s, + rd_kafka_mock_error_stack_s) + rd_kafka_mock_error_stack_head_t; + + +/** + * @struct Consumer group protocol name and metadata. + */ +typedef struct rd_kafka_mock_cgrp_proto_s { + rd_kafkap_str_t *name; + rd_kafkap_bytes_t *metadata; +} rd_kafka_mock_cgrp_proto_t; + +/** + * @struct Consumer group member + */ +typedef struct rd_kafka_mock_cgrp_member_s { + TAILQ_ENTRY(rd_kafka_mock_cgrp_member_s) link; + char *id; /**< MemberId */ + char *group_instance_id; /**< Group instance id */ + rd_ts_t ts_last_activity; /**< Last activity, e.g., Heartbeat */ + rd_kafka_mock_cgrp_proto_t *protos; /**< Protocol names */ + int proto_cnt; /**< Number of protocols */ + rd_kafkap_bytes_t *assignment; /**< Current assignment */ + rd_kafka_buf_t *resp; /**< Current response buffer */ + struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL + * if there is no ongoing + * request. */ +} rd_kafka_mock_cgrp_member_t; + +/** + * @struct Consumer group. + */ +typedef struct rd_kafka_mock_cgrp_s { + TAILQ_ENTRY(rd_kafka_mock_cgrp_s) link; + struct rd_kafka_mock_cluster_s *cluster; /**< Cluster */ + struct rd_kafka_mock_connection_s *conn; /**< Connection */ + char *id; /**< Group Id */ + char *protocol_type; /**< Protocol type */ + char *protocol_name; /**< Elected protocol name */ + int32_t generation_id; /**< Generation Id */ + int session_timeout_ms; /**< Session timeout */ + enum { RD_KAFKA_MOCK_CGRP_STATE_EMPTY, /* No members */ + RD_KAFKA_MOCK_CGRP_STATE_JOINING, /* Members are joining */ + RD_KAFKA_MOCK_CGRP_STATE_SYNCING, /* Syncing assignments */ + RD_KAFKA_MOCK_CGRP_STATE_REBALANCING, /* Rebalance triggered */ + RD_KAFKA_MOCK_CGRP_STATE_UP, /* Group is operational */ + } state; /**< Consumer group state */ + rd_kafka_timer_t session_tmr; /**< Session timeout timer */ + rd_kafka_timer_t rebalance_tmr; /**< Rebalance state timer */ + TAILQ_HEAD(, rd_kafka_mock_cgrp_member_s) members; /**< Group members */ + int member_cnt; /**< Number of group members */ + int last_member_cnt; /**< Mumber of group members at last election */ + int assignment_cnt; /**< Number of member assignments in last Sync */ + rd_kafka_mock_cgrp_member_t *leader; /**< Elected leader */ +} rd_kafka_mock_cgrp_t; + + +/** + * @struct TransactionalId + PID (+ optional sequence state) + */ +typedef struct rd_kafka_mock_pid_s { + rd_kafka_pid_t pid; + + /* BaseSequence tracking (partition) */ + int8_t window; /**< increases up to 5 */ + int8_t lo; /**< Window low bucket: oldest */ + int8_t hi; /**< Window high bucket: most recent */ + int32_t seq[5]; /**< Next expected BaseSequence for each bucket */ + + char TransactionalId[1]; /**< Allocated after this structure */ +} rd_kafka_mock_pid_t; + +/** + * @brief rd_kafka_mock_pid_t.pid Pid (not epoch) comparator + */ +static RD_UNUSED int rd_kafka_mock_pid_cmp_pid(const void *_a, const void *_b) { + const rd_kafka_mock_pid_t *a = _a, *b = _b; + + if (a->pid.id < b->pid.id) + return -1; + else if (a->pid.id > b->pid.id) + return 1; + + return 0; +} + +/** + * @brief rd_kafka_mock_pid_t.pid TransactionalId,Pid,epoch comparator + */ +static RD_UNUSED int rd_kafka_mock_pid_cmp(const void *_a, const void *_b) { + const rd_kafka_mock_pid_t *a = _a, *b = _b; + int r; + + r = strcmp(a->TransactionalId, b->TransactionalId); + if (r) + return r; + + if (a->pid.id < b->pid.id) + return -1; + else if (a->pid.id > b->pid.id) + return 1; + + if (a->pid.epoch < b->pid.epoch) + return -1; + if (a->pid.epoch > b->pid.epoch) + return 1; + + return 0; +} + + + +/** + * @struct A real TCP connection from the client to a mock broker. + */ +typedef struct rd_kafka_mock_connection_s { + TAILQ_ENTRY(rd_kafka_mock_connection_s) link; + rd_kafka_transport_t *transport; /**< Socket transport */ + rd_kafka_buf_t *rxbuf; /**< Receive buffer */ + rd_kafka_bufq_t outbufs; /**< Send buffers */ + short *poll_events; /**< Events to poll, points to + * the broker's pfd array */ + struct sockaddr_in peer; /**< Peer address */ + struct rd_kafka_mock_broker_s *broker; + rd_kafka_timer_t write_tmr; /**< Socket write delay timer */ +} rd_kafka_mock_connection_t; + + +/** + * @struct Mock broker + */ +typedef struct rd_kafka_mock_broker_s { + TAILQ_ENTRY(rd_kafka_mock_broker_s) link; + int32_t id; + char advertised_listener[128]; + struct sockaddr_in sin; /**< Bound address:port */ + uint16_t port; + char *rack; + rd_bool_t up; + rd_ts_t rtt; /**< RTT in microseconds */ + + rd_socket_t listen_s; /**< listen() socket */ + + TAILQ_HEAD(, rd_kafka_mock_connection_s) connections; + + /**< Per-protocol request error stack. + * @locks mcluster->lock */ + rd_kafka_mock_error_stack_head_t errstacks; + + struct rd_kafka_mock_cluster_s *cluster; +} rd_kafka_mock_broker_t; + + +/** + * @struct A Kafka-serialized MessageSet + */ +typedef struct rd_kafka_mock_msgset_s { + TAILQ_ENTRY(rd_kafka_mock_msgset_s) link; + int64_t first_offset; /**< First offset in batch */ + int64_t last_offset; /**< Last offset in batch */ + int32_t leader_epoch; /**< Msgset leader epoch */ + rd_kafkap_bytes_t bytes; + /* Space for bytes.data is allocated after the msgset_t */ +} rd_kafka_mock_msgset_t; + + +/** + * @struct Committed offset for a group and partition. + */ +typedef struct rd_kafka_mock_committed_offset_s { + /**< mpart.committed_offsets */ + TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link; + char *group; /**< Allocated along with the struct */ + int64_t offset; /**< Committed offset */ + rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */ +} rd_kafka_mock_committed_offset_t; + + +TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s); + +/** + * @struct Mock partition + */ +typedef struct rd_kafka_mock_partition_s { + TAILQ_ENTRY(rd_kafka_mock_partition_s) leader_link; + int32_t id; + + int32_t leader_epoch; /**< Leader epoch, bumped on each + * partition leader change. */ + int64_t start_offset; /**< Actual/leader start offset */ + int64_t end_offset; /**< Actual/leader end offset */ + int64_t follower_start_offset; /**< Follower's start offset */ + int64_t follower_end_offset; /**< Follower's end offset */ + rd_bool_t update_follower_start_offset; /**< Keep follower_start_offset + * in synch with start_offset + */ + rd_bool_t update_follower_end_offset; /**< Keep follower_end_offset + * in synch with end_offset + */ + + struct rd_kafka_mock_msgset_tailq_s msgsets; + size_t size; /**< Total size of all .msgsets */ + size_t cnt; /**< Total count of .msgsets */ + size_t max_size; /**< Maximum size of all .msgsets, may be overshot. */ + size_t max_cnt; /**< Maximum number of .msgsets */ + + /**< Committed offsets */ + TAILQ_HEAD(, rd_kafka_mock_committed_offset_s) committed_offsets; + + rd_kafka_mock_broker_t *leader; + rd_kafka_mock_broker_t **replicas; + int replica_cnt; + + rd_list_t pidstates; /**< PID states */ + + int32_t follower_id; /**< Preferred replica/follower */ + + struct rd_kafka_mock_topic_s *topic; +} rd_kafka_mock_partition_t; + + +/** + * @struct Mock topic + */ +typedef struct rd_kafka_mock_topic_s { + TAILQ_ENTRY(rd_kafka_mock_topic_s) link; + char *name; + + rd_kafka_mock_partition_t *partitions; + int partition_cnt; + + rd_kafka_resp_err_t err; /**< Error to return in protocol requests + * for this topic. */ + + struct rd_kafka_mock_cluster_s *cluster; +} rd_kafka_mock_topic_t; + +/** + * @struct Explicitly set coordinator. + */ +typedef struct rd_kafka_mock_coord_s { + TAILQ_ENTRY(rd_kafka_mock_coord_s) link; + rd_kafka_coordtype_t type; + char *key; + int32_t broker_id; +} rd_kafka_mock_coord_t; + + +typedef void(rd_kafka_mock_io_handler_t)( + struct rd_kafka_mock_cluster_s *mcluster, + rd_socket_t fd, + int events, + void *opaque); + +struct rd_kafka_mock_api_handler { + int16_t MinVersion; + int16_t MaxVersion; + int16_t FlexVersion; /**< First Flexible version */ + int (*cb)(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *rkbuf); +}; + +extern const struct rd_kafka_mock_api_handler + rd_kafka_mock_api_handlers[RD_KAFKAP__NUM]; + + + +/** + * @struct Mock cluster. + * + * The cluster IO loop runs in a separate thread where all + * broker IO is handled. + * + * No locking is needed. + */ +struct rd_kafka_mock_cluster_s { + char id[32]; /**< Generated cluster id */ + + rd_kafka_t *rk; + + int32_t controller_id; /**< Current controller */ + + TAILQ_HEAD(, rd_kafka_mock_broker_s) brokers; + int broker_cnt; + + TAILQ_HEAD(, rd_kafka_mock_topic_s) topics; + int topic_cnt; + + TAILQ_HEAD(, rd_kafka_mock_cgrp_s) cgrps; + + /** Explicit coordinators (set with mock_set_coordinator()) */ + TAILQ_HEAD(, rd_kafka_mock_coord_s) coords; + + /** Current transactional producer PIDs. + * Element type is a malloced rd_kafka_mock_pid_t*. */ + rd_list_t pids; + + char *bootstraps; /**< bootstrap.servers */ + + thrd_t thread; /**< Mock thread */ + + rd_kafka_q_t *ops; /**< Control ops queue for interacting with the + * cluster. */ + + rd_socket_t wakeup_fds[2]; /**< Wake-up fds for use with .ops */ + + rd_bool_t run; /**< Cluster will run while this value is true */ + + int fd_cnt; /**< Number of file descriptors */ + int fd_size; /**< Allocated size of .fds + * and .handlers */ + struct pollfd *fds; /**< Dynamic array */ + + rd_kafka_broker_t *dummy_rkb; /**< Some internal librdkafka APIs + * that we are reusing requires a + * broker object, we use the + * internal broker and store it + * here for convenient access. */ + + struct { + int partition_cnt; /**< Auto topic create part cnt */ + int replication_factor; /**< Auto topic create repl factor */ + } defaults; + + /**< Dynamic array of IO handlers for corresponding fd in .fds */ + struct { + rd_kafka_mock_io_handler_t *cb; /**< Callback */ + void *opaque; /**< Callbacks' opaque */ + } * handlers; + + /**< Per-protocol request error stack. */ + rd_kafka_mock_error_stack_head_t errstacks; + + /**< Request handlers */ + struct rd_kafka_mock_api_handler api_handlers[RD_KAFKAP__NUM]; + + /**< Mutex for: + * .errstacks + * .apiversions + */ + mtx_t lock; + + rd_kafka_timers_t timers; /**< Timers */ +}; + + + +rd_kafka_buf_t *rd_kafka_mock_buf_new_response(const rd_kafka_buf_t *request); +void rd_kafka_mock_connection_send_response(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp); +void rd_kafka_mock_connection_set_blocking(rd_kafka_mock_connection_t *mconn, + rd_bool_t blocking); + +rd_kafka_mock_partition_t * +rd_kafka_mock_partition_find(const rd_kafka_mock_topic_t *mtopic, + int32_t partition); +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_auto_create(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition_cnt, + rd_kafka_resp_err_t *errp); +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster, + const char *name); +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_find_by_kstr(const rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *kname); +rd_kafka_mock_broker_t * +rd_kafka_mock_cluster_get_coord(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_coordtype_t KeyType, + const rd_kafkap_str_t *Key); + +rd_kafka_mock_committed_offset_t * +rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *group); +rd_kafka_mock_committed_offset_t * +rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *group, + int64_t offset, + const rd_kafkap_str_t *metadata); + +const rd_kafka_mock_msgset_t * +rd_kafka_mock_msgset_find(const rd_kafka_mock_partition_t *mpart, + int64_t offset, + rd_bool_t on_follower); + +rd_kafka_resp_err_t +rd_kafka_mock_next_request_error(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp); + +rd_kafka_resp_err_t +rd_kafka_mock_partition_log_append(rd_kafka_mock_partition_t *mpart, + const rd_kafkap_bytes_t *records, + const rd_kafkap_str_t *TransactionalId, + int64_t *BaseOffset); + +rd_kafka_resp_err_t rd_kafka_mock_partition_leader_epoch_check( + const rd_kafka_mock_partition_t *mpart, + int32_t leader_epoch); + +int64_t rd_kafka_mock_partition_offset_for_leader_epoch( + const rd_kafka_mock_partition_t *mpart, + int32_t leader_epoch); + + +/** + * @returns true if the ApiVersion is supported, else false. + */ +static RD_UNUSED rd_bool_t +rd_kafka_mock_cluster_ApiVersion_check(const rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + int16_t ApiVersion) { + return (ApiVersion >= mcluster->api_handlers[ApiKey].MinVersion && + ApiVersion <= mcluster->api_handlers[ApiKey].MaxVersion); +} + + +rd_kafka_resp_err_t +rd_kafka_mock_pid_find(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *TransactionalId, + const rd_kafka_pid_t pid, + rd_kafka_mock_pid_t **mpidp); + + +/** + * @name Mock consumer group (rdkafka_mock_cgrp.c) + * @{ + */ +void rd_kafka_mock_cgrp_member_active(rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_cgrp_member_t *member); +void rd_kafka_mock_cgrp_member_assignment_set( + rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_cgrp_member_t *member, + const rd_kafkap_bytes_t *Metadata); +rd_kafka_resp_err_t +rd_kafka_mock_cgrp_member_sync_set(rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_cgrp_member_t *member, + rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp); +rd_kafka_resp_err_t +rd_kafka_mock_cgrp_member_leave(rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_cgrp_member_t *member); +void rd_kafka_mock_cgrp_protos_destroy(rd_kafka_mock_cgrp_proto_t *protos, + int proto_cnt); +rd_kafka_resp_err_t +rd_kafka_mock_cgrp_member_add(rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp, + const rd_kafkap_str_t *MemberId, + const rd_kafkap_str_t *ProtocolType, + rd_kafka_mock_cgrp_proto_t *protos, + int proto_cnt, + int session_timeout_ms); +rd_kafka_resp_err_t +rd_kafka_mock_cgrp_check_state(rd_kafka_mock_cgrp_t *mcgrp, + rd_kafka_mock_cgrp_member_t *member, + const rd_kafka_buf_t *request, + int32_t generation_id); +rd_kafka_mock_cgrp_member_t * +rd_kafka_mock_cgrp_member_find(const rd_kafka_mock_cgrp_t *mcgrp, + const rd_kafkap_str_t *MemberId); +void rd_kafka_mock_cgrp_destroy(rd_kafka_mock_cgrp_t *mcgrp); +rd_kafka_mock_cgrp_t *rd_kafka_mock_cgrp_find(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupId); +rd_kafka_mock_cgrp_t * +rd_kafka_mock_cgrp_get(rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *GroupId, + const rd_kafkap_str_t *ProtocolType); +void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_mock_connection_t *mconn); + + +/** + *@} + */ + + +#include "rdkafka_mock.h" + +#endif /* _RDKAFKA_MOCK_INT_H_ */ |