summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h
blob: 7b01c84877d6c9636acc01381ddf169e6cd3c864 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_OFFSET_H_
#define _RDKAFKA_OFFSET_H_

#include "rdkafka_partition.h"


/**
 * @returns a string for \p offset.
 *
 * NOTE(review): declared here only; presumably yields a human-readable
 * representation of the offset (e.g. for logging) - confirm against the
 * definition. The storage and lifetime of the returned string are not
 * visible from this header, so verify them before holding on to the pointer.
 */
const char *rd_kafka_offset2str(int64_t offset);


/**
 * @brief Stores the offset for the toppar 'rktp'.
 *        The actual commit of the offset to backing store is usually
 *        performed at a later time (time or threshold based).
 *
 * For the high-level consumer (assign()), this function will reject absolute
 * offsets if the partition is not currently assigned, unless \p force is set.
 * This check was added to avoid a race condition where an application
 * would call offsets_store() after the partitions had been revoked, forcing
 * a future auto-committer on the next assignment to commit this old offset and
 * overwriting whatever newer offset was committed by another consumer.
 *
 * The \p force flag is useful for internal calls to offset_store0() which
 * do not need the protection described above.
 *
 *
 * There is one situation where the \p force flag is troublesome:
 * If the application is using any of the consumer batching APIs,
 * e.g., consume_batch() or the event-based consumption, then it's possible
 * that while the batch is being accumulated or the application is picking off
 * messages from the event a rebalance occurs (in the background) which revokes
 * the current assignment. This revocation will remove all queued messages, but
 * not the ones the application already has accumulated in the event object.
 * Enforcing assignment for store in this state is tricky with a bunch of
 * corner cases, so instead we let those places forcibly store the offset, but
 * then in assign() we reset the stored offset to .._INVALID, just like we do
 * on revoke.
 * Illustrated (with fix):
 *   1. ev = rd_kafka_queue_poll();
 *   2. background rebalance revoke unassigns the partition and sets the
 *      stored offset to _INVALID.
 *   3. application calls message_next(ev) which forcibly sets the
 *      stored offset.
 *   4. background rebalance assigns the partition again, but forcibly sets
 *      the stored offset to .._INVALID to provide a clean state.
 *
 * @param pos Offset and leader epoch to set, may be an absolute offset
 *            or .._INVALID.
 * @param force Forcibly set \p pos regardless of assignment state.
 * @param do_lock Whether to lock the \p rktp or not (already locked by caller).
 *
 * See head of rdkafka_offset.c for more information.
 *
 * @returns RD_KAFKA_RESP_ERR__STATE if the partition is not currently assigned,
 *          unless \p force is set.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
                       const rd_kafka_fetch_pos_t pos,
                       rd_bool_t force,
                       rd_dolock_t do_lock) {
        rd_kafka_resp_err_t result;
        rd_bool_t may_store;

        /* Take the toppar lock unless the caller already holds it. */
        if (do_lock)
                rd_kafka_toppar_lock(rktp);

        /* The position may be stored as soon as one of these holds
         * (evaluated left to right, short-circuiting):
         *  - the caller explicitly forces the store,
         *  - the offset is a logical one rather than an absolute one,
         *  - the partition carries the ASSIGNED flag,
         *  - the client is a simple consumer. */
        may_store = force || RD_KAFKA_OFFSET_IS_LOGICAL(pos.offset) ||
                    (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED) ||
                    rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk);

        if (unlikely(!may_store)) {
                /* Rejected: leave the stored position untouched. */
                result = RD_KAFKA_RESP_ERR__STATE;
        } else {
                rktp->rktp_stored_pos = pos;
                result = RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Release the lock only if it was acquired above. */
        if (do_lock)
                rd_kafka_toppar_unlock(rktp);

        return result;
}

/*
 * The functions below are only declared in this header; their definitions
 * are not visible here. The notes on each declaration are derived from the
 * names and signatures and should be confirmed against the implementation.
 */

/**
 * @brief Store \p offset for \p partition of topic \p rkt.
 *
 * NOTE(review): presumably the topic/partition-addressed counterpart of
 * rd_kafka_offset_store0(), which takes a toppar directly - confirm.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t
rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset);

/**
 * @brief Sync the offset state of \p rktp.
 *
 * NOTE(review): presumably flushes stored offsets to the backing store -
 * confirm.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
rd_kafka_resp_err_t rd_kafka_offset_sync(rd_kafka_toppar_t *rktp);

/*
 * Offset store life-cycle for \p rktp: _init(), _stop() and _term().
 * _term() additionally receives an error code \p err.
 * NOTE(review): required call order and the meaning of \p err are not
 * visible from this header - confirm.
 */
void rd_kafka_offset_store_term(rd_kafka_toppar_t *rktp,
                                rd_kafka_resp_err_t err);
rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp);
void rd_kafka_offset_store_init(rd_kafka_toppar_t *rktp);

/**
 * @brief Reset the offset of \p rktp.
 *
 * Takes a broker id, the position \p err_pos and the error \p err, followed
 * by a message. \p fmt (argument 5) and the variadic arguments (from
 * argument 6) are annotated with RD_FORMAT(printf, ..), i.e. \p fmt is a
 * printf-style format string.
 * NOTE(review): presumably \p err_pos / \p err describe what triggered the
 * reset and the formatted text is the logged reason - confirm.
 */
void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp,
                           int32_t broker_id,
                           rd_kafka_fetch_pos_t err_pos,
                           rd_kafka_resp_err_t err,
                           const char *fmt,
                           ...) RD_FORMAT(printf, 5, 6);

/**
 * @brief Validate the offset of \p rktp.
 *
 * \p fmt (argument 2) and the variadic arguments (from argument 3) are
 * annotated with RD_FORMAT(printf, ..), i.e. \p fmt is a printf-style
 * format string.
 * NOTE(review): what is validated, and against what, is not visible here -
 * confirm.
 */
void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...)
    RD_FORMAT(printf, 2, 3);

/**
 * @brief Callback taking a timers handle \p rkts and an opaque \p arg.
 *
 * NOTE(review): by its name a timer callback for offset queries; what
 * \p arg points to is not visible here - confirm.
 */
void rd_kafka_offset_query_tmr_cb(rd_kafka_timers_t *rkts, void *arg);

/**
 * @brief Update the application position of \p rktp to \p pos.
 *
 * NOTE(review): \p do_lock has the same type as the do_lock parameter of
 * rd_kafka_offset_store0(); presumably it likewise selects whether \p rktp
 * is locked by this call - confirm.
 */
void rd_kafka_update_app_pos(rd_kafka_t *rk,
                             rd_kafka_toppar_t *rktp,
                             rd_kafka_fetch_pos_t pos,
                             rd_dolock_t do_lock);

#endif /* _RDKAFKA_OFFSET_H_ */