summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_topic.h
blob: cbed9308a7a517b4da987781e36afc21847864fb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_TOPIC_H_
#define _RDKAFKA_TOPIC_H_

#include "rdlist.h"

/**
 * Table of topic state names; defined elsewhere.
 *
 * NOTE(review): presumably indexed by the rkt_state enum values
 * (RD_KAFKA_TOPIC_S_..) of struct rd_kafka_topic_s - confirm against
 * the definition.
 */
extern const char *rd_kafka_topic_state_names[];


/**
 * @struct Light-weight topic object which only contains the topic name.
 *
 * For use in outgoing APIs (like rd_kafka_message_t) when there is
 * no proper topic object available.
 *
 * @remark lrkt_magic[4] MUST be the first field and be set to "LRKT":
 *         rd_kafka_rkt_is_lw() compares the first four bytes of a topic
 *         handle against "LRKT" to tell a light-weight topic apart from a
 *         proper topic object (whose first field, rkt_magic, is "IRKT").
 */
struct rd_kafka_lwtopic_s {
        char lrkt_magic[4];      /**< "LRKT" */
        rd_kafka_t *lrkt_rk;     /**< Pointer to the client instance. */
        rd_refcnt_t lrkt_refcnt; /**< Refcount, increased by
                                  *   rd_kafka_lwtopic_keep(). */
        char *lrkt_topic;        /**< Points past this struct, allocated
                                  *   along with the struct. */
};

/**
 * Casts a topic_t pointer to a light-weight lwtopic_t pointer.
 *
 * This is a plain pointer cast with no checking: use rd_kafka_rkt_is_lw()
 * (or rd_kafka_rkt_get_lw()) to find out whether the handle really is a
 * light-weight topic.
 *
 * The argument is parenthesized so that the cast applies to the whole
 * argument expression rather than to its first operand only.
 */
#define rd_kafka_rkt_lw(rkt) ((rd_kafka_lwtopic_t *)(rkt))

/** Same as rd_kafka_rkt_lw() but yields a pointer to const lwtopic_t. */
#define rd_kafka_rkt_lw_const(rkt) ((const rd_kafka_lwtopic_t *)(rkt))

/**
 * @brief Tell whether \p app_rkt is a light-weight topic.
 *
 * The decision is made by comparing the leading four magic bytes of the
 * object against "LRKT".
 *
 * @returns true for a light-weight topic, else false.
 */
static RD_UNUSED RD_INLINE rd_bool_t
rd_kafka_rkt_is_lw(const rd_kafka_topic_t *app_rkt) {
        const char *magic = rd_kafka_rkt_lw_const(app_rkt)->lrkt_magic;

        return memcmp(magic, "LRKT", 4) == 0;
}

/**
 * @brief Checked down-cast of \p rkt to a light-weight topic.
 *
 * @returns \p rkt as an lwtopic_t if rd_kafka_rkt_is_lw() says it is a
 *          light-weight topic, else NULL.
 */
static RD_UNUSED RD_INLINE rd_kafka_lwtopic_t *
rd_kafka_rkt_get_lw(rd_kafka_topic_t *rkt) {
        return rd_kafka_rkt_is_lw(rkt) ? rd_kafka_rkt_lw(rkt) : NULL;
}

/**
 * Release a light-weight topic; defined elsewhere.
 *
 * rd_kafka_topic_destroy0() calls this instead of touching rkt_refcnt
 * when the handle is a light-weight topic.
 * NOTE(review): presumably drops one reference and frees the object when
 * the last reference is gone - confirm against the definition.
 */
void rd_kafka_lwtopic_destroy(rd_kafka_lwtopic_t *lrkt);

/**
 * Create a light-weight topic object for \p topic on client \p rk;
 * defined elsewhere.
 *
 * NOTE(review): per the struct documentation the topic name is allocated
 * along with the struct; initial refcount and ownership are not visible
 * here - confirm against the definition.
 */
rd_kafka_lwtopic_t *rd_kafka_lwtopic_new(rd_kafka_t *rk, const char *topic);

/**
 * @brief Increase the reference count of light-weight topic \p lrkt.
 *
 * Called by rd_kafka_topic_keep() when the topic handle is light-weight.
 */
