summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h135
1 files changed, 135 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h
new file mode 100644
index 000000000..7b01c8487
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.h
@@ -0,0 +1,135 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RDKAFKA_OFFSET_H_
+#define _RDKAFKA_OFFSET_H_
+
+#include "rdkafka_partition.h"
+
+
+const char *rd_kafka_offset2str(int64_t offset);
+
+
+/**
+ * @brief Stores the offset for the toppar 'rktp'.
+ * The actual commit of the offset to backing store is usually
+ * performed at a later time (time or threshold based).
+ *
+ * For the high-level consumer (assign()), this function will reject absolute
+ * offsets if the partition is not currently assigned, unless \p force is set.
+ * This check was added to avoid a race condition where an application
+ * would call offsets_store() after the partitions had been revoked, forcing
+ * a future auto-committer on the next assignment to commit this old offset and
+ * overwriting whatever newer offset was committed by another consumer.
+ *
+ * The \p force flag is useful for internal calls to offset_store0() which
+ * do not need the protection described above.
+ *
+ *
+ * There is one situation where the \p force flag is troublesome:
+ * If the application is using any of the consumer batching APIs,
+ * e.g., consume_batch() or the event-based consumption, then it's possible
+ * that while the batch is being accumulated or the application is picking off
+ * messages from the event a rebalance occurs (in the background) which revokes
+ * the current assignment. This revokal will remove all queued messages, but
+ * not the ones the application already has accumulated in the event object.
+ * Enforcing assignment for store in this state is tricky with a bunch of
+ * corner cases, so instead we let those places forcibly store the offset, but
+ * then in assign() we reset the stored offset to .._INVALID, just like we do
+ * on revoke.
+ * Illustrated (with fix):
+ * 1. ev = rd_kafka_queue_poll();
+ * 2. background rebalance revoke unassigns the partition and sets the
+ * stored offset to _INVALID.
+ * 3. application calls message_next(ev) which forcibly sets the
+ * stored offset.
+ * 4. background rebalance assigns the partition again, but forcibly sets
+ * the stored offset to .._INVALID to provide a clean state.
+ *
+ * @param pos Offset and leader epoch to set, may be an absolute offset
+ * or .._INVALID.
+ * @param force Forcibly set \p offset regardless of assignment state.
+ * @param do_lock Whether to lock the \p rktp or not (already locked by caller).
+ *
+ * See head of rdkafka_offset.c for more information.
+ *
+ * @returns RD_KAFKA_RESP_ERR__STATE if the partition is not currently assigned,
+ * unless \p force is set.
+ */
+static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
+rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
+ const rd_kafka_fetch_pos_t pos,
+ rd_bool_t force,
+ rd_dolock_t do_lock) {
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ if (do_lock)
+ rd_kafka_toppar_lock(rktp);
+
+ if (unlikely(!force && !RD_KAFKA_OFFSET_IS_LOGICAL(pos.offset) &&
+ !(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED) &&
+ !rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))) {
+ err = RD_KAFKA_RESP_ERR__STATE;
+ } else {
+ rktp->rktp_stored_pos = pos;
+ }
+
+ if (do_lock)
+ rd_kafka_toppar_unlock(rktp);
+
+ return err;
+}
+
+rd_kafka_resp_err_t
+rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset);
+
+rd_kafka_resp_err_t rd_kafka_offset_sync(rd_kafka_toppar_t *rktp);
+
+void rd_kafka_offset_store_term(rd_kafka_toppar_t *rktp,
+ rd_kafka_resp_err_t err);
+rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp);
+void rd_kafka_offset_store_init(rd_kafka_toppar_t *rktp);
+
+void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp,
+ int32_t broker_id,
+ rd_kafka_fetch_pos_t err_pos,
+ rd_kafka_resp_err_t err,
+ const char *fmt,
+ ...) RD_FORMAT(printf, 5, 6);
+
+void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...)
+ RD_FORMAT(printf, 2, 3);
+
+void rd_kafka_offset_query_tmr_cb(rd_kafka_timers_t *rkts, void *arg);
+
+void rd_kafka_update_app_pos(rd_kafka_t *rk,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_fetch_pos_t pos,
+ rd_dolock_t do_lock);
+
+#endif /* _RDKAFKA_OFFSET_H_ */