diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h new file mode 100644 index 00000000..4fa51e54 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h @@ -0,0 +1,383 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _RDKAFKA_CGRP_H_ +#define _RDKAFKA_CGRP_H_ + +#include "rdinterval.h" + +#include "rdkafka_assignor.h" + + +/** + * Client groups implementation + * + * Client groups handling for a single cgrp is assigned to a single + * rd_kafka_broker_t object at any given time. + * The main thread will call cgrp_serve() to serve its cgrps. + * + * This means that the cgrp itself does not need to be locked since it + * is only ever used from the main thread. + * + */ + + +extern const char *rd_kafka_cgrp_join_state_names[]; + +/** + * Client group + */ +typedef struct rd_kafka_cgrp_s { + const rd_kafkap_str_t *rkcg_group_id; + rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */ + rd_kafkap_str_t *rkcg_group_instance_id; + const rd_kafkap_str_t *rkcg_client_id; + + enum { + /* Init state */ + RD_KAFKA_CGRP_STATE_INIT, + + /* Cgrp has been stopped. This is a final state */ + RD_KAFKA_CGRP_STATE_TERM, + + /* Query for group coordinator */ + RD_KAFKA_CGRP_STATE_QUERY_COORD, + + /* Outstanding query, awaiting response */ + RD_KAFKA_CGRP_STATE_WAIT_COORD, + + /* Wait ack from assigned cgrp manager broker thread */ + RD_KAFKA_CGRP_STATE_WAIT_BROKER, + + /* Wait for manager broker thread to connect to broker */ + RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT, + + /* Coordinator is up and manager is assigned. */ + RD_KAFKA_CGRP_STATE_UP, + } rkcg_state; + rd_ts_t rkcg_ts_statechange; /* Timestamp of last + * state change. */ + + + enum { + /* all: join or rejoin, possibly with an existing assignment. */ + RD_KAFKA_CGRP_JOIN_STATE_INIT, + + /* all: JoinGroupRequest sent, awaiting response. */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN, + + /* all: MetadataRequest sent, awaiting response. + * While metadata requests may be issued at any time, + * this state is only set upon a proper (re)join. */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA, + + /* Follower: SyncGroupRequest sent, awaiting response. */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC, + + /* all: waiting for application to call *_assign() */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL, + + /* all: waiting for application to call *_unassign() */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL, + + /* all: waiting for full assignment to decommission */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE, + + /* all: waiting for partial assignment to decommission */ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE, + + /* all: synchronized and assigned + * may be an empty assignment. */ + RD_KAFKA_CGRP_JOIN_STATE_STEADY, + } rkcg_join_state; + + /* State when group leader */ + struct { + rd_kafka_group_member_t *members; + int member_cnt; + } rkcg_group_leader; + + rd_kafka_q_t *rkcg_q; /* Application poll queue */ + rd_kafka_q_t *rkcg_ops; /* Manager ops queue */ + rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */ + int rkcg_flags; +#define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */ +#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE \ + 0x8 /* Send LeaveGroup when \ + * unassign is done */ +#define RD_KAFKA_CGRP_F_SUBSCRIPTION \ + 0x10 /* If set: \ + * subscription \ + * else: \ + * static assignment */ +#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT \ + 0x20 /* A Heartbeat request \ + * is in transit, dont \ + * send a new one. */ +#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION \ + 0x40 /* Subscription contains \ + * wildcards. */ +#define RD_KAFKA_CGRP_F_WAIT_LEAVE \ + 0x80 /* Wait for LeaveGroup \ + * to be sent. \ + * This is used to stall \ + * termination until \ + * the LeaveGroupRequest \ + * is responded to, \ + * otherwise it risks \ + * being dropped in the \ + * output queue when \ + * the broker is destroyed. \ + */ +#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED \ + 0x100 /**< max.poll.interval.ms \ + * was exceeded and we \ + * left the group. \ + * Do not rejoin until \ + * the application has \ + * polled again. */ + + rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/ + rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */ + rd_interval_t rkcg_join_intvl; /* JoinGroup interval */ + rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */ + + rd_ts_t rkcg_ts_session_timeout; /**< Absolute session + * timeout enforced by + * the consumer, this + * value is updated on + * Heartbeat success, + * etc. */ + rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error, + * used for logging. */ + + TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics; /* Topics subscribed to */ + + rd_list_t rkcg_toppars; /* Toppars subscribed to*/ + + int32_t rkcg_generation_id; /* Current generation id */ + + rd_kafka_assignor_t *rkcg_assignor; /**< The current partition + * assignor. used by both + * leader and members. */ + void *rkcg_assignor_state; /**< current partition + * assignor state */ + + int32_t rkcg_coord_id; /**< Current coordinator id, + * or -1 if not known. */ + + rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator + * broker handle, or NULL. + * rkcg_coord's nodename is + * updated to this broker's + * nodename when there is a + * coordinator change. */ + rd_kafka_broker_t *rkcg_coord; /**< The dedicated coordinator + * broker handle. + * Will be updated when the + * coordinator changes. */ + + int16_t rkcg_wait_resp; /**< Awaiting response for this + * ApiKey. + * Makes sure only one + * JoinGroup or SyncGroup + * request is outstanding. + * Unset value is -1. */ + + /** Current subscription */ + rd_kafka_topic_partition_list_t *rkcg_subscription; + /** The actual topics subscribed (after metadata+wildcard matching). + * Sorted. */ + rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */ + /** Subscribed topics that are errored/not available. */ + rd_kafka_topic_partition_list_t *rkcg_errored_topics; + /** If a SUBSCRIBE op is received during a COOPERATIVE rebalance, + * actioning this will be postponed until after the rebalance + * completes. The waiting subscription is stored here. + * Mutually exclusive with rkcg_next_subscription. */ + rd_kafka_topic_partition_list_t *rkcg_next_subscription; + /** If a (un)SUBSCRIBE op is received during a COOPERATIVE rebalance, + * actioning this will be posponed until after the rebalance + * completes. This flag is used to signal a waiting unsubscribe + * operation. Mutually exclusive with rkcg_next_subscription. */ + rd_bool_t rkcg_next_unsubscribe; + + /** Assignment considered lost */ + rd_atomic32_t rkcg_assignment_lost; + + /** Current assignment of partitions from last SyncGroup response. + * NULL means no assignment, else empty or non-empty assignment. + * + * This group assignment is the actual set of partitions that were + * assigned to our consumer by the consumer group leader and should + * not be confused with the rk_consumer.assignment which is the + * partitions assigned by the application using assign(), et.al. + * + * The group assignment and the consumer assignment are typically + * identical, but not necessarily since an application is free to + * assign() any partition, not just the partitions it is handed + * through the rebalance callback. + * + * Yes, this nomenclature is ambigious but has historical reasons, + * so for now just try to remember that: + * - group assignment == consumer group assignment. + * - assignment == actual used assignment, i.e., fetched partitions. + * + * @remark This list is always sorted. + */ + rd_kafka_topic_partition_list_t *rkcg_group_assignment; + + /** The partitions to incrementally assign following a + * currently in-progress incremental unassign. */ + rd_kafka_topic_partition_list_t *rkcg_rebalance_incr_assignment; + + /** Rejoin the group following a currently in-progress + * incremental unassign. */ + rd_bool_t rkcg_rebalance_rejoin; + + rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to + * application. + * This is for silencing + * same errors. */ + + rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */ + rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max + * poll interval. */ + + rd_kafka_t *rkcg_rk; + + rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op + * (OP_TERMINATE) + * to this rko's queue. */ + + rd_ts_t rkcg_ts_terminate; /* Timestamp of when + * cgrp termination was + * initiated. */ + + rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */ + + /* Protected by rd_kafka_*lock() */ + struct { + rd_ts_t ts_rebalance; /* Timestamp of + * last rebalance */ + int rebalance_cnt; /* Number of + rebalances */ + char rebalance_reason[256]; /**< Last rebalance + * reason */ + int assignment_size; /* Partition count + * of last rebalance + * assignment */ + } rkcg_c; + +} rd_kafka_cgrp_t; + + + +/* Check if broker is the coordinator */ +#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb) \ + ((rkcg)->rkcg_coord_id != -1 && \ + (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid) + +/** + * @returns true if cgrp is using static group membership + */ +#define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \ + !RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id) + +extern const char *rd_kafka_cgrp_state_names[]; +extern const char *rd_kafka_cgrp_join_state_names[]; + +void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg); +rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, + const rd_kafkap_str_t *group_id, + const rd_kafkap_str_t *client_id); +void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg); + +void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg, + rd_kafka_toppar_t *rktp, + rd_kafka_replyq_t replyq, + rd_kafka_op_type_t type, + rd_kafka_resp_err_t err); +void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko); +void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq); + + +rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del(rd_kafka_cgrp_t *rkcg, + const char *pattern); +rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add(rd_kafka_cgrp_t *rkcg, + const char *pattern); + +int rd_kafka_cgrp_topic_check(rd_kafka_cgrp_t *rkcg, const char *topic); + +void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id); + +void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state); + +rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg); +void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason); +void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg, + rd_kafka_resp_err_t err, + const char *reason); +void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, + rd_bool_t do_join); +#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp) + + +void rd_kafka_cgrp_assigned_offsets_commit( + rd_kafka_cgrp_t *rkcg, + const rd_kafka_topic_partition_list_t *offsets, + rd_bool_t set_offsets, + const char *reason); + +void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg); + +rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg); + + +struct rd_kafka_consumer_group_metadata_s { + char *group_id; + int32_t generation_id; + char *member_id; + char *group_instance_id; /**< Optional (NULL) */ +}; + +rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup( + const rd_kafka_consumer_group_metadata_t *cgmetadata); + +static RD_UNUSED const char * +rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) { + switch (protocol) { + case RD_KAFKA_REBALANCE_PROTOCOL_EAGER: + return "EAGER"; + case RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE: + return "COOPERATIVE"; + default: + return "NONE"; + } +} + +#endif /* _RDKAFKA_CGRP_H_ */ |