static RD_UNUSED RD_INLINE void
rd_kafka_lwtopic_keep(rd_kafka_lwtopic_t *lrkt) {
        rd_refcnt_add(&lrkt->lrkt_refcnt);
}



/**
 * @struct Holds partition + transactional PID + base sequence msgid.
 *
 * Used in rkt_saved_partmsgids to restore transactional/idempotency state
 * for a partition that is lost from metadata for some time and then returns.
 */
typedef struct rd_kafka_partition_msgid_s {
        /** Tail-queue linkage; struct rd_kafka_topic_s holds a list of this
         *  element type in rkt_saved_partmsgids. */
        TAILQ_ENTRY(rd_kafka_partition_msgid_s) link;
        int32_t partition;  /**< Partition the saved state belongs to. */
        rd_kafka_pid_t pid; /**< Saved transactional PID. */
        /** NOTE(review): presumably the partition's message id at the time
         *  the state was saved - confirm against the code that fills it. */
        uint64_t msgid;
        /** NOTE(review): presumably the base msgid of the PID epoch, i.e.
         *  the "base sequence msgid" mentioned above - confirm. */
        uint64_t epoch_base_msgid;
        /** NOTE(review): presumably the timestamp at which this entry was
         *  saved - confirm. */
        rd_ts_t ts;
} rd_kafka_partition_msgid_t;


/**
 * @struct Aux struct that holds a partition id and a leader epoch.
 *         Used as temporary holding space for per-partition leader epochs
 *         while parsing MetadataResponse.
 *
 * An array of these is passed to rd_kafka_topic_metadata_update2().
 */
typedef struct rd_kafka_partition_leader_epoch_s {
        int32_t partition_id; /**< Partition id. */
        int32_t leader_epoch; /**< Leader epoch of that partition. */
} rd_kafka_partition_leader_epoch_t;


/**
 * @struct Internal representation of a topic.
 *
 * @remark rkt_magic[4] MUST be the first field and be set to "IRKT".
 *         Light-weight topics carry "LRKT" in the same position, which is
 *         what rd_kafka_rkt_is_lw() tests for.
 */
struct rd_kafka_topic_s {
        char rkt_magic[4]; /**< "IRKT" */

        /** Tail-queue linkage (the list head is not declared in this
         *  header). */
        TAILQ_ENTRY(rd_kafka_topic_s) rkt_link;

        /** Reference count: increased by rd_kafka_topic_keep() and
         *  decreased by rd_kafka_topic_destroy0(), which calls
         *  rd_kafka_topic_destroy_final() when the count drops to 0. */
        rd_refcnt_t rkt_refcnt;

        /** Read/write lock, taken and released through the
         *  rd_kafka_topic_{rd,wr}lock() / {rd,wr}unlock() macros. */
        rwlock_t rkt_lock;
        /** NOTE(review): presumably the topic name - confirm. */
        rd_kafkap_str_t *rkt_topic;

        rd_kafka_toppar_t *rkt_ua; /**< Unassigned partition (-1) */
        rd_kafka_toppar_t **rkt_p; /**< Partition array */
        /** NOTE(review): presumably the number of entries in rkt_p -
         *  confirm. */
        int32_t rkt_partition_cnt;

        int32_t rkt_sticky_partition;   /**< Current sticky partition.
                                         *     @locks rkt_lock */
        rd_interval_t rkt_sticky_intvl; /**< Interval to assign new
                                         *   sticky partition. */

        rd_list_t rkt_desp;                   /**< Desired partitions
                                               *   that are not yet seen
                                               *   in the cluster. */
        rd_interval_t rkt_desp_refresh_intvl; /**< Rate-limiter for
                                               *   desired partition
                                               *   metadata refresh. */

        rd_ts_t rkt_ts_create;   /**< Topic object creation time. */
        rd_ts_t rkt_ts_metadata; /**< Timestamp of last metadata
                                  *   update for this topic. */

        rd_refcnt_t rkt_app_refcnt; /**< Number of active rkt's new()ed
                                     *   by application. */

        /** Topic state. rd_kafka_topic_get_error() reads it while holding
         *  the rkt_lock read lock. */
        enum { RD_KAFKA_TOPIC_S_UNKNOWN,   /* No cluster information yet */
               RD_KAFKA_TOPIC_S_EXISTS,    /* Topic exists in cluster */
               RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
               RD_KAFKA_TOPIC_S_ERROR,     /* Topic exists but is in an errored
                                            * state, such as auth failure. */
        } rkt_state;

