/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2015, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _RDKAFKA_CGRP_H_ #define _RDKAFKA_CGRP_H_ #include "rdinterval.h" #include "rdkafka_assignor.h" /** * Client groups implementation * * Client groups handling for a single cgrp is assigned to a single * rd_kafka_broker_t object at any given time. * The main thread will call cgrp_serve() to serve its cgrps. * * This means that the cgrp itself does not need to be locked since it * is only ever used from the main thread. * */ extern const char *rd_kafka_cgrp_join_state_names[]; /** * Client group */ typedef struct rd_kafka_cgrp_s { const rd_kafkap_str_t *rkcg_group_id; rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */ rd_kafkap_str_t *rkcg_group_instance_id; const rd_kafkap_str_t *rkcg_client_id; enum { /* Init state */ RD_KAFKA_CGRP_STATE_INIT, /* Cgrp has been stopped. This is a final state */ RD_KAFKA_CGRP_STATE_TERM, /* Query for group coordinator */ RD_KAFKA_CGRP_STATE_QUERY_COORD, /* Outstanding query, awaiting response */ RD_KAFKA_CGRP_STATE_WAIT_COORD, /* Wait ack from assigned cgrp manager broker thread */ RD_KAFKA_CGRP_STATE_WAIT_BROKER, /* Wait for manager broker thread to connect to broker */ RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT, /* Coordinator is up and manager is assigned. */ RD_KAFKA_CGRP_STATE_UP, } rkcg_state; rd_ts_t rkcg_ts_statechange; /* Timestamp of last * state change. */ enum { /* all: join or rejoin, possibly with an existing assignment. */ RD_KAFKA_CGRP_JOIN_STATE_INIT, /* all: JoinGroupRequest sent, awaiting response. */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN, /* all: MetadataRequest sent, awaiting response. * While metadata requests may be issued at any time, * this state is only set upon a proper (re)join. */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA, /* Follower: SyncGroupRequest sent, awaiting response. */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC, /* all: waiting for application to call *_assign() */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL, /* all: waiting for application to call *_unassign() */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL, /* all: waiting for full assignment to decommission */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE, /* all: waiting for partial assignment to decommission */ RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE, /* all: synchronized and assigned * may be an empty assignment. */ RD_KAFKA_CGRP_JOIN_STATE_STEADY, } rkcg_join_state; /* State when group leader */ struct { rd_kafka_group_member_t *members; int member_cnt; } rkcg_group_leader; rd_kafka_q_t *rkcg_q; /* Application poll queue */ rd_kafka_q_t *rkcg_ops; /* Manager ops queue */ rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */ int rkcg_flags; #define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */ #define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE \ 0x8 /* Send LeaveGroup when \ * unassign is done */ #define RD_KAFKA_CGRP_F_SUBSCRIPTION \ 0x10 /* If set: \ * subscription \ * else: \ * static assignment */ #define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT \ 0x20 /* A Heartbeat request \ * is in transit, dont \ * send a new one. */ #define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION \ 0x40 /* Subscription contains \ * wildcards. */ #define RD_KAFKA_CGRP_F_WAIT_LEAVE \ 0x80 /* Wait for LeaveGroup \ * to be sent. \ * This is used to stall \ * termination until \ * the LeaveGroupRequest \ * is responded to, \ * otherwise it risks \ * being dropped in the \ * output queue when \ * the broker is destroyed. \ */ #define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED \ 0x100 /**< max.poll.interval.ms \ * was exceeded and we \ * left the group. \ * Do not rejoin until \ * the application has \ * polled again. */ rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/ rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */ rd_interval_t rkcg_join_intvl; /* JoinGroup interval */ rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */ rd_ts_t rkcg_ts_session_timeout; /**< Absolute session * timeout enforced by * the consumer, this * value is updated on * Heartbeat success, * etc. */ rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error, * used for logging. */ TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics; /* Topics subscribed to */ rd_list_t rkcg_toppars; /* Toppars subscribed to*/ int32_t rkcg_generation_id; /* Current generation id */ rd_kafka_assignor_t *rkcg_assignor; /**< The current partition * assignor. used by both * leader and members. */ void *rkcg_assignor_state; /**< current partition * assignor state */ int32_t rkcg_coord_id; /**< Current coordinator id, * or -1 if not known. */ rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator * broker handle, or NULL. * rkcg_coord's nodename is * updated to this broker's * nodename when there is a * coordinator change. */ rd_kafka_broker_t *rkcg_coord; /**< The dedicated coordinator * broker handle. * Will be updated when the * coordinator changes. */ int16_t rkcg_wait_resp; /**< Awaiting response for this * ApiKey. * Makes sure only one * JoinGroup or SyncGroup * request is outstanding. * Unset value is -1. */ /** Current subscription */ rd_kafka_topic_partition_list_t *rkcg_subscription; /** The actual topics subscribed (after metadata+wildcard matching). * Sorted. */ rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */ /** Subscribed topics that are errored/not available. */ rd_kafka_topic_partition_list_t *rkcg_errored_topics; /** If a SUBSCRIBE op is received during a COOPERATIVE rebalance, * actioning this will be postponed until after the rebalance * completes. The waiting subscription is stored here. * Mutually exclusive with rkcg_next_subscription. */ rd_kafka_topic_partition_list_t *rkcg_next_subscription; /** If a (un)SUBSCRIBE op is received during a COOPERATIVE rebalance, * actioning this will be posponed until after the rebalance * completes. This flag is used to signal a waiting unsubscribe * operation. Mutually exclusive with rkcg_next_subscription. */ rd_bool_t rkcg_next_unsubscribe; /** Assignment considered lost */ rd_atomic32_t rkcg_assignment_lost; /** Current assignment of partitions from last SyncGroup response. * NULL means no assignment, else empty or non-empty assignment. * * This group assignment is the actual set of partitions that were * assigned to our consumer by the consumer group leader and should * not be confused with the rk_consumer.assignment which is the * partitions assigned by the application using assign(), et.al. * * The group assignment and the consumer assignment are typically * identical, but not necessarily since an application is free to * assign() any partition, not just the partitions it is handed * through the rebalance callback. * * Yes, this nomenclature is ambigious but has historical reasons, * so for now just try to remember that: * - group assignment == consumer group assignment. * - assignment == actual used assignment, i.e., fetched partitions. * * @remark This list is always sorted. */ rd_kafka_topic_partition_list_t *rkcg_group_assignment; /** The partitions to incrementally assign following a * currently in-progress incremental unassign. */ rd_kafka_topic_partition_list_t *rkcg_rebalance_incr_assignment; /** Rejoin the group following a currently in-progress * incremental unassign. */ rd_bool_t rkcg_rebalance_rejoin; rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to * application. * This is for silencing * same errors. */ rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */ rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max * poll interval. */ rd_kafka_t *rkcg_rk; rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op * (OP_TERMINATE) * to this rko's queue. */ rd_ts_t rkcg_ts_terminate; /* Timestamp of when * cgrp termination was * initiated. */ rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */ /* Protected by rd_kafka_*lock() */ struct { rd_ts_t ts_rebalance; /* Timestamp of * last rebalance */ int rebalance_cnt; /* Number of rebalances */ char rebalance_reason[256]; /**< Last rebalance * reason */ int assignment_size; /* Partition count * of last rebalance * assignment */ } rkcg_c; } rd_kafka_cgrp_t; /* Check if broker is the coordinator */ #define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb) \ ((rkcg)->rkcg_coord_id != -1 && \ (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid) /** * @returns true if cgrp is using static group membership */ #define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \ !RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id) extern const char *rd_kafka_cgrp_state_names[]; extern const char *rd_kafka_cgrp_join_state_names[]; void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg); rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, const rd_kafkap_str_t *group_id, const rd_kafkap_str_t *client_id); void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg); void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp, rd_kafka_replyq_t replyq, rd_kafka_op_type_t type, rd_kafka_resp_err_t err); void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko); void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq); rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del(rd_kafka_cgrp_t *rkcg, const char *pattern); rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add(rd_kafka_cgrp_t *rkcg, const char *pattern); int rd_kafka_cgrp_topic_check(rd_kafka_cgrp_t *rkcg, const char *topic); void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id); void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state); rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg); void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason); void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, const char *reason); void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_bool_t do_join); #define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp) void rd_kafka_cgrp_assigned_offsets_commit( rd_kafka_cgrp_t *rkcg, const rd_kafka_topic_partition_list_t *offsets, rd_bool_t set_offsets, const char *reason); void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg); rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg); struct rd_kafka_consumer_group_metadata_s { char *group_id; int32_t generation_id; char *member_id; char *group_instance_id; /**< Optional (NULL) */ }; rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup( const rd_kafka_consumer_group_metadata_t *cgmetadata); static RD_UNUSED const char * rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) { switch (protocol) { case RD_KAFKA_REBALANCE_PROTOCOL_EAGER: return "EAGER"; case RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE: return "COOPERATIVE"; default: return "NONE"; } } #endif /* _RDKAFKA_CGRP_H_ */