Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h')
-rw-r--r-- fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h | 383
1 file changed, 383 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h
new file mode 100644
index 000000000..4fa51e548
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h
@@ -0,0 +1,383 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _RDKAFKA_CGRP_H_
+#define _RDKAFKA_CGRP_H_
+
+#include "rdinterval.h"
+
+#include "rdkafka_assignor.h"
+
+
+/**
+ * Client groups implementation
+ *
+ * The handling of a single cgrp is assigned to exactly one
+ * rd_kafka_broker_t object at any given time.
+ * The main thread calls rd_kafka_cgrp_serve() to serve its cgrps.
+ *
+ * This means that the cgrp itself does not need to be locked since it
+ * is only ever used from the main thread.
+ *
+ */
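+
+/*
+ * A minimal sketch of this model (simplified; the real driver is the
+ * main thread's poll loop, and rd_kafka_terminating() is the internal
+ * termination check):
+ *
+ * while (!rd_kafka_terminating(rk))
+ * rd_kafka_cgrp_serve(rk->rk_cgrp);
+ *
+ * Because only the main thread runs this, the rkcg fields below are
+ * accessed without locks.
+ */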
+
+
+extern const char *rd_kafka_cgrp_join_state_names[];
+
+/**
+ * Client group
+ */
+typedef struct rd_kafka_cgrp_s {
+ const rd_kafkap_str_t *rkcg_group_id;
+ rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */
+ rd_kafkap_str_t *rkcg_group_instance_id;
+ const rd_kafkap_str_t *rkcg_client_id;
+
+ enum {
+ /* Init state */
+ RD_KAFKA_CGRP_STATE_INIT,
+
+ /* Cgrp has been stopped. This is a final state */
+ RD_KAFKA_CGRP_STATE_TERM,
+
+ /* Query for group coordinator */
+ RD_KAFKA_CGRP_STATE_QUERY_COORD,
+
+ /* Outstanding query, awaiting response */
+ RD_KAFKA_CGRP_STATE_WAIT_COORD,
+
+ /* Awaiting an ack from the assigned cgrp manager broker thread */
+ RD_KAFKA_CGRP_STATE_WAIT_BROKER,
+
+ /* Wait for manager broker thread to connect to broker */
+ RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,
+
+ /* Coordinator is up and manager is assigned. */
+ RD_KAFKA_CGRP_STATE_UP,
+ } rkcg_state;
+ rd_ts_t rkcg_ts_statechange; /* Timestamp of last
+ * state change. */
+
+
+ enum {
+ /* all: join or rejoin, possibly with an existing assignment. */
+ RD_KAFKA_CGRP_JOIN_STATE_INIT,
+
+ /* all: JoinGroupRequest sent, awaiting response. */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,
+
+ /* all: MetadataRequest sent, awaiting response.
+ * While metadata requests may be issued at any time,
+ * this state is only set upon a proper (re)join. */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,
+
+ /* Follower: SyncGroupRequest sent, awaiting response. */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,
+
+ /* all: waiting for application to call *_assign() */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL,
+
+ /* all: waiting for application to call *_unassign() */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL,
+
+ /* all: waiting for full assignment to decommission */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE,
+
+ /* all: waiting for partial assignment to decommission */
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE,
+
+ /* all: synchronized and assigned;
+ * may be an empty assignment. */
+ RD_KAFKA_CGRP_JOIN_STATE_STEADY,
+ } rkcg_join_state;
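+
+ /* Informational sketch of the typical eager progression, matching
+ * the order above:
+ * INIT -> WAIT_JOIN -> (leader only: WAIT_METADATA) -> WAIT_SYNC ->
+ * WAIT_ASSIGN_CALL -> STEADY,
+ * with a required rejoin taking the member back to INIT. */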
+
+ /* State when group leader */
+ struct {
+ rd_kafka_group_member_t *members;
+ int member_cnt;
+ } rkcg_group_leader;
+
+ rd_kafka_q_t *rkcg_q; /* Application poll queue */
+ rd_kafka_q_t *rkcg_ops; /* Manager ops queue */
+ rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */
+ int rkcg_flags;
+#define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */
+#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE \
+ 0x8 /* Send LeaveGroup when \
+ * unassign is done */
+#define RD_KAFKA_CGRP_F_SUBSCRIPTION \
+ 0x10 /* If set: \
+ * subscription \
+ * else: \
+ * static assignment */
+#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT \
+ 0x20 /* A Heartbeat request \
+ * is in transit, don't \
+ * send a new one. */
+#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION \
+ 0x40 /* Subscription contains \
+ * wildcards. */
+#define RD_KAFKA_CGRP_F_WAIT_LEAVE \
+ 0x80 /* Wait for LeaveGroup \
+ * to be sent. \
+ * This is used to stall \
+ * termination until \
+ * the LeaveGroupRequest \
+ * is responded to, \
+ * otherwise it risks \
+ * being dropped in the \
+ * output queue when \
+ * the broker is destroyed. \
+ */
+#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED \
+ 0x100 /**< max.poll.interval.ms \
+ * was exceeded and we \
+ * left the group. \
+ * Do not rejoin until \
+ * the application has \
+ * polled again. */
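+
+ /* Illustrative (hypothetical call sites): these are plain bits in
+ * rkcg_flags, set and tested with bit operations, e.g.:
+ * rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
+ * if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)
+ * ... do not rejoin until the application polls again ... */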
+
+ rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/
+ rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */
+ rd_interval_t rkcg_join_intvl; /* JoinGroup interval */
+ rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */
+
+ rd_ts_t rkcg_ts_session_timeout; /**< Absolute session
+ * timeout enforced by
+ * the consumer, this
+ * value is updated on
+ * Heartbeat success,
+ * etc. */
+ rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error,
+ * used for logging. */
+
+ TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics; /* Topics subscribed to */
+
+ rd_list_t rkcg_toppars; /* Toppars subscribed to */
+
+ int32_t rkcg_generation_id; /* Current generation id */
+
+ rd_kafka_assignor_t *rkcg_assignor; /**< The current partition
+ * assignor. Used by both
+ * leader and members. */
+ void *rkcg_assignor_state; /**< Current partition
+ * assignor state */
+
+ int32_t rkcg_coord_id; /**< Current coordinator id,
+ * or -1 if not known. */
+
+ rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator
+ * broker handle, or NULL.
+ * rkcg_coord's nodename is
+ * updated to this broker's
+ * nodename when there is a
+ * coordinator change. */
+ rd_kafka_broker_t *rkcg_coord; /**< The dedicated coordinator
+ * broker handle.
+ * Will be updated when the
+ * coordinator changes. */
+
+ int16_t rkcg_wait_resp; /**< Awaiting response for this
+ * ApiKey.
+ * Makes sure only one
+ * JoinGroup or SyncGroup
+ * request is outstanding.
+ * Unset value is -1. */
+
+ /** Current subscription */
+ rd_kafka_topic_partition_list_t *rkcg_subscription;
+ /** The actual topics subscribed (after metadata+wildcard matching).
+ * Sorted. */
+ rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
+ /** Subscribed topics that are errored/not available. */
+ rd_kafka_topic_partition_list_t *rkcg_errored_topics;
+ /** If a SUBSCRIBE op is received during a COOPERATIVE rebalance,
+ * actioning this will be postponed until after the rebalance
+ * completes. The waiting subscription is stored here.
+ * Mutually exclusive with rkcg_next_subscription. */
+ rd_kafka_topic_partition_list_t *rkcg_next_subscription;
+ /** If a (un)SUBSCRIBE op is received during a COOPERATIVE rebalance,
+ * actioning this will be postponed until after the rebalance
+ * completes. This flag is used to signal a waiting unsubscribe
+ * operation. Mutually exclusive with rkcg_next_subscription. */
+ rd_bool_t rkcg_next_unsubscribe;
+
+ /** Assignment considered lost */
+ rd_atomic32_t rkcg_assignment_lost;
+
+ /** Current assignment of partitions from last SyncGroup response.
+ * NULL means no assignment, else empty or non-empty assignment.
+ *
+ * This group assignment is the actual set of partitions that were
+ * assigned to our consumer by the consumer group leader and should
+ * not be confused with the rk_consumer.assignment which is the
+ * partitions assigned by the application using assign(), etc.
+ *
+ * The group assignment and the consumer assignment are typically
+ * identical, but not necessarily since an application is free to
+ * assign() any partition, not just the partitions it is handed
+ * through the rebalance callback.
+ *
+ * Yes, this nomenclature is ambiguous, but it exists for historical
+ * reasons, so for now just try to remember that:
+ * - group assignment == consumer group assignment.
+ * - assignment == the assignment actually in use, i.e., fetched partitions.
+ *
+ * @remark This list is always sorted.
+ */
+ rd_kafka_topic_partition_list_t *rkcg_group_assignment;
+
+ /** The partitions to incrementally assign following a
+ * currently in-progress incremental unassign. */
+ rd_kafka_topic_partition_list_t *rkcg_rebalance_incr_assignment;
+
+ /** Rejoin the group following a currently in-progress
+ * incremental unassign. */
+ rd_bool_t rkcg_rebalance_rejoin;
+
+ rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to
+ * application.
+ * This is for silencing
+ * same errors. */
+
+ rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */
+ rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max
+ * poll interval. */
+
+ rd_kafka_t *rkcg_rk;
+
+ rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op
+ * (OP_TERMINATE)
+ * to this rko's queue. */
+
+ rd_ts_t rkcg_ts_terminate; /* Timestamp of when
+ * cgrp termination was
+ * initiated. */
+
+ rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */
+
+ /* Protected by rd_kafka_*lock() */
+ struct {
+ rd_ts_t ts_rebalance; /* Timestamp of
+ * last rebalance */
+ int rebalance_cnt; /* Number of
+ * rebalances */
+ char rebalance_reason[256]; /**< Last rebalance
+ * reason */
+ int assignment_size; /* Partition count
+ * of last rebalance
+ * assignment */
+ } rkcg_c;
+
+} rd_kafka_cgrp_t;
+
+
+
+/* Check if broker is the coordinator */
+#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb) \
+ ((rkcg)->rkcg_coord_id != -1 && \
+ (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid)
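+
+/* Hypothetical call site showing the intended use of the macro above:
+ * when scanning brokers, skip any that is not this group's
+ * coordinator:
+ *
+ * if (!RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb))
+ * continue;
+ */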
+
+/**
+ * @returns true if cgrp is using static group membership
+ */
+#define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \
+ !RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id)
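+
+/* Example: when the application sets the `group.instance.id`
+ * configuration property, rkcg_group_instance_id is non-NULL and
+ * RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) evaluates to true. */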
+
+extern const char *rd_kafka_cgrp_state_names[];
+extern const char *rd_kafka_cgrp_join_state_names[];
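+
+/* A minimal sketch (not part of librdkafka's API) showing how the
+ * name tables above pair with the enum fields, assuming the usual
+ * convention that each table is indexed by the enum value: */
+static RD_UNUSED const char *
+rd_kafka_cgrp_example_state_str(const rd_kafka_cgrp_t *rkcg,
+ const char **join_state) {
+ if (join_state)
+ *join_state = rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state];
+ return rd_kafka_cgrp_state_names[rkcg->rkcg_state];
+}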
+
+void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg);
+rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *client_id);
+void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg);
+
+void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_op_type_t type,
+ rd_kafka_resp_err_t err);
+void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
+void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);
+
+
+rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del(rd_kafka_cgrp_t *rkcg,
+ const char *pattern);
+rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add(rd_kafka_cgrp_t *rkcg,
+ const char *pattern);
+
+int rd_kafka_cgrp_topic_check(rd_kafka_cgrp_t *rkcg, const char *topic);
+
+void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id);
+
+void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state);
+
+rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg);
+void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason);
+void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err,
+ const char *reason);
+void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
+ rd_bool_t do_join);
+#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
+
+
+void rd_kafka_cgrp_assigned_offsets_commit(
+ rd_kafka_cgrp_t *rkcg,
+ const rd_kafka_topic_partition_list_t *offsets,
+ rd_bool_t set_offsets,
+ const char *reason);
+
+void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg);
+
+rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg);
+
+
+struct rd_kafka_consumer_group_metadata_s {
+ char *group_id;
+ int32_t generation_id;
+ char *member_id;
+ char *group_instance_id; /**< Optional (NULL) */
+};
+
+rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup(
+ const rd_kafka_consumer_group_metadata_t *cgmetadata);
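+
+/* Expected semantics of the _dup() above (a sketch; the
+ * implementation lives in rdkafka_cgrp.c): a deep copy in which each
+ * string field of the struct is duplicated, with group_instance_id
+ * remaining NULL when it was not set. */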
+
+static RD_UNUSED const char *
+rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) {
+ switch (protocol) {
+ case RD_KAFKA_REBALANCE_PROTOCOL_EAGER:
+ return "EAGER";
+ case RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE:
+ return "COOPERATIVE";
+ default:
+ return "NONE";
+ }
+}
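+
+/* Example:
+ * rd_kafka_rebalance_protocol2str(RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE)
+ * returns "COOPERATIVE"; any unknown value maps to "NONE". */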
+
+#endif /* _RDKAFKA_CGRP_H_ */