        /** Bitmask of RD_KAFKA_TOPIC_F_.. flags. */
        int rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL                                        \
        0x1 /* Leader lost/unavailable                                         \
             * for at least one partition. */

        rd_kafka_resp_err_t rkt_err; /**< Permanent error. Only reported by
                                      *   rd_kafka_topic_get_error() while
                                      *   rkt_state is
                                      *   RD_KAFKA_TOPIC_S_ERROR. */

        /** NOTE(review): presumably the client instance this topic belongs
         *  to - confirm. */
        rd_kafka_t *rkt_rk;

        rd_avg_t rkt_avg_batchsize; /**< Average batch size */
        rd_avg_t rkt_avg_batchcnt;  /**< Average batch message count */

        /** Topic configuration, embedded by value. */
        rd_kafka_topic_conf_t rkt_conf;

        /** Idempotent/Txn producer:
         *  The PID,Epoch,base Msgid state for removed partitions. */
        TAILQ_HEAD(, rd_kafka_partition_msgid_s) rkt_saved_partmsgids;
};

/**
 * Acquire / release the topic's read-write lock (rkt_lock) in read (rd)
 * or write (wr) mode. \p rkt is a pointer to a struct rd_kafka_topic_s.
 */
#define rd_kafka_topic_rdlock(rkt)   rwlock_rdlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_wrlock(rkt)   rwlock_wrlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock)
#define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock)



/**
 * @brief Acquire a new reference to \p rkt.
 *
 * Works for both kinds of topic handle: a light-weight topic has its own
 * refcount bumped through rd_kafka_lwtopic_keep(), a proper topic has
 * rkt_refcnt increased.
 *
 * @returns \p rkt, to allow use in expressions.
 */
static RD_INLINE RD_UNUSED rd_kafka_topic_t *
rd_kafka_topic_keep(rd_kafka_topic_t *rkt) {
        rd_kafka_lwtopic_t *lw = rd_kafka_rkt_get_lw(rkt);

        if (unlikely(lw != NULL)) {
                /* Light-weight topic: uses its own refcount. */
                rd_kafka_lwtopic_keep(lw);
                return rkt;
        }

        rd_refcnt_add(&rkt->rkt_refcnt);
        return rkt;
}

/**
 * Final destructor for a proper topic object; defined elsewhere.
 *
 * Called by rd_kafka_topic_destroy0() when rkt_refcnt reaches 0.
 */
void rd_kafka_topic_destroy_final(rd_kafka_topic_t *rkt);

/**
 * Defined elsewhere.
 *
 * NOTE(review): presumably resolves an application topic handle (which
 * may be a light-weight topic) to the proper topic object - confirm
 * against the definition, including who owns the returned reference.
 */
rd_kafka_topic_t *rd_kafka_topic_proper(rd_kafka_topic_t *app_rkt);



/**
 * @brief Drop a reference to the topic object, as previously acquired
 *        by ..topic_keep().
 *
 * A light-weight topic is handed to rd_kafka_lwtopic_destroy(). For a
 * proper topic rkt_refcnt is decreased and, when it reaches zero,
 * rd_kafka_topic_destroy_final() is called.
 */
static RD_INLINE RD_UNUSED void rd_kafka_topic_destroy0(rd_kafka_topic_t *rkt) {
        rd_kafka_lwtopic_t *lw = rd_kafka_rkt_get_lw(rkt);

        if (unlikely(lw != NULL)) {
                /* Light-weight topic: has its own destructor. */
                rd_kafka_lwtopic_destroy(lw);
                return;
        }

        if (unlikely(rd_refcnt_sub(&rkt->rkt_refcnt) == 0))
                rd_kafka_topic_destroy_final(rkt);
}


/**
 * Create (or look up) the topic object for \p topic on client \p rk;
 * defined elsewhere.
 *
 * NOTE(review): the following is inferred from the parameter names only,
 * confirm against the definition:
 *  - \p conf: topic configuration to use; ownership is not visible here.
 *  - \p existing: presumably an optional out-parameter telling whether the
 *    topic object already existed.
 *  - \p do_lock: presumably whether the function should acquire the
 *    client-level lock itself.
 */
rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk,
                                      const char *topic,
                                      rd_kafka_topic_conf_t *conf,
                                      int *existing,
                                      int do_lock);

/**
 * Topic lookup by name; defined elsewhere.
 *
 * \p func and \p line identify the call site; use the
 * rd_kafka_topic_find() wrapper macro, which fills them in with
 * __FUNCTION__ and __LINE__.
 *
 * NOTE(review): return value on a miss, reference ownership of the
 * returned topic and the exact meaning of \p do_lock are not visible
 * here - confirm against the definition.
 */
rd_kafka_topic_t *rd_kafka_topic_find_fl(const char *func,
                                         int line,
                                         rd_kafka_t *rk,
                                         const char *topic,
                                         int do_lock);
/**
 * Same kind of lookup as rd_kafka_topic_find_fl() but takes the topic
 * name as an rd_kafkap_str_t and has no \p do_lock parameter; defined
 * elsewhere. Use the rd_kafka_topic_find0() wrapper macro.
 */
rd_kafka_topic_t *rd_kafka_topic_find0_fl(const char *func,
                                          int line,
                                          rd_kafka_t *rk,
                                          const rd_kafkap_str_t *topic);
/** Call rd_kafka_topic_find_fl() with the caller's function and line. */
#define rd_kafka_topic_find(rk, topic, do_lock)                                \
        rd_kafka_topic_find_fl(__FUNCTION__, __LINE__, rk, topic, do_lock)
/** Call rd_kafka_topic_find0_fl() with the caller's function and line. */
#define rd_kafka_topic_find0(rk, topic)                                        \
        rd_kafka_topic_find0_fl(__FUNCTION__, __LINE__, rk, topic)
/**
 * Comparator with a generic (const void *, const void *) signature;
 * defined elsewhere.
 *
 * NOTE(review): presumably compares two rd_kafka_topic_t objects -
 * confirm the element type and ordering against the definition.
 */
int rd_kafka_topic_cmp_rkt(const void *_a, const void *_b);

/**
 * Defined elsewhere.
 * NOTE(review): presumably removes all partitions from \p rkt - confirm.
 */
void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt);

/**
 * Defined elsewhere.
 * NOTE(review): presumably moves \p rkt to RD_KAFKA_TOPIC_S_NOTEXISTS
 * because of \p err; the meaning of the boolean return value and the
 * locking requirements are not visible here - confirm.
 */
rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt,
                                       rd_kafka_resp_err_t err);
/**
 * Defined elsewhere.
 * NOTE(review): presumably moves \p rkt to RD_KAFKA_TOPIC_S_ERROR and
 * stores \p err in rkt_err (the value rd_kafka_topic_get_error()
 * returns); return value and locking requirements are not visible
 * here - confirm.
 */
rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt,
                                   rd_kafka_resp_err_t err);

/**
 * @brief Read the topic's permanent error.
 *
 * @returns rkt_err if the topic is in state RD_KAFKA_TOPIC_S_ERROR,
 *          else RD_KAFKA_RESP_ERR_NO_ERROR.
 *
 * @locality any
 * @locks_acquired rd_kafka_topic_rdlock(rkt)
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) {
        rd_kafka_resp_err_t result;

        rd_kafka_topic_rdlock(rkt);
        /* rkt_err is only reported while in the errored state. */
        result = (rkt->rkt_state == RD_KAFKA_TOPIC_S_ERROR)
                     ? rkt->rkt_err
                     : RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_kafka_topic_rdunlock(rkt);

        return result;
}

/**
 * Defined elsewhere.
 *
 * Takes a broker handle, one topic's metadata (\p mdt) and the
 * per-partition leader epochs gathered while parsing the
 * MetadataResponse (\p leader_epochs).
 *
 * NOTE(review): presumably applies the metadata to the matching local
 * topic object; the meaning of the return value and the expected length
 * of \p leader_epochs are not visible here - confirm.
 */
int rd_kafka_topic_metadata_update2(
    rd_kafka_broker_t *rkb,
    const struct rd_kafka_metadata_topic *mdt,
    const rd_kafka_partition_leader_epoch_t *leader_epochs);

/**
 * Defined elsewhere.
 * NOTE(review): presumably a periodic scan over all topics of \p rk,
 * with \p now as the current timestamp - confirm.
 */
void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now);


