/* * librdkafka - Apache Kafka C library * * Copyright (c) 2019 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _RDKAFKA_MOCK_INT_H_ #define _RDKAFKA_MOCK_INT_H_ /** * @name Mock cluster - internal data types * */ /** * @struct Response error and/or RTT-delay to return to client. */ typedef struct rd_kafka_mock_error_rtt_s { rd_kafka_resp_err_t err; /**< Error response (or 0) */ rd_ts_t rtt; /**< RTT/delay in microseconds (or 0) */ } rd_kafka_mock_error_rtt_t; /** * @struct A stack of errors or rtt latencies to return to the client, * one by one until the stack is depleted. */ typedef struct rd_kafka_mock_error_stack_s { TAILQ_ENTRY(rd_kafka_mock_error_stack_s) link; int16_t ApiKey; /**< Optional ApiKey for which this stack * applies to, else -1. */ size_t cnt; /**< Current number of errors in .errs */ size_t size; /**< Current allocated size for .errs (in elements) */ rd_kafka_mock_error_rtt_t *errs; /**< Array of errors/rtts */ } rd_kafka_mock_error_stack_t; typedef TAILQ_HEAD(rd_kafka_mock_error_stack_head_s, rd_kafka_mock_error_stack_s) rd_kafka_mock_error_stack_head_t; /** * @struct Consumer group protocol name and metadata. */ typedef struct rd_kafka_mock_cgrp_proto_s { rd_kafkap_str_t *name; rd_kafkap_bytes_t *metadata; } rd_kafka_mock_cgrp_proto_t; /** * @struct Consumer group member */ typedef struct rd_kafka_mock_cgrp_member_s { TAILQ_ENTRY(rd_kafka_mock_cgrp_member_s) link; char *id; /**< MemberId */ char *group_instance_id; /**< Group instance id */ rd_ts_t ts_last_activity; /**< Last activity, e.g., Heartbeat */ rd_kafka_mock_cgrp_proto_t *protos; /**< Protocol names */ int proto_cnt; /**< Number of protocols */ rd_kafkap_bytes_t *assignment; /**< Current assignment */ rd_kafka_buf_t *resp; /**< Current response buffer */ struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL * if there is no ongoing * request. */ } rd_kafka_mock_cgrp_member_t; /** * @struct Consumer group. */ typedef struct rd_kafka_mock_cgrp_s { TAILQ_ENTRY(rd_kafka_mock_cgrp_s) link; struct rd_kafka_mock_cluster_s *cluster; /**< Cluster */ struct rd_kafka_mock_connection_s *conn; /**< Connection */ char *id; /**< Group Id */ char *protocol_type; /**< Protocol type */ char *protocol_name; /**< Elected protocol name */ int32_t generation_id; /**< Generation Id */ int session_timeout_ms; /**< Session timeout */ enum { RD_KAFKA_MOCK_CGRP_STATE_EMPTY, /* No members */ RD_KAFKA_MOCK_CGRP_STATE_JOINING, /* Members are joining */ RD_KAFKA_MOCK_CGRP_STATE_SYNCING, /* Syncing assignments */ RD_KAFKA_MOCK_CGRP_STATE_REBALANCING, /* Rebalance triggered */ RD_KAFKA_MOCK_CGRP_STATE_UP, /* Group is operational */ } state; /**< Consumer group state */ rd_kafka_timer_t session_tmr; /**< Session timeout timer */ rd_kafka_timer_t rebalance_tmr; /**< Rebalance state timer */ TAILQ_HEAD(, rd_kafka_mock_cgrp_member_s) members; /**< Group members */ int member_cnt; /**< Number of group members */ int last_member_cnt; /**< Mumber of group members at last election */ int assignment_cnt; /**< Number of member assignments in last Sync */ rd_kafka_mock_cgrp_member_t *leader; /**< Elected leader */ } rd_kafka_mock_cgrp_t; /** * @struct TransactionalId + PID (+ optional sequence state) */ typedef struct rd_kafka_mock_pid_s { rd_kafka_pid_t pid; /* BaseSequence tracking (partition) */ int8_t window; /**< increases up to 5 */ int8_t lo; /**< Window low bucket: oldest */ int8_t hi; /**< Window high bucket: most recent */ int32_t seq[5]; /**< Next expected BaseSequence for each bucket */ char TransactionalId[1]; /**< Allocated after this structure */ } rd_kafka_mock_pid_t; /** * @brief rd_kafka_mock_pid_t.pid Pid (not epoch) comparator */ static RD_UNUSED int rd_kafka_mock_pid_cmp_pid(const void *_a, const void *_b) { const rd_kafka_mock_pid_t *a = _a, *b = _b; if (a->pid.id < b->pid.id) return -1; else if (a->pid.id > b->pid.id) return 1; return 0; } /** * @brief rd_kafka_mock_pid_t.pid TransactionalId,Pid,epoch comparator */ static RD_UNUSED int rd_kafka_mock_pid_cmp(const void *_a, const void *_b) { const rd_kafka_mock_pid_t *a = _a, *b = _b; int r; r = strcmp(a->TransactionalId, b->TransactionalId); if (r) return r; if (a->pid.id < b->pid.id) return -1; else if (a->pid.id > b->pid.id) return 1; if (a->pid.epoch < b->pid.epoch) return -1; if (a->pid.epoch > b->pid.epoch) return 1; return 0; } /** * @struct A real TCP connection from the client to a mock broker. */ typedef struct rd_kafka_mock_connection_s { TAILQ_ENTRY(rd_kafka_mock_connection_s) link; rd_kafka_transport_t *transport; /**< Socket transport */ rd_kafka_buf_t *rxbuf; /**< Receive buffer */ rd_kafka_bufq_t outbufs; /**< Send buffers */ short *poll_events; /**< Events to poll, points to * the broker's pfd array */ struct sockaddr_in peer; /**< Peer address */ struct rd_kafka_mock_broker_s *broker; rd_kafka_timer_t write_tmr; /**< Socket write delay timer */ } rd_kafka_mock_connection_t; /** * @struct Mock broker */ typedef struct rd_kafka_mock_broker_s { TAILQ_ENTRY(rd_kafka_mock_broker_s) link; int32_t id; char advertised_listener[128]; struct sockaddr_in sin; /**< Bound address:port */ uint16_t port; char *rack; rd_bool_t up; rd_ts_t rtt; /**< RTT in microseconds */ rd_socket_t listen_s; /**< listen() socket */ TAILQ_HEAD(, rd_kafka_mock_connection_s) connections; /**< Per-protocol request error stack. * @locks mcluster->lock */ rd_kafka_mock_error_stack_head_t errstacks; struct rd_kafka_mock_cluster_s *cluster; } rd_kafka_mock_broker_t; /** * @struct A Kafka-serialized MessageSet */ typedef struct rd_kafka_mock_msgset_s { TAILQ_ENTRY(rd_kafka_mock_msgset_s) link; int64_t first_offset; /**< First offset in batch */ int64_t last_offset; /**< Last offset in batch */ int32_t leader_epoch; /**< Msgset leader epoch */ rd_kafkap_bytes_t bytes; /* Space for bytes.data is allocated after the msgset_t */ } rd_kafka_mock_msgset_t; /** * @struct Committed offset for a group and partition. */ typedef struct rd_kafka_mock_committed_offset_s { /**< mpart.committed_offsets */ TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link; char *group; /**< Allocated along with the struct */ int64_t offset; /**< Committed offset */ rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */ } rd_kafka_mock_committed_offset_t; TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s); /** * @struct Mock partition */ typedef struct rd_kafka_mock_partition_s { TAILQ_ENTRY(rd_kafka_mock_partition_s) leader_link; int32_t id; int32_t leader_epoch; /**< Leader epoch, bumped on each * partition leader change. */ int64_t start_offset; /**< Actual/leader start offset */ int64_t end_offset; /**< Actual/leader end offset */ int64_t follower_start_offset; /**< Follower's start offset */ int64_t follower_end_offset; /**< Follower's end offset */ rd_bool_t update_follower_start_offset; /**< Keep follower_start_offset * in synch with start_offset */ rd_bool_t update_follower_end_offset; /**< Keep follower_end_offset * in synch with end_offset */ struct rd_kafka_mock_msgset_tailq_s msgsets; size_t size; /**< Total size of all .msgsets */ size_t cnt; /**< Total count of .msgsets */ size_t max_size; /**< Maximum size of all .msgsets, may be overshot. */ size_t max_cnt; /**< Maximum number of .msgsets */ /**< Committed offsets */ TAILQ_HEAD(, rd_kafka_mock_committed_offset_s) committed_offsets; rd_kafka_mock_broker_t *leader; rd_kafka_mock_broker_t **replicas; int replica_cnt; rd_list_t pidstates; /**< PID states */ int32_t follower_id; /**< Preferred replica/follower */ struct rd_kafka_mock_topic_s *topic; } rd_kafka_mock_partition_t; /** * @struct Mock topic */ typedef struct rd_kafka_mock_topic_s { TAILQ_ENTRY(rd_kafka_mock_topic_s) link; char *name; rd_kafka_mock_partition_t *partitions; int partition_cnt; rd_kafka_resp_err_t err; /**< Error to return in protocol requests * for this topic. */ struct rd_kafka_mock_cluster_s *cluster; } rd_kafka_mock_topic_t; /** * @struct Explicitly set coordinator. */ typedef struct rd_kafka_mock_coord_s { TAILQ_ENTRY(rd_kafka_mock_coord_s) link; rd_kafka_coordtype_t type; char *key; int32_t broker_id; } rd_kafka_mock_coord_t; typedef void(rd_kafka_mock_io_handler_t)( struct rd_kafka_mock_cluster_s *mcluster, rd_socket_t fd, int events, void *opaque); struct rd_kafka_mock_api_handler { int16_t MinVersion; int16_t MaxVersion; int16_t FlexVersion; /**< First Flexible version */ int (*cb)(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *rkbuf); }; extern const struct rd_kafka_mock_api_handler rd_kafka_mock_api_handlers[RD_KAFKAP__NUM]; /** * @struct Mock cluster. * * The cluster IO loop runs in a separate thread where all * broker IO is handled. * * No locking is needed. */ struct rd_kafka_mock_cluster_s { char id[32]; /**< Generated cluster id */ rd_kafka_t *rk; int32_t controller_id; /**< Current controller */ TAILQ_HEAD(, rd_kafka_mock_broker_s) brokers; int broker_cnt; TAILQ_HEAD(, rd_kafka_mock_topic_s) topics; int topic_cnt; TAILQ_HEAD(, rd_kafka_mock_cgrp_s) cgrps; /** Explicit coordinators (set with mock_set_coordinator()) */ TAILQ_HEAD(, rd_kafka_mock_coord_s) coords; /** Current transactional producer PIDs. * Element type is a malloced rd_kafka_mock_pid_t*. */ rd_list_t pids; char *bootstraps; /**< bootstrap.servers */ thrd_t thread; /**< Mock thread */ rd_kafka_q_t *ops; /**< Control ops queue for interacting with the * cluster. */ rd_socket_t wakeup_fds[2]; /**< Wake-up fds for use with .ops */ rd_bool_t run; /**< Cluster will run while this value is true */ int fd_cnt; /**< Number of file descriptors */ int fd_size; /**< Allocated size of .fds * and .handlers */ struct pollfd *fds; /**< Dynamic array */ rd_kafka_broker_t *dummy_rkb; /**< Some internal librdkafka APIs * that we are reusing requires a * broker object, we use the * internal broker and store it * here for convenient access. */ struct { int partition_cnt; /**< Auto topic create part cnt */ int replication_factor; /**< Auto topic create repl factor */ } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ struct { rd_kafka_mock_io_handler_t *cb; /**< Callback */ void *opaque; /**< Callbacks' opaque */ } * handlers; /**< Per-protocol request error stack. */ rd_kafka_mock_error_stack_head_t errstacks; /**< Request handlers */ struct rd_kafka_mock_api_handler api_handlers[RD_KAFKAP__NUM]; /**< Mutex for: * .errstacks * .apiversions */ mtx_t lock; rd_kafka_timers_t timers; /**< Timers */ }; rd_kafka_buf_t *rd_kafka_mock_buf_new_response(const rd_kafka_buf_t *request); void rd_kafka_mock_connection_send_response(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp); void rd_kafka_mock_connection_set_blocking(rd_kafka_mock_connection_t *mconn, rd_bool_t blocking); rd_kafka_mock_partition_t * rd_kafka_mock_partition_find(const rd_kafka_mock_topic_t *mtopic, int32_t partition); rd_kafka_mock_topic_t * rd_kafka_mock_topic_auto_create(rd_kafka_mock_cluster_t *mcluster, const char *topic, int partition_cnt, rd_kafka_resp_err_t *errp); rd_kafka_mock_topic_t * rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster, const char *name); rd_kafka_mock_topic_t * rd_kafka_mock_topic_find_by_kstr(const rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *kname); rd_kafka_mock_broker_t * rd_kafka_mock_cluster_get_coord(rd_kafka_mock_cluster_t *mcluster, rd_kafka_coordtype_t KeyType, const rd_kafkap_str_t *Key); rd_kafka_mock_committed_offset_t * rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group); rd_kafka_mock_committed_offset_t * rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group, int64_t offset, const rd_kafkap_str_t *metadata); const rd_kafka_mock_msgset_t * rd_kafka_mock_msgset_find(const rd_kafka_mock_partition_t *mpart, int64_t offset, rd_bool_t on_follower); rd_kafka_resp_err_t rd_kafka_mock_next_request_error(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp); rd_kafka_resp_err_t rd_kafka_mock_partition_log_append(rd_kafka_mock_partition_t *mpart, const rd_kafkap_bytes_t *records, const rd_kafkap_str_t *TransactionalId, int64_t *BaseOffset); rd_kafka_resp_err_t rd_kafka_mock_partition_leader_epoch_check( const rd_kafka_mock_partition_t *mpart, int32_t leader_epoch); int64_t rd_kafka_mock_partition_offset_for_leader_epoch( const rd_kafka_mock_partition_t *mpart, int32_t leader_epoch); /** * @returns true if the ApiVersion is supported, else false. */ static RD_UNUSED rd_bool_t rd_kafka_mock_cluster_ApiVersion_check(const rd_kafka_mock_cluster_t *mcluster, int16_t ApiKey, int16_t ApiVersion) { return (ApiVersion >= mcluster->api_handlers[ApiKey].MinVersion && ApiVersion <= mcluster->api_handlers[ApiKey].MaxVersion); } rd_kafka_resp_err_t rd_kafka_mock_pid_find(rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *TransactionalId, const rd_kafka_pid_t pid, rd_kafka_mock_pid_t **mpidp); /** * @name Mock consumer group (rdkafka_mock_cgrp.c) * @{ */ void rd_kafka_mock_cgrp_member_active(rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_cgrp_member_t *member); void rd_kafka_mock_cgrp_member_assignment_set( rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_cgrp_member_t *member, const rd_kafkap_bytes_t *Metadata); rd_kafka_resp_err_t rd_kafka_mock_cgrp_member_sync_set(rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_cgrp_member_t *member, rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp); rd_kafka_resp_err_t rd_kafka_mock_cgrp_member_leave(rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_cgrp_member_t *member); void rd_kafka_mock_cgrp_protos_destroy(rd_kafka_mock_cgrp_proto_t *protos, int proto_cnt); rd_kafka_resp_err_t rd_kafka_mock_cgrp_member_add(rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp, const rd_kafkap_str_t *MemberId, const rd_kafkap_str_t *ProtocolType, rd_kafka_mock_cgrp_proto_t *protos, int proto_cnt, int session_timeout_ms); rd_kafka_resp_err_t rd_kafka_mock_cgrp_check_state(rd_kafka_mock_cgrp_t *mcgrp, rd_kafka_mock_cgrp_member_t *member, const rd_kafka_buf_t *request, int32_t generation_id); rd_kafka_mock_cgrp_member_t * rd_kafka_mock_cgrp_member_find(const rd_kafka_mock_cgrp_t *mcgrp, const rd_kafkap_str_t *MemberId); void rd_kafka_mock_cgrp_destroy(rd_kafka_mock_cgrp_t *mcgrp); rd_kafka_mock_cgrp_t *rd_kafka_mock_cgrp_find(rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *GroupId); rd_kafka_mock_cgrp_t * rd_kafka_mock_cgrp_get(rd_kafka_mock_cluster_t *mcluster, const rd_kafkap_str_t *GroupId, const rd_kafkap_str_t *ProtocolType); void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster, rd_kafka_mock_connection_t *mconn); /** *@} */ #include "rdkafka_mock.h" #endif /* _RDKAFKA_MOCK_INT_H_ */