summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h b/src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h
new file mode 100644
index 000000000..5be8d606d
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.h
@@ -0,0 +1,144 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2018 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#ifndef _RD_KAFKA_IDEMPOTENCE_H_
+#define _RD_KAFKA_IDEMPOTENCE_H_
+
+
+/**
+ * @define The broker maintains a window of the 5 last Produce requests
+ * for a partition to be able to de-deduplicate resends.
+ */
+#define RD_KAFKA_IDEMP_MAX_INFLIGHT 5
+#define RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "5" /* For printouts */
+
+/**
+ * @brief Get the current PID if state permits.
+ *
+ * @param bumpable If true, return PID even if it may only be used for
+ * bumping the Epoch.
+ *
+ * @returns If there is no valid PID or the state
+ * does not permit further PID usage (such as when draining)
+ * then an invalid PID is returned.
+ *
+ * @locality any
+ * @locks none
+ */
+static RD_UNUSED RD_INLINE rd_kafka_pid_t
+rd_kafka_idemp_get_pid0(rd_kafka_t *rk,
+ rd_dolock_t do_lock,
+ rd_bool_t bumpable) {
+ rd_kafka_pid_t pid;
+
+ if (do_lock)
+ rd_kafka_rdlock(rk);
+ if (likely(rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED))
+ pid = rk->rk_eos.pid;
+ else if (unlikely(bumpable && rk->rk_eos.idemp_state ==
+ RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT))
+ pid = rk->rk_eos.pid;
+ else
+ rd_kafka_pid_reset(&pid);
+ if (do_lock)
+ rd_kafka_rdunlock(rk);
+
+ return pid;
+}
+
+#define rd_kafka_idemp_get_pid(rk) \
+ rd_kafka_idemp_get_pid0(rk, RD_DO_LOCK, rd_false)
+
+void rd_kafka_idemp_set_state(rd_kafka_t *rk, rd_kafka_idemp_state_t new_state);
+void rd_kafka_idemp_request_pid_failed(rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err);
+void rd_kafka_idemp_pid_update(rd_kafka_broker_t *rkb,
+ const rd_kafka_pid_t pid);
+void rd_kafka_idemp_pid_fsm(rd_kafka_t *rk);
+void rd_kafka_idemp_drain_reset(rd_kafka_t *rk, const char *reason);
+void rd_kafka_idemp_drain_epoch_bump0(rd_kafka_t *rk,
+ rd_bool_t allow_txn_abort,
+ rd_kafka_resp_err_t err,
+ const char *fmt,
+ ...) RD_FORMAT(printf, 4, 5);
+#define rd_kafka_idemp_drain_epoch_bump(rk, err, ...) \
+ rd_kafka_idemp_drain_epoch_bump0(rk, rd_true, err, __VA_ARGS__)
+
+void rd_kafka_idemp_drain_toppar(rd_kafka_toppar_t *rktp, const char *reason);
+void rd_kafka_idemp_inflight_toppar_sub(rd_kafka_t *rk,
+ rd_kafka_toppar_t *rktp);
+void rd_kafka_idemp_inflight_toppar_add(rd_kafka_t *rk,
+ rd_kafka_toppar_t *rktp);
+
+rd_kafka_broker_t *rd_kafka_idemp_broker_any(rd_kafka_t *rk,
+ rd_kafka_resp_err_t *errp,
+ char *errstr,
+ size_t errstr_size);
+
+rd_bool_t rd_kafka_idemp_check_error(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ const char *errstr,
+ rd_bool_t is_fatal);
+
+
+/**
+ * @brief Call when a fatal idempotence error has occurred, when the producer
+ * can't continue without risking the idempotency guarantees.
+ *
+ * If the producer is transactional this error is non-fatal and will just
+ * cause the current transaction to transition into the ABORTABLE_ERROR state.
+ * If the producer is not transactional the client instance fatal error
+ * is set and the producer instance is no longer usable.
+ *
+ * @Warning Until KIP-360 has been fully implemented any fatal idempotent
+ * producer error will also raise a fatal transactional producer error.
+ * This is to guarantee that there is no silent data loss.
+ *
+ * @param RK rd_kafka_t instance
+ * @param ERR error to raise
+ * @param ... format string with error message
+ *
+ * @locality any thread
+ * @locks none
+ */
+#define rd_kafka_idemp_set_fatal_error(RK, ERR, ...) \
+ do { \
+ if (rd_kafka_is_transactional(RK)) \
+ rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, ERR, \
+ __VA_ARGS__); \
+ else \
+ rd_kafka_set_fatal_error(RK, ERR, __VA_ARGS__); \
+ } while (0)
+
+void rd_kafka_idemp_start(rd_kafka_t *rk, rd_bool_t immediate);
+void rd_kafka_idemp_init(rd_kafka_t *rk);
+void rd_kafka_idemp_term(rd_kafka_t *rk);
+
+
+#endif /* _RD_KAFKA_IDEMPOTENCE_H_ */