/**
 * @struct Topic name paired with a partition count.
 *
 * Created with rd_kafka_topic_info_new() and released with
 * rd_kafka_topic_info_destroy().
 */
typedef struct rd_kafka_topic_info_s {
        const char *topic; /**< Allocated along with struct */
        int partition_cnt; /**< Partition count recorded for \p topic. */
} rd_kafka_topic_info_t;

/**
 * Comparators with a generic (const void *, const void *) signature;
 * defined elsewhere.
 *
 * NOTE(review): presumably both compare rd_kafka_topic_info_t objects,
 * ..topic_cmp() by topic name only and ..info_cmp() by all fields -
 * confirm against the definitions.
 */
int rd_kafka_topic_info_topic_cmp(const void *_a, const void *_b);
int rd_kafka_topic_info_cmp(const void *_a, const void *_b);
/**
 * Allocate a topic info object for \p topic with \p partition_cnt;
 * defined elsewhere. Per the struct documentation the topic name is
 * allocated along with the struct.
 */
rd_kafka_topic_info_t *rd_kafka_topic_info_new(const char *topic,
                                               int partition_cnt);
/** Release a topic info object; defined elsewhere. */
void rd_kafka_topic_info_destroy(rd_kafka_topic_info_t *ti);

/**
 * Defined elsewhere.
 * NOTE(review): presumably tells whether \p topic matches \p pattern;
 * the pattern syntax and the return value convention are not visible
 * here - confirm.
 */
int rd_kafka_topic_match(rd_kafka_t *rk,
                         const char *pattern,
                         const char *topic);

/**
 * Defined elsewhere.
 * NOTE(review): presumably updates the broker that partition \p rktp is
 * handled by, with \p reason as a human-readable explanation; the
 * meaning of the return value is not visible here - confirm.
 */
int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp,
                                  int32_t broker_id,
                                  rd_kafka_broker_t *rkb,
                                  const char *reason);

/**
 * Defined elsewhere.
 * NOTE(review): semantics and return value are not visible here -
 * confirm against the definition.
 */
int rd_kafka_toppar_delegate_to_leader(rd_kafka_toppar_t *rktp);

/**
 * Defined elsewhere.
 * NOTE(review): presumably a blocking leader query for \p topics (or all
 * topics when \p all_topics is set), waiting at most \p timeout_ms -
 * confirm, including the element type of \p topics.
 */
rd_kafka_resp_err_t rd_kafka_topics_leader_query_sync(rd_kafka_t *rk,
                                                      int all_topics,
                                                      const rd_list_t *topics,
                                                      int timeout_ms);
/**
 * Defined elsewhere.
 * NOTE(review): presumably triggers a leader (metadata) query for \p rkt;
 * \p do_rk_lock presumably selects whether the client lock is acquired
 * by the function, and \p force whether the query is forced - confirm.
 */
void rd_kafka_topic_leader_query0(rd_kafka_t *rk,
                                  rd_kafka_topic_t *rkt,
                                  int do_rk_lock,
                                  rd_bool_t force);
/**
 * Convenience wrapper: rd_kafka_topic_leader_query0() with
 * do_rk_lock = 1 and force = rd_false.
 */
#define rd_kafka_topic_leader_query(rk, rkt)                                   \
        rd_kafka_topic_leader_query0(rk, rkt, 1 /*lock*/,                      \
                                     rd_false /*dont force*/)

/** Alias for rd_kafka_metadata_fast_leader_query(rk). */
#define rd_kafka_topic_fast_leader_query(rk)                                   \
        rd_kafka_metadata_fast_leader_query(rk)

/**
 * Defined elsewhere.
 * NOTE(review): presumably appends the locally known topics of \p rk to
 * \p topics and reports a count through \p cache_cntp; element type,
 * ownership and whether \p cache_cntp may be NULL are not visible here -
 * confirm.
 */
void rd_kafka_local_topics_to_list(rd_kafka_t *rk,
                                   rd_list_t *topics,
                                   int *cache_cntp);

/**
 * Defined elsewhere.
 * NOTE(review): the rd_ut_ prefix suggests a unit-test helper that marks
 * \p rkt as existing with \p partition_cnt partitions led by
 * \p leader_id - confirm.
 */
void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
                                        int partition_cnt,
                                        int32_t leader_id);

#endif /* _RDKAFKA_TOPIC_H_ */