Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.c | 1548
1 file changed, 1548 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.c new file mode 100644 index 00000000..ffa6a9d5 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_offset.c @@ -0,0 +1,1548 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012,2013 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +// FIXME: Revise this documentation: +/** + * This file implements the consumer offset storage. + * It currently supports local file storage and broker OffsetCommit storage. + * + * Regardless of commit method (file, broker, ..) this is how it works: + * - When rdkafka, or the application, depending on if auto.offset.commit + * is enabled or not, calls rd_kafka_offset_store() with an offset to store, + * all it does is set rktp->rktp_stored_offset to this value. + * This can happen from any thread and is locked by the rktp lock. + * - The actual commit/write of the offset to its backing store (filesystem) + * is performed by the main rdkafka thread and scheduled at the configured + * auto.commit.interval.ms interval. + * - The write is performed in the main rdkafka thread (in a blocking manner + * for file based offsets) and once the write has + * succeeded rktp->rktp_committed_offset is updated to the new value. + * - If offset.store.sync.interval.ms is configured the main rdkafka thread + * will also make sure to fsync() each offset file accordingly. (file) + */ + + +#include "rdkafka_int.h" +#include "rdkafka_topic.h" +#include "rdkafka_partition.h" +#include "rdkafka_offset.h" +#include "rdkafka_broker.h" +#include "rdkafka_request.h" + +#include <stdio.h> +#include <sys/types.h> +#include <fcntl.h> + +#ifdef _WIN32 +#include <io.h> +#include <share.h> +#include <sys/stat.h> +#include <shlwapi.h> +#endif + + +/** + * Convert an absolute or logical offset to string. 
+ */ +const char *rd_kafka_offset2str(int64_t offset) { + static RD_TLS char ret[16][32]; + static RD_TLS int i = 0; + + i = (i + 1) % 16; + + if (offset >= 0) + rd_snprintf(ret[i], sizeof(ret[i]), "%" PRId64, offset); + else if (offset == RD_KAFKA_OFFSET_BEGINNING) + return "BEGINNING"; + else if (offset == RD_KAFKA_OFFSET_END) + return "END"; + else if (offset == RD_KAFKA_OFFSET_STORED) + return "STORED"; + else if (offset == RD_KAFKA_OFFSET_INVALID) + return "INVALID"; + else if (offset <= RD_KAFKA_OFFSET_TAIL_BASE) + rd_snprintf(ret[i], sizeof(ret[i]), "TAIL(%lld)", + llabs(offset - RD_KAFKA_OFFSET_TAIL_BASE)); + else + rd_snprintf(ret[i], sizeof(ret[i]), "%" PRId64 "?", offset); + + return ret[i]; +} + +static void rd_kafka_offset_file_close(rd_kafka_toppar_t *rktp) { + if (!rktp->rktp_offset_fp) + return; + + fclose(rktp->rktp_offset_fp); + rktp->rktp_offset_fp = NULL; +} + + +#ifndef _WIN32 +/** + * Linux version of open callback providing racefree CLOEXEC. + */ +int rd_kafka_open_cb_linux(const char *pathname, + int flags, + mode_t mode, + void *opaque) { +#ifdef O_CLOEXEC + return open(pathname, flags | O_CLOEXEC, mode); +#else + return rd_kafka_open_cb_generic(pathname, flags, mode, opaque); +#endif +} +#endif + +/** + * Fallback version of open_cb NOT providing racefree CLOEXEC, + * but setting CLOEXEC after file open (if FD_CLOEXEC is defined). + */ +int rd_kafka_open_cb_generic(const char *pathname, + int flags, + mode_t mode, + void *opaque) { +#ifndef _WIN32 + int fd; + int on = 1; + fd = open(pathname, flags, mode); + if (fd == -1) + return -1; +#ifdef FD_CLOEXEC + fcntl(fd, F_SETFD, FD_CLOEXEC, &on); +#endif + return fd; +#else + int fd; + if (_sopen_s(&fd, pathname, flags, _SH_DENYNO, mode) != 0) + return -1; + return fd; +#endif +} + + +static int rd_kafka_offset_file_open(rd_kafka_toppar_t *rktp) { + rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; + int fd; + +#ifndef _WIN32 + mode_t mode = 0644; +#else + mode_t mode = _S_IREAD | _S_IWRITE; +#endif + if ((fd = rk->rk_conf.open_cb(rktp->rktp_offset_path, O_CREAT | O_RDWR, + mode, rk->rk_conf.opaque)) == -1) { + rd_kafka_op_err(rktp->rktp_rkt->rkt_rk, RD_KAFKA_RESP_ERR__FS, + "%s [%" PRId32 + "]: " + "Failed to open offset file %s: %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, rktp->rktp_offset_path, + rd_strerror(errno)); + return -1; + } + + rktp->rktp_offset_fp = +#ifndef _WIN32 + fdopen(fd, "r+"); +#else + _fdopen(fd, "r+"); +#endif + + return 0; +} + + +static int64_t rd_kafka_offset_file_read(rd_kafka_toppar_t *rktp) { + char buf[22]; + char *end; + int64_t offset; + size_t r; + + if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) { + rd_kafka_op_err(rktp->rktp_rkt->rkt_rk, RD_KAFKA_RESP_ERR__FS, + "%s [%" PRId32 + "]: " + "Seek (for read) failed on offset file %s: %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, rktp->rktp_offset_path, + rd_strerror(errno)); + rd_kafka_offset_file_close(rktp); + return RD_KAFKA_OFFSET_INVALID; + } + + r = fread(buf, 1, sizeof(buf) - 1, rktp->rktp_offset_fp); + if (r == 0) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: offset file (%s) is empty", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, rktp->rktp_offset_path); + return RD_KAFKA_OFFSET_INVALID; + } + + buf[r] = '\0'; + + offset = strtoull(buf, &end, 10); + if (buf == end) { + rd_kafka_op_err(rktp->rktp_rkt->rkt_rk, RD_KAFKA_RESP_ERR__FS, + "%s [%" PRId32 + "]: " + "Unable to parse offset in %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, 
rktp->rktp_offset_path); + return RD_KAFKA_OFFSET_INVALID; + } + + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: Read offset %" PRId64 + " from offset " + "file (%s)", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + offset, rktp->rktp_offset_path); + + return offset; +} + + +/** + * Sync/flush offset file. + */ +static int rd_kafka_offset_file_sync(rd_kafka_toppar_t *rktp) { + if (!rktp->rktp_offset_fp) + return 0; + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "SYNC", + "%s [%" PRId32 "]: offset file sync", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); + +#ifndef _WIN32 + (void)fflush(rktp->rktp_offset_fp); + (void)fsync(fileno(rktp->rktp_offset_fp)); // FIXME +#else + // FIXME + // FlushFileBuffers(_get_osfhandle(fileno(rktp->rktp_offset_fp))); +#endif + return 0; +} + + +/** + * Write offset to offset file. + * + * Locality: toppar's broker thread + */ +static rd_kafka_resp_err_t +rd_kafka_offset_file_commit(rd_kafka_toppar_t *rktp) { + rd_kafka_topic_t *rkt = rktp->rktp_rkt; + int attempt; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + int64_t offset = rktp->rktp_stored_pos.offset; + + for (attempt = 0; attempt < 2; attempt++) { + char buf[22]; + int len; + + if (!rktp->rktp_offset_fp) + if (rd_kafka_offset_file_open(rktp) == -1) + continue; + + if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) { + rd_kafka_op_err( + rktp->rktp_rkt->rkt_rk, RD_KAFKA_RESP_ERR__FS, + "%s [%" PRId32 + "]: " + "Seek failed on offset file %s: %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, rktp->rktp_offset_path, + rd_strerror(errno)); + err = RD_KAFKA_RESP_ERR__FS; + rd_kafka_offset_file_close(rktp); + continue; + } + + len = rd_snprintf(buf, sizeof(buf), "%" PRId64 "\n", offset); + + if (fwrite(buf, 1, len, rktp->rktp_offset_fp) < 1) { + rd_kafka_op_err( + rktp->rktp_rkt->rkt_rk, RD_KAFKA_RESP_ERR__FS, + "%s [%" PRId32 + "]: " + "Failed to write offset %" PRId64 + " to " + "offset file %s: %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, offset, + rktp->rktp_offset_path, rd_strerror(errno)); + err = RD_KAFKA_RESP_ERR__FS; + rd_kafka_offset_file_close(rktp); + continue; + } + + /* Need to flush before truncate to preserve write ordering */ + (void)fflush(rktp->rktp_offset_fp); + + /* Truncate file */ +#ifdef _WIN32 + if (_chsize_s(_fileno(rktp->rktp_offset_fp), len) == -1) + ; /* Ignore truncate failures */ +#else + if (ftruncate(fileno(rktp->rktp_offset_fp), len) == -1) + ; /* Ignore truncate failures */ +#endif + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: wrote offset %" PRId64 + " to " + "file %s", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, offset, + rktp->rktp_offset_path); + + rktp->rktp_committed_pos.offset = offset; + + /* If sync interval is set to immediate we sync right away. */ + if (rkt->rkt_conf.offset_store_sync_interval_ms == 0) + rd_kafka_offset_file_sync(rktp); + + + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + + return err; +} + + + +/** + * Commit a list of offsets asynchronously. Response will be queued on 'replyq'. + * Optional \p cb will be set on requesting op. 
+ * + * Makes a copy of \p offsets (may be NULL for current assignment) + */ +static rd_kafka_resp_err_t +rd_kafka_commit0(rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *offsets, + rd_kafka_toppar_t *rktp, + rd_kafka_replyq_t replyq, + void (*cb)(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, + void *opaque), + void *opaque, + const char *reason) { + rd_kafka_cgrp_t *rkcg; + rd_kafka_op_t *rko; + + if (!(rkcg = rd_kafka_cgrp_get(rk))) + return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; + + rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT); + rko->rko_u.offset_commit.reason = rd_strdup(reason); + rko->rko_replyq = replyq; + rko->rko_u.offset_commit.cb = cb; + rko->rko_u.offset_commit.opaque = opaque; + if (rktp) + rko->rko_rktp = rd_kafka_toppar_keep(rktp); + + if (offsets) + rko->rko_u.offset_commit.partitions = + rd_kafka_topic_partition_list_copy(offsets); + + rd_kafka_q_enq(rkcg->rkcg_ops, rko); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * NOTE: 'offsets' may be NULL, see official documentation. + */ +rd_kafka_resp_err_t +rd_kafka_commit(rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *offsets, + int async) { + rd_kafka_cgrp_t *rkcg; + rd_kafka_resp_err_t err; + rd_kafka_q_t *repq = NULL; + rd_kafka_replyq_t rq = RD_KAFKA_NO_REPLYQ; + + if (!(rkcg = rd_kafka_cgrp_get(rk))) + return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; + + if (!async) { + repq = rd_kafka_q_new(rk); + rq = RD_KAFKA_REPLYQ(repq, 0); + } + + err = rd_kafka_commit0(rk, offsets, NULL, rq, NULL, NULL, "manual"); + + if (!err && !async) + err = rd_kafka_q_wait_result(repq, RD_POLL_INFINITE); + + if (!async) + rd_kafka_q_destroy_owner(repq); + + return err; +} + + +rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + int async) { + rd_kafka_topic_partition_list_t *offsets; + rd_kafka_topic_partition_t *rktpar; + rd_kafka_resp_err_t err; + + if (rkmessage->err) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + offsets = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add( + offsets, rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition); + rktpar->offset = rkmessage->offset + 1; + + err = rd_kafka_commit(rk, offsets, async); + + rd_kafka_topic_partition_list_destroy(offsets); + + return err; +} + + + +rd_kafka_resp_err_t +rd_kafka_commit_queue(rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *offsets, + rd_kafka_queue_t *rkqu, + void (*cb)(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, + void *opaque), + void *opaque) { + rd_kafka_q_t *rkq; + rd_kafka_resp_err_t err; + + if (!rd_kafka_cgrp_get(rk)) + return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; + + if (rkqu) + rkq = rkqu->rkqu_q; + else + rkq = rd_kafka_q_new(rk); + + err = rd_kafka_commit0(rk, offsets, NULL, RD_KAFKA_REPLYQ(rkq, 0), cb, + opaque, "manual"); + + if (!rkqu) { + rd_kafka_op_t *rko = rd_kafka_q_pop_serve( + rkq, RD_POLL_INFINITE, 0, RD_KAFKA_Q_CB_FORCE_RETURN, NULL, + NULL); + if (!rko) + err = RD_KAFKA_RESP_ERR__TIMED_OUT; + else { + if (cb) + cb(rk, rko->rko_err, + rko->rko_u.offset_commit.partitions, opaque); + err = rko->rko_err; + rd_kafka_op_destroy(rko); + } + + if (rkqu) + rd_kafka_q_destroy(rkq); + else + rd_kafka_q_destroy_owner(rkq); + } + + return err; +} + + + +/** + * Called when a broker commit is done. 
+ * + * Locality: toppar handler thread + * Locks: none + */ +static void +rd_kafka_offset_broker_commit_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, + void *opaque) { + rd_kafka_toppar_t *rktp; + rd_kafka_topic_partition_t *rktpar; + + if (offsets->cnt == 0) { + rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT", + "No offsets to commit (commit_cb)"); + return; + } + + rktpar = &offsets->elems[0]; + + if (!(rktp = + rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false))) { + rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT", + "No local partition found for %s [%" PRId32 + "] " + "while parsing OffsetCommit response " + "(offset %" PRId64 ", error \"%s\")", + rktpar->topic, rktpar->partition, rktpar->offset, + rd_kafka_err2str(rktpar->err)); + return; + } + + if (!err) + err = rktpar->err; + + rd_kafka_toppar_offset_commit_result(rktp, err, offsets); + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: offset %" PRId64 " %scommitted: %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rktpar->offset, err ? "not " : "", rd_kafka_err2str(err)); + + rktp->rktp_committing_pos.offset = 0; + + rd_kafka_toppar_lock(rktp); + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING) + rd_kafka_offset_store_term(rktp, err); + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); +} + + +/** + * @locks_required rd_kafka_toppar_lock(rktp) MUST be held. + */ +static rd_kafka_resp_err_t +rd_kafka_offset_broker_commit(rd_kafka_toppar_t *rktp, const char *reason) { + rd_kafka_topic_partition_list_t *offsets; + rd_kafka_topic_partition_t *rktpar; + + rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL); + rd_kafka_assert(rktp->rktp_rkt->rkt_rk, + rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE); + + rktp->rktp_committing_pos = rktp->rktp_stored_pos; + + offsets = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add( + offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); + rd_kafka_topic_partition_set_from_fetch_pos(rktpar, + rktp->rktp_committing_pos); + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT", + "%.*s [%" PRId32 "]: committing %s: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_committing_pos), reason); + + rd_kafka_commit0(rktp->rktp_rkt->rkt_rk, offsets, rktp, + RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), + rd_kafka_offset_broker_commit_cb, NULL, reason); + + rd_kafka_topic_partition_list_destroy(offsets); + + return RD_KAFKA_RESP_ERR__IN_PROGRESS; +} + + + +/** + * Commit offset to backing store. + * This might be an async operation. 
+ * + * Locality: toppar handler thread + */ +static rd_kafka_resp_err_t rd_kafka_offset_commit(rd_kafka_toppar_t *rktp, + const char *reason) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: commit: stored %s > committed %s?", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_stored_pos), + rd_kafka_fetch_pos2str(rktp->rktp_committed_pos)); + + /* Already committed */ + if (rd_kafka_fetch_pos_cmp(&rktp->rktp_stored_pos, + &rktp->rktp_committed_pos) <= 0) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Already committing (for async ops) */ + if (rd_kafka_fetch_pos_cmp(&rktp->rktp_stored_pos, + &rktp->rktp_committing_pos) <= 0) + return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; + + switch (rktp->rktp_rkt->rkt_conf.offset_store_method) { + case RD_KAFKA_OFFSET_METHOD_FILE: + return rd_kafka_offset_file_commit(rktp); + case RD_KAFKA_OFFSET_METHOD_BROKER: + return rd_kafka_offset_broker_commit(rktp, reason); + default: + /* UNREACHABLE */ + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } +} + + + +/** + * Sync offset backing store. This is only used for METHOD_FILE. + * + * Locality: rktp's broker thread. + */ +rd_kafka_resp_err_t rd_kafka_offset_sync(rd_kafka_toppar_t *rktp) { + switch (rktp->rktp_rkt->rkt_conf.offset_store_method) { + case RD_KAFKA_OFFSET_METHOD_FILE: + return rd_kafka_offset_file_sync(rktp); + default: + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } +} + + +/** + * Store offset. + * Typically called from application code. + * + * NOTE: No locks must be held. + * + * @deprecated Use rd_kafka_offsets_store(). + */ +rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *app_rkt, + int32_t partition, + int64_t offset) { + rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt); + rd_kafka_toppar_t *rktp; + rd_kafka_resp_err_t err; + rd_kafka_fetch_pos_t pos = {offset + 1, -1 /*no leader epoch known*/}; + + /* Find toppar */ + rd_kafka_topic_rdlock(rkt); + if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0 /*!ua_on_miss*/))) { + rd_kafka_topic_rdunlock(rkt); + return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; + } + rd_kafka_topic_rdunlock(rkt); + + err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */, + RD_DO_LOCK); + + rd_kafka_toppar_destroy(rktp); + + return err; +} + + +rd_kafka_resp_err_t +rd_kafka_offsets_store(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *offsets) { + int i; + int ok_cnt = 0; + rd_kafka_resp_err_t last_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (rk->rk_conf.enable_auto_offset_store) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + for (i = 0; i < offsets->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; + rd_kafka_toppar_t *rktp; + rd_kafka_fetch_pos_t pos = {rktpar->offset, -1}; + + rktp = + rd_kafka_topic_partition_get_toppar(rk, rktpar, rd_false); + if (!rktp) { + rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; + last_err = rktpar->err; + continue; + } + + pos.leader_epoch = + rd_kafka_topic_partition_get_leader_epoch(rktpar); + + rktpar->err = rd_kafka_offset_store0( + rktp, pos, rd_false /* don't force */, RD_DO_LOCK); + rd_kafka_toppar_destroy(rktp); + + if (rktpar->err) + last_err = rktpar->err; + else + ok_cnt++; + } + + return offsets->cnt > 0 && ok_cnt == 0 ? 
last_err + : RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +rd_kafka_error_t *rd_kafka_offset_store_message(rd_kafka_message_t *rkmessage) { + rd_kafka_toppar_t *rktp; + rd_kafka_op_t *rko; + rd_kafka_resp_err_t err; + rd_kafka_msg_t *rkm = (rd_kafka_msg_t *)rkmessage; + rd_kafka_fetch_pos_t pos; + + if (rkmessage->err) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + "Message object must not have an " + "error set"); + + if (unlikely(!(rko = rd_kafka_message2rko(rkmessage)) || + !(rktp = rko->rko_rktp))) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + "Invalid message object, " + "not a consumed message"); + + pos.offset = rkmessage->offset + 1; + pos.leader_epoch = rkm->rkm_u.consumer.leader_epoch; + err = rd_kafka_offset_store0(rktp, pos, rd_false /* Don't force */, + RD_DO_LOCK); + + if (err == RD_KAFKA_RESP_ERR__STATE) + return rd_kafka_error_new(err, "Partition is not assigned"); + else if (err) + return rd_kafka_error_new(err, "Failed to store offset: %s", + rd_kafka_err2str(err)); + + return NULL; +} + + + +/** + * Decommissions the use of an offset file for a toppar. + * The file content will not be touched and the file will not be removed. + */ +static rd_kafka_resp_err_t rd_kafka_offset_file_term(rd_kafka_toppar_t *rktp) { + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Sync offset file if the sync is intervalled (> 0) */ + if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0) { + rd_kafka_offset_file_sync(rktp); + rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, + &rktp->rktp_offset_sync_tmr, 1 /*lock*/); + } + + + rd_kafka_offset_file_close(rktp); + + rd_free(rktp->rktp_offset_path); + rktp->rktp_offset_path = NULL; + + return err; +} + +static rd_kafka_op_res_t rd_kafka_offset_reset_op_cb(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_toppar_t *rktp = rko->rko_rktp; + rd_kafka_toppar_lock(rktp); + rd_kafka_offset_reset(rktp, rko->rko_u.offset_reset.broker_id, + rko->rko_u.offset_reset.pos, rko->rko_err, "%s", + rko->rko_u.offset_reset.reason); + rd_kafka_toppar_unlock(rktp); + return RD_KAFKA_OP_RES_HANDLED; +} + +/** + * @brief Take action when the offset for a toppar is unusable (due to an + * error, or offset is logical). + * + * @param rktp the toppar + * @param broker_id Originating broker, if any, else RD_KAFKA_NODEID_UA. + * @param err_pos a logical offset, or offset corresponding to the error. + * @param err the error, or RD_KAFKA_RESP_ERR_NO_ERROR if offset is logical. + * @param fmt a reason string for logging. + * + * @locality any. if not main thread, work will be enqued on main thread. + * @locks_required toppar_lock() MUST be held + */ +void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp, + int32_t broker_id, + rd_kafka_fetch_pos_t err_pos, + rd_kafka_resp_err_t err, + const char *fmt, + ...) { + rd_kafka_fetch_pos_t pos = {RD_KAFKA_OFFSET_INVALID, -1}; + const char *extra = ""; + char reason[512]; + va_list ap; + + va_start(ap, fmt); + rd_vsnprintf(reason, sizeof(reason), fmt, ap); + va_end(ap); + + /* Enqueue op for toppar handler thread if we're on the wrong thread. 
*/ + if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) { + rd_kafka_op_t *rko = + rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET | RD_KAFKA_OP_CB); + rko->rko_op_cb = rd_kafka_offset_reset_op_cb; + rko->rko_err = err; + rko->rko_rktp = rd_kafka_toppar_keep(rktp); + rko->rko_u.offset_reset.broker_id = broker_id; + rko->rko_u.offset_reset.pos = err_pos; + rko->rko_u.offset_reset.reason = rd_strdup(reason); + rd_kafka_q_enq(rktp->rktp_ops, rko); + return; + } + + if (err_pos.offset == RD_KAFKA_OFFSET_INVALID || err) + pos.offset = rktp->rktp_rkt->rkt_conf.auto_offset_reset; + else + pos.offset = err_pos.offset; + + if (pos.offset == RD_KAFKA_OFFSET_INVALID) { + /* Error, auto.offset.reset tells us to error out. */ + if (broker_id != RD_KAFKA_NODEID_UA) + rd_kafka_consumer_err( + rktp->rktp_fetchq, broker_id, + RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET, 0, NULL, rktp, + err_pos.offset, "%s: %s (broker %" PRId32 ")", + reason, rd_kafka_err2str(err), broker_id); + else + rd_kafka_consumer_err( + rktp->rktp_fetchq, broker_id, + RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET, 0, NULL, rktp, + err_pos.offset, "%s: %s", reason, + rd_kafka_err2str(err)); + + rd_kafka_toppar_set_fetch_state(rktp, + RD_KAFKA_TOPPAR_FETCH_NONE); + + } else if (pos.offset == RD_KAFKA_OFFSET_BEGINNING && + rktp->rktp_lo_offset >= 0) { + /* Use cached log start from last Fetch if available. + * Note: The cached end offset (rktp_ls_offset) can't be + * used here since the End offset is a constantly moving + * target as new messages are produced. */ + extra = "cached BEGINNING offset "; + pos.offset = rktp->rktp_lo_offset; + pos.leader_epoch = -1; + rd_kafka_toppar_next_offset_handle(rktp, pos); + + } else { + /* Else query cluster for offset */ + rktp->rktp_query_pos = pos; + rd_kafka_toppar_set_fetch_state( + rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY); + } + + /* Offset resets due to error are logged since they might have quite + * critical impact. For non-errors, or for auto.offset.reset=error, + * the reason is simply debug-logged. */ + if (!err || err == RD_KAFKA_RESP_ERR__NO_OFFSET || + pos.offset == RD_KAFKA_OFFSET_INVALID) + rd_kafka_dbg( + rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: offset reset (at %s, broker %" PRId32 + ") " + "to %s%s: %s: %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_fetch_pos2str(err_pos), broker_id, extra, + rd_kafka_fetch_pos2str(pos), reason, rd_kafka_err2str(err)); + else + rd_kafka_log( + rktp->rktp_rkt->rkt_rk, LOG_WARNING, "OFFSET", + "%s [%" PRId32 "]: offset reset (at %s, broker %" PRId32 + ") to %s%s: %s: %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_fetch_pos2str(err_pos), broker_id, extra, + rd_kafka_fetch_pos2str(pos), reason, rd_kafka_err2str(err)); + + /* Note: If rktp is not delegated to the leader, then low and high + offsets will necessarily be cached from the last FETCH request, + and so this offset query will never occur in that case for + BEGINNING / END logical offsets. */ + if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY) + rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_pos, + err ? 
100 : 0); +} + + + +/** + * @brief Offset validation retry timer + */ +static void rd_kafka_offset_validate_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_toppar_t *rktp = arg; + + rd_kafka_toppar_lock(rktp); + rd_kafka_offset_validate(rktp, "retrying offset validation"); + rd_kafka_toppar_unlock(rktp); +} + + + +/** + * @brief OffsetForLeaderEpochResponse handler that + * pushes the matched toppar's to the next state. + * + * @locality rdkafka main thread + */ +static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_topic_partition_list_t *parts = NULL; + rd_kafka_toppar_t *rktp = opaque; + rd_kafka_topic_partition_t *rktpar; + int64_t end_offset; + int32_t end_offset_leader_epoch; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + rd_kafka_toppar_destroy(rktp); /* Drop refcnt */ + return; + } + + err = rd_kafka_handle_OffsetForLeaderEpoch(rk, rkb, err, rkbuf, request, + &parts); + + rd_kafka_toppar_lock(rktp); + + if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + err = RD_KAFKA_RESP_ERR__OUTDATED; + + if (unlikely(!err && parts->cnt == 0)) + err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; + + if (!err) { + err = (&parts->elems[0])->err; + } + + if (err) { + int actions; + + rd_rkb_dbg(rkb, FETCH, "OFFSETVALID", + "%.*s [%" PRId32 + "]: OffsetForLeaderEpoch requested failed: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, rd_kafka_err2str(err)); + + if (err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE) { + rd_rkb_dbg(rkb, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: offset and epoch validation not " + "supported by broker: validation skipped", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition); + /* Reset the epoch to -1 since it can't be used with + * older brokers. */ + rktp->rktp_next_fetch_start.leader_epoch = -1; + rd_kafka_toppar_set_fetch_state( + rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE); + goto done; + + } else if (err == RD_KAFKA_RESP_ERR__OUTDATED) { + /* Partition state has changed, this response + * is outdated. */ + goto done; + } + + actions = rd_kafka_err_action( + rkb, err, request, RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH, + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, + RD_KAFKA_ERR_ACTION_END); + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) + /* Metadata refresh is ongoing, so force it */ + rd_kafka_topic_leader_query0(rk, rktp->rktp_rkt, 1, + rd_true /* force */); + + if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + /* No need for refcnt on rktp for timer opaque + * since the timer resides on the rktp and will be + * stopped on toppar remove. 
*/ + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, + 500 * 1000 /* 500ms */, + rd_kafka_offset_validate_tmr_cb, rktp); + goto done; + } + + if (!(actions & RD_KAFKA_ERR_ACTION_REFRESH)) { + /* Permanent error */ + rd_kafka_offset_reset( + rktp, rd_kafka_broker_id(rkb), + RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, + rktp->rktp_leader_epoch), + RD_KAFKA_RESP_ERR__LOG_TRUNCATION, + "Unable to validate offset and epoch: %s", + rd_kafka_err2str(err)); + } + goto done; + } + + + rktpar = &parts->elems[0]; + end_offset = rktpar->offset; + end_offset_leader_epoch = + rd_kafka_topic_partition_get_leader_epoch(rktpar); + + if (end_offset < 0 || end_offset_leader_epoch < 0) { + rd_kafka_offset_reset( + rktp, rd_kafka_broker_id(rkb), rktp->rktp_next_fetch_start, + RD_KAFKA_RESP_ERR__LOG_TRUNCATION, + "No epoch found less or equal to " + "%s: broker end offset is %" PRId64 + " (offset leader epoch %" PRId32 + ")." + " Reset using configured policy.", + rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), + end_offset, end_offset_leader_epoch); + + } else if (end_offset < rktp->rktp_next_fetch_start.offset) { + + if (rktp->rktp_rkt->rkt_conf.auto_offset_reset == + RD_KAFKA_OFFSET_INVALID /* auto.offset.reset=error */) { + rd_kafka_offset_reset( + rktp, rd_kafka_broker_id(rkb), + RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, + rktp->rktp_leader_epoch), + RD_KAFKA_RESP_ERR__LOG_TRUNCATION, + "Partition log truncation detected at %s: " + "broker end offset is %" PRId64 + " (offset leader epoch %" PRId32 + "). " + "Reset to INVALID.", + rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), + end_offset, end_offset_leader_epoch); + + } else { + rd_kafka_toppar_unlock(rktp); + + /* Seek to the updated end offset */ + rd_kafka_fetch_pos_t fetch_pos = + rd_kafka_topic_partition_get_fetch_pos(rktpar); + fetch_pos.validated = rd_true; + + rd_kafka_toppar_op_seek(rktp, fetch_pos, + RD_KAFKA_NO_REPLYQ); + + rd_kafka_topic_partition_list_destroy(parts); + rd_kafka_toppar_destroy(rktp); + + return; + } + + } else { + rd_rkb_dbg(rkb, FETCH, "OFFSETVALID", + "%.*s [%" PRId32 + "]: offset and epoch validation " + "succeeded: broker end offset %" PRId64 + " (offset leader epoch %" PRId32 ")", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, end_offset, + end_offset_leader_epoch); + + rktp->rktp_next_fetch_start.leader_epoch = + end_offset_leader_epoch; + rd_kafka_toppar_set_fetch_state(rktp, + RD_KAFKA_TOPPAR_FETCH_ACTIVE); + } + +done: + rd_kafka_toppar_unlock(rktp); + + if (parts) + rd_kafka_topic_partition_list_destroy(parts); + rd_kafka_toppar_destroy(rktp); +} + + +static rd_kafka_op_res_t rd_kafka_offset_validate_op_cb(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_toppar_t *rktp = rko->rko_rktp; + rd_kafka_toppar_lock(rktp); + rd_kafka_offset_validate(rktp, "%s", rko->rko_u.offset_reset.reason); + rd_kafka_toppar_unlock(rktp); + return RD_KAFKA_OP_RES_HANDLED; +} + +/** + * @brief Validate partition epoch and offset (KIP-320). + * + * @param rktp the toppar + * @param err Optional error code that triggered the validation. + * @param fmt a reason string for logging. + * + * @locality any. if not main thread, work will be enqued on main thread. + * @locks_required toppar_lock() MUST be held + */ +void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) 
{ + rd_kafka_topic_partition_list_t *parts; + rd_kafka_topic_partition_t *rktpar; + char reason[512]; + va_list ap; + + if (rktp->rktp_rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER) + return; + + va_start(ap, fmt); + rd_vsnprintf(reason, sizeof(reason), fmt, ap); + va_end(ap); + + /* Enqueue op for toppar handler thread if we're on the wrong thread. */ + if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) { + /* Reuse OP_OFFSET_RESET type */ + rd_kafka_op_t *rko = + rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET | RD_KAFKA_OP_CB); + rko->rko_op_cb = rd_kafka_offset_validate_op_cb; + rko->rko_rktp = rd_kafka_toppar_keep(rktp); + rko->rko_u.offset_reset.reason = rd_strdup(reason); + rd_kafka_q_enq(rktp->rktp_ops, rko); + return; + } + + if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE && + rktp->rktp_fetch_state != + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: skipping offset " + "validation in fetch state %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_states[rktp->rktp_fetch_state]); + return; + } + + + if (rktp->rktp_leader_id == -1 || !rktp->rktp_leader || + rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: unable to perform offset " + "validation: partition leader not available", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition); + + rd_kafka_toppar_set_fetch_state(rktp, + RD_KAFKA_TOPPAR_FETCH_ACTIVE); + return; + } + + /* If the fetch start position does not have an epoch set then + * there is no point in doing validation. + * This is the case for epoch-less seek()s or epoch-less + * committed offsets. */ + if (rktp->rktp_next_fetch_start.leader_epoch == -1) { + rd_kafka_dbg( + rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: skipping offset " + "validation for %s: no leader epoch set", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start)); + rd_kafka_toppar_set_fetch_state(rktp, + RD_KAFKA_TOPPAR_FETCH_ACTIVE); + return; + } + + rd_kafka_toppar_set_fetch_state( + rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); + + /* Construct and send OffsetForLeaderEpochRequest */ + parts = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add( + parts, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); + rd_kafka_topic_partition_set_leader_epoch( + rktpar, rktp->rktp_next_fetch_start.leader_epoch); + rd_kafka_topic_partition_set_current_leader_epoch( + rktpar, rktp->rktp_leader_epoch); + rd_kafka_toppar_keep(rktp); /* for request opaque */ + + rd_rkb_dbg(rktp->rktp_leader, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: querying broker for epoch " + "validation of %s: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start), reason); + + rd_kafka_OffsetForLeaderEpochRequest( + rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), + rd_kafka_toppar_handle_OffsetForLeaderEpoch, rktp); + rd_kafka_topic_partition_list_destroy(parts); +} + + +/** + * Escape any special characters in filename 'in' and write escaped + * string to 'out' (of max size out_size). 
+ */ +static char *mk_esc_filename(const char *in, char *out, size_t out_size) { + const char *s = in; + char *o = out; + + while (*s) { + const char *esc; + size_t esclen; + + switch (*s) { + case '/': /* linux */ + esc = "%2F"; + esclen = strlen(esc); + break; + case ':': /* osx, windows */ + esc = "%3A"; + esclen = strlen(esc); + break; + case '\\': /* windows */ + esc = "%5C"; + esclen = strlen(esc); + break; + default: + esc = s; + esclen = 1; + break; + } + + if ((size_t)((o + esclen + 1) - out) >= out_size) { + /* No more space in output string, truncate. */ + break; + } + + while (esclen-- > 0) + *(o++) = *(esc++); + + s++; + } + + *o = '\0'; + return out; +} + + +static void rd_kafka_offset_sync_tmr_cb(rd_kafka_timers_t *rkts, void *arg) { + rd_kafka_toppar_t *rktp = arg; + rd_kafka_offset_sync(rktp); +} + + +/** + * Prepare a toppar for using an offset file. + * + * Locality: rdkafka main thread + * Locks: toppar_lock(rktp) must be held + */ +static void rd_kafka_offset_file_init(rd_kafka_toppar_t *rktp) { + char spath[4096 + 1]; /* larger than escfile to avoid warning */ + const char *path = rktp->rktp_rkt->rkt_conf.offset_store_path; + int64_t offset = RD_KAFKA_OFFSET_INVALID; + + if (rd_kafka_path_is_dir(path)) { + char tmpfile[1024]; + char escfile[4096]; + + /* Include group.id in filename if configured. */ + if (!RD_KAFKAP_STR_IS_NULL(rktp->rktp_rkt->rkt_rk->rk_group_id)) + rd_snprintf(tmpfile, sizeof(tmpfile), + "%s-%" PRId32 "-%.*s.offset", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + RD_KAFKAP_STR_PR( + rktp->rktp_rkt->rkt_rk->rk_group_id)); + else + rd_snprintf(tmpfile, sizeof(tmpfile), + "%s-%" PRId32 ".offset", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition); + + /* Escape filename to make it safe. */ + mk_esc_filename(tmpfile, escfile, sizeof(escfile)); + + rd_snprintf(spath, sizeof(spath), "%s%s%s", path, + path[strlen(path) - 1] == '/' ? "" : "/", escfile); + + path = spath; + } + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: using offset file %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + path); + rktp->rktp_offset_path = rd_strdup(path); + + + /* Set up the offset file sync interval. */ + if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0) + rd_kafka_timer_start( + &rktp->rktp_rkt->rkt_rk->rk_timers, + &rktp->rktp_offset_sync_tmr, + rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms * + 1000ll, + rd_kafka_offset_sync_tmr_cb, rktp); + + if (rd_kafka_offset_file_open(rktp) != -1) { + /* Read offset from offset file. */ + offset = rd_kafka_offset_file_read(rktp); + } + + if (offset != RD_KAFKA_OFFSET_INVALID) { + /* Start fetching from offset */ + rktp->rktp_stored_pos.offset = offset; + rktp->rktp_committed_pos.offset = offset; + rd_kafka_toppar_next_offset_handle(rktp, rktp->rktp_stored_pos); + + } else { + /* Offset was not usable: perform offset reset logic */ + rktp->rktp_committed_pos.offset = RD_KAFKA_OFFSET_INVALID; + rd_kafka_offset_reset( + rktp, RD_KAFKA_NODEID_UA, + RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1), + RD_KAFKA_RESP_ERR__FS, "non-readable offset file"); + } +} + + + +/** + * Terminate broker offset store + */ +static rd_kafka_resp_err_t +rd_kafka_offset_broker_term(rd_kafka_toppar_t *rktp) { + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * Prepare a toppar for using broker offset commit (broker 0.8.2 or + * later). 
When using KafkaConsumer (high-level consumer) this + * functionality is disabled in favour of the cgrp commits for the + * entire set of subscriptions. + */ +static void rd_kafka_offset_broker_init(rd_kafka_toppar_t *rktp) { + if (!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk)) + return; + rd_kafka_offset_reset(rktp, RD_KAFKA_NODEID_UA, + RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_STORED, -1), + RD_KAFKA_RESP_ERR_NO_ERROR, + "query broker for offsets"); +} + + +/** + * Terminates toppar's offset store, this is the finalizing step after + * offset_store_stop(). + * + * Locks: rd_kafka_toppar_lock() MUST be held. + */ +void rd_kafka_offset_store_term(rd_kafka_toppar_t *rktp, + rd_kafka_resp_err_t err) { + rd_kafka_resp_err_t err2; + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "STORETERM", + "%s [%" PRId32 "]: offset store terminating", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); + + rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING; + + rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers, + &rktp->rktp_offset_commit_tmr, 1 /*lock*/); + + switch (rktp->rktp_rkt->rkt_conf.offset_store_method) { + case RD_KAFKA_OFFSET_METHOD_FILE: + err2 = rd_kafka_offset_file_term(rktp); + break; + case RD_KAFKA_OFFSET_METHOD_BROKER: + err2 = rd_kafka_offset_broker_term(rktp); + break; + case RD_KAFKA_OFFSET_METHOD_NONE: + err2 = RD_KAFKA_RESP_ERR_NO_ERROR; + break; + } + + /* Prioritize the input error (probably from commit), fall + * back on termination error. */ + if (!err) + err = err2; + + rd_kafka_toppar_fetch_stopped(rktp, err); +} + + +/** + * Stop toppar's offset store, committing the final offsets, etc. + * + * Returns RD_KAFKA_RESP_ERR_NO_ERROR on success, + * RD_KAFKA_RESP_ERR__IN_PROGRESS if the term triggered an + * async operation (e.g., broker offset commit), or + * any other error in case of immediate failure. + * + * The offset layer will call rd_kafka_offset_store_term() when + * the offset management has been fully stopped for this partition. + * + * Locks: rd_kafka_toppar_lock() MUST be held. + */ +rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp) { + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE)) + goto done; + + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING; + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 + "]: stopping offset store " + "(stored %s, committed %s, EOF offset %" PRId64 ")", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_stored_pos), + rd_kafka_fetch_pos2str(rktp->rktp_committed_pos), + rktp->rktp_offsets_fin.eof_offset); + + /* Store end offset for empty partitions */ + if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store && + rktp->rktp_stored_pos.offset == RD_KAFKA_OFFSET_INVALID && + rktp->rktp_offsets_fin.eof_offset > 0) + rd_kafka_offset_store0( + rktp, + RD_KAFKA_FETCH_POS(rktp->rktp_offsets_fin.eof_offset, + rktp->rktp_leader_epoch), + rd_true /* force */, RD_DONT_LOCK); + + /* Commit offset to backing store. + * This might be an async operation. */ + if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) && + rd_kafka_fetch_pos_cmp(&rktp->rktp_stored_pos, + &rktp->rktp_committed_pos) > 0) + err = rd_kafka_offset_commit(rktp, "offset store stop"); + + /* If stop is in progress (async commit), return now. 
*/ + if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) + return err; + +done: + /* Stop is done */ + rd_kafka_offset_store_term(rktp, err); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +static void rd_kafka_offset_auto_commit_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_toppar_t *rktp = arg; + rd_kafka_offset_commit(rktp, "auto commit timer"); +} + +void rd_kafka_offset_query_tmr_cb(rd_kafka_timers_t *rkts, void *arg) { + rd_kafka_toppar_t *rktp = arg; + rd_kafka_toppar_lock(rktp); + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "Topic %s [%" PRId32 + "]: timed offset query for %s in state %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_query_pos), + rd_kafka_fetch_states[rktp->rktp_fetch_state]); + rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_pos, 0); + rd_kafka_toppar_unlock(rktp); +} + + +/** + * Initialize toppar's offset store. + * + * Locality: toppar handler thread + */ +void rd_kafka_offset_store_init(rd_kafka_toppar_t *rktp) { + static const char *store_names[] = {"none", "file", "broker"}; + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", + "%s [%" PRId32 "]: using offset store method: %s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + store_names[rktp->rktp_rkt->rkt_conf.offset_store_method]); + + /* The committed offset is unknown at this point. */ + rktp->rktp_committed_pos.offset = RD_KAFKA_OFFSET_INVALID; + + /* Set up the commit interval (for simple consumer). */ + if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) && + rktp->rktp_rkt->rkt_conf.auto_commit_interval_ms > 0) + rd_kafka_timer_start( + &rktp->rktp_rkt->rkt_rk->rk_timers, + &rktp->rktp_offset_commit_tmr, + rktp->rktp_rkt->rkt_conf.auto_commit_interval_ms * 1000ll, + rd_kafka_offset_auto_commit_tmr_cb, rktp); + + switch (rktp->rktp_rkt->rkt_conf.offset_store_method) { + case RD_KAFKA_OFFSET_METHOD_FILE: + rd_kafka_offset_file_init(rktp); + break; + case RD_KAFKA_OFFSET_METHOD_BROKER: + rd_kafka_offset_broker_init(rktp); + break; + case RD_KAFKA_OFFSET_METHOD_NONE: + break; + default: + /* NOTREACHED */ + return; + } + + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE; +} + + +/** + * Update toppar app_pos and store_offset (if enabled) to the provided + * offset and epoch. + */ +void rd_kafka_update_app_pos(rd_kafka_t *rk, + rd_kafka_toppar_t *rktp, + rd_kafka_fetch_pos_t pos, + rd_dolock_t do_lock) { + + if (do_lock) + rd_kafka_toppar_lock(rktp); + + rktp->rktp_app_pos = pos; + if (rk->rk_conf.enable_auto_offset_store) + rd_kafka_offset_store0(rktp, pos, + /* force: ignore assignment state */ + rd_true, RD_DONT_LOCK); + + if (do_lock) + rd_kafka_toppar_unlock(rktp); +} |
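
The public entry points implemented in this file (rd_kafka_offset_store_message(), rd_kafka_offsets_store(), rd_kafka_commit()) are what an application calls when it opts out of automatic offset storage. Below is a minimal consumer-side sketch, not part of the patch: it assumes a consumer handle rk created with enable.auto.offset.store=false and a group.id, already subscribed to its topics, and it commits after every message purely for brevity.

/*
 * Illustrative sketch (assumes enable.auto.offset.store=false):
 * store each message's offset explicitly after processing it,
 * then trigger an asynchronous commit of the stored positions.
 */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void consume_with_manual_offsets(rd_kafka_t *rk) {
        while (1) {
                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
                if (!rkm)
                        continue;               /* poll timed out */

                if (rkm->err) {                 /* consumer error or EOF */
                        rd_kafka_message_destroy(rkm);
                        continue;
                }

                /* ... process the message here ... */

                /* Store offset+1 (and leader epoch) for this partition;
                 * this wraps rd_kafka_offset_store0() as implemented above. */
                rd_kafka_error_t *error = rd_kafka_offset_store_message(rkm);
                if (error) {
                        fprintf(stderr, "offset store failed: %s\n",
                                rd_kafka_error_string(error));
                        rd_kafka_error_destroy(error);
                }

                rd_kafka_message_destroy(rkm);

                /* offsets==NULL commits the stored positions of the
                 * current assignment; async=1 returns immediately. */
                rd_kafka_commit(rk, NULL, 1 /*async*/);
        }
}

Storing the offset only after the message has been processed (rather than relying on enable.auto.offset.store) gives at-least-once delivery; a real application would typically commit on an interval rather than per message.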