path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.h
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef _RDKAFKA_CGRP_H_
#define _RDKAFKA_CGRP_H_

#include "rdinterval.h"

#include "rdkafka_assignor.h"


/**
 * Client group implementation
 *
 * Client group handling for a single cgrp is assigned to a single
 * rd_kafka_broker_t object at any given time.
 * The main thread calls rd_kafka_cgrp_serve() to serve its cgrps.
 *
 * This means the cgrp itself does not need to be locked, since it is
 * only ever used from the main thread.
 *
 */
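
/*
 * Example (illustrative sketch, not the actual main loop): the main
 * thread drives the cgrp by calling rd_kafka_cgrp_serve() periodically,
 * e.g.:
 *
 *   if (rk->rk_cgrp)
 *           rd_kafka_cgrp_serve(rk->rk_cgrp); // run state machine and ops
 *
 * Since only the main thread touches the cgrp, no cgrp lock is required.
 */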


extern const char *rd_kafka_cgrp_join_state_names[];

/**
 * Client group
 */
typedef struct rd_kafka_cgrp_s {
        const rd_kafkap_str_t *rkcg_group_id;
        rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */
        rd_kafkap_str_t *rkcg_group_instance_id;
        const rd_kafkap_str_t *rkcg_client_id;

        enum {
                /* Init state */
                RD_KAFKA_CGRP_STATE_INIT,

                /* Cgrp has been stopped. This is a final state */
                RD_KAFKA_CGRP_STATE_TERM,

                /* Query for group coordinator */
                RD_KAFKA_CGRP_STATE_QUERY_COORD,

                /* Outstanding query, awaiting response */
                RD_KAFKA_CGRP_STATE_WAIT_COORD,

                /* Wait ack from assigned cgrp manager broker thread */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER,

                /* Wait for manager broker thread to connect to broker */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,

                /* Coordinator is up and manager is assigned. */
                RD_KAFKA_CGRP_STATE_UP,
        } rkcg_state;
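        /* Typical rkcg_state progression (illustrative):
         *   INIT -> QUERY_COORD -> WAIT_COORD -> WAIT_BROKER ->
         *   WAIT_BROKER_TRANSPORT -> UP,
         * falling back to QUERY_COORD whenever the coordinator is lost. */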
        rd_ts_t rkcg_ts_statechange; /* Timestamp of last
                                      * state change. */


        enum {
                /* all: join or rejoin, possibly with an existing assignment. */
                RD_KAFKA_CGRP_JOIN_STATE_INIT,

                /* all: JoinGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,

                /* all: MetadataRequest sent, awaiting response.
                 *      While metadata requests may be issued at any time,
                 *      this state is only set upon a proper (re)join. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,

                /* Follower: SyncGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,

                /* all: waiting for application to call *_assign() */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL,

                /* all: waiting for application to call *_unassign() */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL,

                /* all: waiting for full assignment to decommission */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE,

                /* all: waiting for partial assignment to decommission */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE,

                /* all: synchronized and assigned
                 *      may be an empty assignment. */
                RD_KAFKA_CGRP_JOIN_STATE_STEADY,
        } rkcg_join_state;
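        /* Typical rkcg_join_state progression (illustrative):
         *   INIT -> WAIT_JOIN -> [leader: WAIT_METADATA] -> WAIT_SYNC ->
         *   WAIT_ASSIGN_CALL -> STEADY,
         * with the WAIT_(INCR_)UNASSIGN_* states entered on rebalance,
         * unsubscribe or termination. */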

        /* State when group leader */
        struct {
                rd_kafka_group_member_t *members;
                int member_cnt;
        } rkcg_group_leader;

        rd_kafka_q_t *rkcg_q;            /* Application poll queue */
        rd_kafka_q_t *rkcg_ops;          /* Manager ops queue */
        rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */
        int rkcg_flags;
#define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */
#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE                                 \
        0x8 /* Send LeaveGroup when                                            \
             * unassign is done */
#define RD_KAFKA_CGRP_F_SUBSCRIPTION                                           \
        0x10 /* If set:                                                        \
              *   subscription                                                 \
              * else:                                                          \
              *   static assignment */
#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT                                   \
        0x20 /* A Heartbeat request                                            \
              * is in transit, don't                                           \
              * send a new one. */
#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION                                  \
        0x40 /* Subscription contains                                          \
              * wildcards. */
#define RD_KAFKA_CGRP_F_WAIT_LEAVE                                             \
        0x80 /* Wait for LeaveGroup                                            \
              * to be sent.                                                    \
              * This is used to stall                                          \
              * termination until                                              \
              * the LeaveGroupRequest                                          \
              * is responded to,                                               \
              * otherwise it risks                                             \
              * being dropped in the                                           \
              * output queue when                                              \
              * the broker is destroyed.                                       \
              */
#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED                                      \
        0x100 /**< max.poll.interval.ms                                        \
               *   was exceeded and we                                         \
               *   left the group.                                             \
               *   Do not rejoin until                                         \
               *   the application has                                         \
               *   polled again. */
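        /* Example (illustrative): flags are read and modified with plain
         * bit operations, and only from the main thread, e.g.:
         *   rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
         *   if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)
         *           return;
         */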

        rd_interval_t rkcg_coord_query_intvl;  /* Coordinator query interval */
        rd_interval_t rkcg_heartbeat_intvl;    /* Heartbeat interval */
        rd_interval_t rkcg_join_intvl;         /* JoinGroup interval */
        rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scan interval */

        rd_ts_t rkcg_ts_session_timeout;             /**< Absolute session
                                                      *   timeout enforced by
                                                      *   the consumer, this
                                                      *   value is updated on
                                                      *   Heartbeat success,
                                                      *   etc. */
        rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error,
                                                      *   used for logging. */

        TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics; /* Topics subscribed to */

        rd_list_t rkcg_toppars; /* Toppars subscribed to */

        int32_t rkcg_generation_id; /* Current generation id */

        rd_kafka_assignor_t *rkcg_assignor; /**< The current partition
                                             *   assignor. Used by both
                                             *   leader and members. */
        void *rkcg_assignor_state;          /**< Current partition
                                             *   assignor state. */

        int32_t rkcg_coord_id; /**< Current coordinator id,
                                *   or -1 if not known. */

        rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator
                                             *   broker handle, or NULL.
                                             *   rkcg_coord's nodename is
                                             *   updated to this broker's
                                             *   nodename when there is a
                                             *   coordinator change. */
        rd_kafka_broker_t *rkcg_coord;      /**< The dedicated coordinator
                                             *   broker handle.
                                             *   Will be updated when the
                                             *   coordinator changes. */

        int16_t rkcg_wait_resp; /**< Awaiting response for this
                                 *   ApiKey.
                                 *   Makes sure only one
                                 *   JoinGroup or SyncGroup
                                 *   request is outstanding.
                                 *   Unset value is -1. */

        /** Current subscription */
        rd_kafka_topic_partition_list_t *rkcg_subscription;
        /** The actual topics subscribed (after metadata+wildcard matching).
         *  Sorted. */
        rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
        /** Subscribed topics that are errored/not available. */
        rd_kafka_topic_partition_list_t *rkcg_errored_topics;
        /** If a SUBSCRIBE op is received during a COOPERATIVE rebalance,
         *  actioning this will be postponed until after the rebalance
         *  completes. The waiting subscription is stored here.
         *  Mutually exclusive with rkcg_next_subscription. */
        rd_kafka_topic_partition_list_t *rkcg_next_subscription;
        /** If an UNSUBSCRIBE op is received during a COOPERATIVE rebalance,
         *  actioning it will be postponed until after the rebalance
         *  completes. This flag signals such a waiting unsubscribe
         *  operation. Mutually exclusive with rkcg_next_subscription. */
        rd_bool_t rkcg_next_unsubscribe;

        /** Assignment considered lost */
        rd_atomic32_t rkcg_assignment_lost;

        /** Current assignment of partitions from last SyncGroup response.
         *  NULL means no assignment, else empty or non-empty assignment.
         *
         * This group assignment is the actual set of partitions that were
         * assigned to our consumer by the consumer group leader and should
         * not be confused with rk_consumer.assignment, which holds the
         * partitions assigned by the application using assign() et al.
         *
         * The group assignment and the consumer assignment are typically
         * identical, but not necessarily since an application is free to
         * assign() any partition, not just the partitions it is handed
         * through the rebalance callback.
         *
         * Yes, this nomenclature is ambiguous but has historical reasons,
         * so for now just try to remember that:
         *  - group assignment == consumer group assignment.
         *  - assignment == actual used assignment, i.e., fetched partitions.
         *
         * @remark This list is always sorted.
         */
        rd_kafka_topic_partition_list_t *rkcg_group_assignment;

        /** The partitions to incrementally assign following a
         *  currently in-progress incremental unassign. */
        rd_kafka_topic_partition_list_t *rkcg_rebalance_incr_assignment;

        /** Rejoin the group following a currently in-progress
         *  incremental unassign. */
        rd_bool_t rkcg_rebalance_rejoin;

        rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to
                                            * the application.
                                            * Used to avoid repeating
                                            * the same error. */

        rd_kafka_timer_t rkcg_offset_commit_tmr;     /* Offset commit timer */
        rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max
                                                      *   poll interval. */

        rd_kafka_t *rkcg_rk;

        rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op
                                        * (OP_TERMINATE)
                                        * to this rko's queue. */

        rd_ts_t rkcg_ts_terminate; /* Timestamp of when
                                    * cgrp termination was
                                    * initiated. */

        rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */

        /* Protected by rd_kafka_*lock() */
        struct {
                rd_ts_t ts_rebalance;       /* Timestamp of
                                             * last rebalance */
                int rebalance_cnt;          /* Number of
                                               rebalances */
                char rebalance_reason[256]; /**< Last rebalance
                                             *   reason */
                int assignment_size;        /* Partition count
                                             * of last rebalance
                                             * assignment */
        } rkcg_c;

} rd_kafka_cgrp_t;



/* Check if broker is the coordinator */
#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb)                               \
        ((rkcg)->rkcg_coord_id != -1 &&                                        \
         (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid)
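/* Example (illustrative): used to skip brokers that are not the current
 * group coordinator, e.g.:
 *   if (!RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb))
 *           return;
 */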

/**
 * @returns true if cgrp is using static group membership
 */
#define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg)                                   \
        !RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id)
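/* Static group membership is enabled by the application via the
 * `group.instance.id` configuration property, e.g. (illustrative):
 *   rd_kafka_conf_set(conf, "group.instance.id", "consumer-1",
 *                     errstr, sizeof(errstr));
 * in which case rkcg_group_instance_id is non-NULL and the macro above
 * evaluates to true. */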

extern const char *rd_kafka_cgrp_state_names[];
extern const char *rd_kafka_cgrp_join_state_names[];
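/* Example (illustrative): the name arrays above map the state enums to
 * human-readable strings for debug logging, e.g.:
 *   rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
 *                "Group \"%s\": state %s, join-state %s",
 *                rkcg->rkcg_group_id->str,
 *                rd_kafka_cgrp_state_names[rkcg->rkcg_state],
 *                rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
 */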

void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg);
rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
                                   const rd_kafkap_str_t *group_id,
                                   const rd_kafkap_str_t *client_id);
void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg);

void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg,
                      rd_kafka_toppar_t *rktp,
                      rd_kafka_replyq_t replyq,
                      rd_kafka_op_type_t type,
                      rd_kafka_resp_err_t err);
void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);


rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del(rd_kafka_cgrp_t *rkcg,
                                                    const char *pattern);
rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add(rd_kafka_cgrp_t *rkcg,
                                                    const char *pattern);

int rd_kafka_cgrp_topic_check(rd_kafka_cgrp_t *rkcg, const char *topic);

void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id);

void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state);

rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg);
void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason);
void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg,
                              rd_kafka_resp_err_t err,
                              const char *reason);
void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
                                         rd_bool_t do_join);
#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
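/* rk_cgrp is only set up for consumers with a configured `group.id`, so
 * callers typically NULL-check the handle, e.g. (illustrative):
 *   rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
 *   if (!rkcg)
 *           return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
 */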


void rd_kafka_cgrp_assigned_offsets_commit(
    rd_kafka_cgrp_t *rkcg,
    const rd_kafka_topic_partition_list_t *offsets,
    rd_bool_t set_offsets,
    const char *reason);

void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg);

rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg);


struct rd_kafka_consumer_group_metadata_s {
        char *group_id;
        int32_t generation_id;
        char *member_id;
        char *group_instance_id; /**< Optional (NULL) */
};
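/* The public rd_kafka_consumer_group_metadata() API hands the application
 * an opaque copy of this struct, typically for use with the transactional
 * producer, e.g. (illustrative):
 *   rd_kafka_consumer_group_metadata_t *cgmd =
 *           rd_kafka_consumer_group_metadata(consumer);
 *   error = rd_kafka_send_offsets_to_transaction(producer, offsets,
 *                                                cgmd, timeout_ms);
 *   rd_kafka_consumer_group_metadata_destroy(cgmd);
 */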

rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup(
    const rd_kafka_consumer_group_metadata_t *cgmetadata);

static RD_UNUSED const char *
rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) {
        switch (protocol) {
        case RD_KAFKA_REBALANCE_PROTOCOL_EAGER:
                return "EAGER";
        case RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE:
                return "COOPERATIVE";
        default:
                return "NONE";
        }
}
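
/* Applications typically branch on the rebalance protocol inside their
 * rebalance callback using the public rd_kafka_rebalance_protocol(),
 * which returns one of the strings above, e.g. (illustrative, assign
 * case only):
 *   if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
 *           rd_kafka_incremental_assign(rk, partitions);
 *   else
 *           rd_kafka_assign(rk, partitions);
 */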

#endif /* _RDKAFKA_CGRP_H_ */