author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/file/file_prefetch_buffer.cc
parent     Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/file/file_prefetch_buffer.cc')
-rw-r--r--  src/rocksdb/file/file_prefetch_buffer.cc  918
1 file changed, 918 insertions, 0 deletions
diff --git a/src/rocksdb/file/file_prefetch_buffer.cc b/src/rocksdb/file/file_prefetch_buffer.cc
new file mode 100644
index 000000000..4ac0d0504
--- /dev/null
+++ b/src/rocksdb/file/file_prefetch_buffer.cc
@@ -0,0 +1,918 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/file_prefetch_buffer.h"
+
+#include <algorithm>
+#include <cassert>
+
+#include "file/random_access_file_reader.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
+ uint64_t offset,
+ size_t roundup_len,
+ uint32_t index, bool refit_tail,
+ uint64_t& chunk_len) {
+ uint64_t chunk_offset_in_buffer = 0;
+ bool copy_data_to_new_buffer = false;
+ // Check if requested bytes are in the existing buffer_.
+ // If only a few bytes exist -- reuse them & read only what is really needed.
+ // This is typically the case of incremental reading of data.
+ // If no bytes exist in buffer -- full pread.
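+ // Illustrative example (assuming alignment = 4096): if this buffer holds
+ // file range [8192, 16384) and the caller asks for offset 12288, the tail
+ // chunk [12288, 16384) is kept (chunk_offset_in_buffer = 4096, chunk_len =
+ // 4096) and only the rest of the requested aligned range is read.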
+ if (DoesBufferContainData(index) && IsOffsetInBuffer(offset, index)) {
+ // Only a few of the requested bytes are in the buffer. memmove that chunk
+ // of bytes to the beginning, and memcpy it back into the new buffer if a
+ // new buffer is created.
+ chunk_offset_in_buffer = Rounddown(
+ static_cast<size_t>(offset - bufs_[index].offset_), alignment);
+ chunk_len = static_cast<uint64_t>(bufs_[index].buffer_.CurrentSize()) -
+ chunk_offset_in_buffer;
+ assert(chunk_offset_in_buffer % alignment == 0);
+ assert(chunk_len % alignment == 0);
+ assert(chunk_offset_in_buffer + chunk_len <=
+ bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
+ if (chunk_len > 0) {
+ copy_data_to_new_buffer = true;
+ } else {
+ // This reset is not strictly necessary, but is done just to be safe.
+ chunk_offset_in_buffer = 0;
+ }
+ }
+
+ // Create a new buffer only if the current capacity is not sufficient, and
+ // memcpy bytes from the old buffer if needed (i.e., if chunk_len is greater
+ // than 0).
+ if (bufs_[index].buffer_.Capacity() < roundup_len) {
+ bufs_[index].buffer_.Alignment(alignment);
+ bufs_[index].buffer_.AllocateNewBuffer(
+ static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
+ chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
+ } else if (chunk_len > 0 && refit_tail) {
+ // A new buffer is not needed, but memmove the bytes from the tail to the
+ // beginning since chunk_len is greater than 0.
+ bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
+ static_cast<size_t>(chunk_len));
+ } else if (chunk_len > 0) {
+ // Async prefetching does not call RefitTail with chunk_len > 0. Allocate a
+ // new buffer if needed, because AlignedBuffer computes the remaining space
+ // as capacity_ - cursize_, which may not hold here since we are not
+ // refitting.
+ // TODO akanksha: Update the condition when asynchronous prefetching is
+ // stable.
+ bufs_[index].buffer_.Alignment(alignment);
+ bufs_[index].buffer_.AllocateNewBuffer(
+ static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
+ chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
+ }
+}
+
+Status FilePrefetchBuffer::Read(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ Env::IOPriority rate_limiter_priority,
+ uint64_t read_len, uint64_t chunk_len,
+ uint64_t rounddown_start, uint32_t index) {
+ Slice result;
+ Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result,
+ bufs_[index].buffer_.BufferStart() + chunk_len,
+ /*aligned_buf=*/nullptr, rate_limiter_priority);
+#ifndef NDEBUG
+ if (result.size() < read_len) {
+ // Fake an IO error to force db_stress fault injection to ignore
+ // truncated read errors
+ IGNORE_STATUS_IF_ERROR(Status::IOError());
+ }
+#endif
+ if (!s.ok()) {
+ return s;
+ }
+
+ // Update the buffer offset and size.
+ bufs_[index].offset_ = rounddown_start;
+ bufs_[index].buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+ return s;
+}
+
+Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t read_len,
+ uint64_t rounddown_start, uint32_t index) {
+ // callback for async read request.
+ auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
+ std::placeholders::_1, std::placeholders::_2);
+ FSReadRequest req;
+ Slice result;
+ req.len = read_len;
+ req.offset = rounddown_start;
+ req.result = result;
+ req.scratch = bufs_[index].buffer_.BufferStart();
+ bufs_[index].async_req_len_ = req.len;
+
+ Status s =
+ reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_),
+ &(bufs_[index].io_handle_), &(bufs_[index].del_fn_),
+ /*aligned_buf=*/nullptr);
+ req.status.PermitUncheckedError();
+ if (s.ok()) {
+ bufs_[index].async_read_in_progress_ = true;
+ }
+ return s;
+}
+
+Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t offset, size_t n,
+ Env::IOPriority rate_limiter_priority) {
+ if (!enable_ || reader == nullptr) {
+ return Status::OK();
+ }
+ TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
+
+ if (offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
+ // All requested bytes are already in the curr_ buffer. So no need to Read
+ // again.
+ return Status::OK();
+ }
+
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ size_t offset_ = static_cast<size_t>(offset);
+ uint64_t rounddown_offset = Rounddown(offset_, alignment);
+ uint64_t roundup_end = Roundup(offset_ + n, alignment);
+ uint64_t roundup_len = roundup_end - rounddown_offset;
+ assert(roundup_len >= alignment);
+ assert(roundup_len % alignment == 0);
+
+ uint64_t chunk_len = 0;
+ CalculateOffsetAndLen(alignment, offset, roundup_len, curr_,
+ true /*refit_tail*/, chunk_len);
+ size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
+
+ Status s = Read(opts, reader, rate_limiter_priority, read_len, chunk_len,
+ rounddown_offset, curr_);
+ return s;
+}
+
+// Copy data from src to third buffer.
+void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
+ size_t& length) {
+ if (length == 0) {
+ return;
+ }
+ uint64_t copy_offset = (offset - bufs_[src].offset_);
+ size_t copy_len = 0;
+ if (IsDataBlockInBuffer(offset, length, src)) {
+ // All the bytes are in src.
+ copy_len = length;
+ } else {
+ copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset;
+ }
+
+ memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(),
+ bufs_[src].buffer_.BufferStart() + copy_offset, copy_len);
+
+ bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len);
+
+ // Update offset and length.
+ offset += copy_len;
+ length -= copy_len;
+
+ // length > 0 indicates that all data from the src buffer has been consumed
+ // and more data still needs to be read from the other buffer.
+ if (length > 0) {
+ bufs_[src].buffer_.Clear();
+ }
+}
+
+// Clear the buffers if they contain outdated data. Data can become outdated
+// when previous sequential reads were served from the cache instead of these
+// buffers. In that case the outdated IOs should be aborted.
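+// Illustrative example: if curr_ was prefetched for range [64K, 96K) and the
+// second buffer for [96K, 128K), but the next read arrives at offset 256K
+// (earlier reads were served from the block cache), both prefetched ranges
+// are stale and any in-flight async reads for them are aborted here.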
+void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) {
+ uint32_t second = curr_ ^ 1;
+ std::vector<void*> handles;
+ autovector<uint32_t> buf_pos;
+ if (IsBufferOutdatedWithAsyncProgress(offset, curr_)) {
+ handles.emplace_back(bufs_[curr_].io_handle_);
+ buf_pos.emplace_back(curr_);
+ }
+ if (IsBufferOutdatedWithAsyncProgress(offset, second)) {
+ handles.emplace_back(bufs_[second].io_handle_);
+ buf_pos.emplace_back(second);
+ }
+ if (!handles.empty()) {
+ StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
+ Status s = fs_->AbortIO(handles);
+ assert(s.ok());
+ }
+
+ for (auto& pos : buf_pos) {
+ // Release io_handle.
+ DestroyAndClearIOHandle(pos);
+ }
+
+ if (bufs_[second].io_handle_ == nullptr) {
+ bufs_[second].async_read_in_progress_ = false;
+ }
+
+ if (bufs_[curr_].io_handle_ == nullptr) {
+ bufs_[curr_].async_read_in_progress_ = false;
+ }
+}
+
+void FilePrefetchBuffer::AbortAllIOs() {
+ uint32_t second = curr_ ^ 1;
+ std::vector<void*> handles;
+ for (uint32_t i = 0; i < 2; i++) {
+ if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) {
+ handles.emplace_back(bufs_[i].io_handle_);
+ }
+ }
+ if (!handles.empty()) {
+ StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
+ Status s = fs_->AbortIO(handles);
+ assert(s.ok());
+ }
+
+ // Release io_handles.
+ if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) {
+ DestroyAndClearIOHandle(curr_);
+ } else {
+ bufs_[curr_].async_read_in_progress_ = false;
+ }
+
+ if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) {
+ DestroyAndClearIOHandle(second);
+ } else {
+ bufs_[second].async_read_in_progress_ = false;
+ }
+}
+
+// Clear the buffers if they contain outdated data. Data can become outdated
+// when previous sequential reads were served from the cache instead of these
+// buffers.
+void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) {
+ uint32_t second = curr_ ^ 1;
+ if (IsBufferOutdated(offset, curr_)) {
+ bufs_[curr_].buffer_.Clear();
+ }
+ if (IsBufferOutdated(offset, second)) {
+ bufs_[second].buffer_.Clear();
+ }
+
+ {
+ // If the buffers do not align, reset the second buffer. This can happen
+ // when readahead_size is set.
+ if (!bufs_[second].async_read_in_progress_ &&
+ !bufs_[curr_].async_read_in_progress_) {
+ if (DoesBufferContainData(curr_)) {
+ if (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() !=
+ bufs_[second].offset_) {
+ bufs_[second].buffer_.Clear();
+ }
+ } else {
+ if (!IsOffsetInBuffer(offset, second)) {
+ bufs_[second].buffer_.Clear();
+ }
+ }
+ }
+ }
+
+ // If the data starts in the second buffer, make it curr_. The second buffer
+ // can be partially filled, full, or have an async read in progress.
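+ // Illustrative example: if curr_ covers [0, 32K) and second covers
+ // [32K, 64K), a read at offset 40K switches the buffers so that second
+ // becomes the new curr_.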
+ if (bufs_[second].async_read_in_progress_) {
+ if (IsOffsetInBufferWithAsyncProgress(offset, second)) {
+ curr_ = curr_ ^ 1;
+ }
+ } else {
+ if (DoesBufferContainData(second) && IsOffsetInBuffer(offset, second)) {
+ assert(bufs_[curr_].async_read_in_progress_ ||
+ bufs_[curr_].buffer_.CurrentSize() == 0);
+ curr_ = curr_ ^ 1;
+ }
+ }
+}
+
+void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) {
+ if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) {
+ if (bufs_[curr_].io_handle_ != nullptr) {
+ // Wait for the prefetch read to complete.
+ // No mutex is needed as async_read_in_progress_ acts as a mutex and is
+ // updated by the main thread only.
+ std::vector<void*> handles;
+ handles.emplace_back(bufs_[curr_].io_handle_);
+ StopWatch sw(clock_, stats_, POLL_WAIT_MICROS);
+ fs_->Poll(handles, 1).PermitUncheckedError();
+ }
+
+ // Reset and release the io_handle after the Poll API call, as the request
+ // has been completed.
+ DestroyAndClearIOHandle(curr_);
+ }
+ UpdateBuffersIfNeeded(offset);
+}
+
+Status FilePrefetchBuffer::HandleOverlappingData(
+ const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
+ size_t length, size_t readahead_size,
+ Env::IOPriority /*rate_limiter_priority*/, bool& copy_to_third_buffer,
+ uint64_t& tmp_offset, size_t& tmp_length) {
+ Status s;
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ uint32_t second;
+
+ // Check if the first buffer has the required offset and the async read is
+ // still in progress. This should only happen if a prefetch was initiated
+ // by Seek, but the next access is at another offset.
+ if (bufs_[curr_].async_read_in_progress_ &&
+ IsOffsetInBufferWithAsyncProgress(offset, curr_)) {
+ PollAndUpdateBuffersIfNeeded(offset);
+ }
+ second = curr_ ^ 1;
+
+ // If the data overlaps the two buffers, copy the data from curr_ and
+ // call ReadAsync on curr_.
+ if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) &&
+ IsOffsetInBuffer(offset, curr_) &&
+ (/*Data extends beyond the curr_ buffer, and the second buffer either has
+ data or is in the process of being populated=*/
+ (offset + length > bufs_[second].offset_) &&
+ (bufs_[second].async_read_in_progress_ ||
+ DoesBufferContainData(second)))) {
+ // Allocate a new buffer for the third buffer.
+ bufs_[2].buffer_.Clear();
+ bufs_[2].buffer_.Alignment(alignment);
+ bufs_[2].buffer_.AllocateNewBuffer(length);
+ bufs_[2].offset_ = offset;
+ copy_to_third_buffer = true;
+
+ CopyDataToBuffer(curr_, tmp_offset, tmp_length);
+
+ // Issue async prefetching on curr_ (since its data has been consumed), but
+ // only if the remaining requested data lies within the second buffer.
+ size_t second_size = bufs_[second].async_read_in_progress_
+ ? bufs_[second].async_req_len_
+ : bufs_[second].buffer_.CurrentSize();
+ if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size) {
+ uint64_t rounddown_start = bufs_[second].offset_ + second_size;
+ uint64_t roundup_end =
+ Roundup(rounddown_start + readahead_size, alignment);
+ uint64_t roundup_len = roundup_end - rounddown_start;
+ uint64_t chunk_len = 0;
+ CalculateOffsetAndLen(alignment, rounddown_start, roundup_len, curr_,
+ false, chunk_len);
+ assert(chunk_len == 0);
+ assert(roundup_len >= chunk_len);
+
+ bufs_[curr_].offset_ = rounddown_start;
+ uint64_t read_len = static_cast<size_t>(roundup_len - chunk_len);
+ s = ReadAsync(opts, reader, read_len, rounddown_start, curr_);
+ if (!s.ok()) {
+ DestroyAndClearIOHandle(curr_);
+ bufs_[curr_].buffer_.Clear();
+ return s;
+ }
+ }
+ curr_ = curr_ ^ 1;
+ }
+ return s;
+}
+// PrefetchAsyncInternal is called when async_io is enabled for sequential
+// reads. When buffers are switched, we clear the curr_ buffer as we assume
+// the data has been consumed because of sequential reads.
+// Data in the buffers will always be sequential, with curr_ following second
+// and not vice versa.
+//
+// Scenarios for prefetching asynchronously:
+// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes
+// synchronously in curr_ and prefetch readahead_size_/2 bytes asynchronously
+// in the second buffer.
+// Case2: If the second buffer has partial or full data, make it current and
+// prefetch readahead_size_/2 bytes asynchronously in the second buffer. In
+// case of partial data, prefetch the remaining bytes synchronously to
+// fulfill the request of size n.
+// Case3: If curr_ has partial data, prefetch the remaining bytes
+// synchronously in curr_ to fulfill the request of size n, and prefetch
+// readahead_size_/2 bytes asynchronously in the second buffer.
+// Case4: (Special case) If data is in both buffers, copy the requested data
+// from curr_, send an async request on curr_, wait for the poll to fill the
+// second buffer (if any), and copy the remaining data from the second buffer
+// to the third buffer.
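+// Illustrative example of Case1 (values chosen only for illustration): with
+// readahead_size_ = 64K and a read of n = 8K at offset 0, curr_ reads
+// [0, 40K) synchronously (n + readahead_size_/2, rounded up to the file's
+// required alignment) and an async read for the next 32K is submitted into
+// the second buffer.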
+Status FilePrefetchBuffer::PrefetchAsyncInternal(
+ const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
+ size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority,
+ bool& copy_to_third_buffer) {
+ if (!enable_) {
+ return Status::OK();
+ }
+
+ TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start");
+
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ Status s;
+ uint64_t tmp_offset = offset;
+ size_t tmp_length = length;
+
+ // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with
+ // data.
+ if (!explicit_prefetch_submitted_) {
+ AbortIOIfNeeded(offset);
+ }
+ UpdateBuffersIfNeeded(offset);
+
+ // 2. Handle overlapping data over two buffers. If data is overlapping, then
+ // during this call:
+ // - data from curr_ is copied into the third buffer,
+ // - curr_ is sent for async prefetching of further data if the second
+ // buffer contains the remaining requested data or an async prefetch is in
+ // progress for it,
+ // - buffers are switched so that curr_ now points to the second buffer to
+ // copy the remaining data.
+ s = HandleOverlappingData(opts, reader, offset, length, readahead_size,
+ rate_limiter_priority, copy_to_third_buffer,
+ tmp_offset, tmp_length);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // 3. Call Poll only if data is needed for the second buffer.
+ // - Return if the whole data is in curr_ and the second buffer is in
+ // progress or already full.
+ // - If the second buffer is empty, ReadAsync will be issued for it.
+ if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) &&
+ IsDataBlockInBuffer(offset, length, curr_)) {
+ // Whole data is in curr_.
+ UpdateBuffersIfNeeded(offset);
+ if (!IsSecondBuffEligibleForPrefetching()) {
+ return s;
+ }
+ } else {
+ // After the poll request, curr_ might be empty because of an IOError in the
+ // callback while reading, or it may contain the required data.
+ PollAndUpdateBuffersIfNeeded(offset);
+ }
+
+ if (copy_to_third_buffer) {
+ offset = tmp_offset;
+ length = tmp_length;
+ }
+
+ // 4. After polling and swapping buffers, if all the requested bytes are in
+ // curr_, only async prefetching remains to be done.
+ // copy_to_third_buffer is a special case and is handled separately.
+ if (!copy_to_third_buffer && DoesBufferContainData(curr_) &&
+ IsDataBlockInBuffer(offset, length, curr_)) {
+ offset += length;
+ length = 0;
+
+ // Since async request was submitted directly by calling PrefetchAsync in
+ // last call, we don't need to prefetch further as this call is to poll
+ // the data submitted in previous call.
+ if (explicit_prefetch_submitted_) {
+ return s;
+ }
+ if (!IsSecondBuffEligibleForPrefetching()) {
+ return s;
+ }
+ }
+
+ uint32_t second = curr_ ^ 1;
+ assert(!bufs_[curr_].async_read_in_progress_);
+
+ // If curr_ became empty because of some IOError, abort the IO for second as
+ // well. Otherwise the data might not align if more data needs to be read
+ // into curr_, which might overlap with the second buffer.
+ if (!DoesBufferContainData(curr_) && bufs_[second].async_read_in_progress_) {
+ if (bufs_[second].io_handle_ != nullptr) {
+ std::vector<void*> handles;
+ handles.emplace_back(bufs_[second].io_handle_);
+ {
+ StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
+ Status status = fs_->AbortIO(handles);
+ assert(status.ok());
+ }
+ }
+ DestroyAndClearIOHandle(second);
+ bufs_[second].buffer_.Clear();
+ }
+
+ // 5. Data is overlapping, i.e. some of the data has been copied to the
+ // third buffer and the remaining data will be handled below.
+ if (copy_to_third_buffer && DoesBufferContainData(curr_)) {
+ CopyDataToBuffer(curr_, offset, length);
+
+ // Length == 0: All the requested data has been copied to third buffer and
+ // it has already gone for async prefetching. It can return without doing
+ // anything further.
+ // Length > 0: More data needs to be consumed so it will continue async
+ // and sync prefetching and copy the remaining data to third buffer in the
+ // end.
+ if (length == 0) {
+ return s;
+ }
+ }
+
+ // 6. Go for ReadAsync and Read (if needed).
+ size_t prefetch_size = length + readahead_size;
+ size_t _offset = static_cast<size_t>(offset);
+
+ // offset and size alignment for curr_ buffer with synchronous prefetching
+ uint64_t rounddown_start1 = Rounddown(_offset, alignment);
+ uint64_t roundup_end1 = Roundup(_offset + prefetch_size, alignment);
+ uint64_t roundup_len1 = roundup_end1 - rounddown_start1;
+ assert(roundup_len1 >= alignment);
+ assert(roundup_len1 % alignment == 0);
+ uint64_t chunk_len1 = 0;
+ uint64_t read_len1 = 0;
+
+ assert(!bufs_[second].async_read_in_progress_ &&
+ !DoesBufferContainData(second));
+
+ // For length == 0, skip the synchronous prefetching. read_len1 will be 0.
+ if (length > 0) {
+ CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_,
+ false /*refit_tail*/, chunk_len1);
+ assert(roundup_len1 >= chunk_len1);
+ read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
+ }
+ {
+ // offset and size alignment for second buffer for asynchronous
+ // prefetching
+ uint64_t rounddown_start2 = roundup_end1;
+ uint64_t roundup_end2 =
+ Roundup(rounddown_start2 + readahead_size, alignment);
+
+ // For length == 0, do the asynchronous prefetching in second instead of
+ // synchronous prefetching in curr_.
+ if (length == 0) {
+ rounddown_start2 =
+ bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
+ roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
+ }
+
+ uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
+ uint64_t chunk_len2 = 0;
+ CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
+ false /*refit_tail*/, chunk_len2);
+ assert(chunk_len2 == 0);
+ // Update the buffer offset.
+ bufs_[second].offset_ = rounddown_start2;
+ assert(roundup_len2 >= chunk_len2);
+ uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
+ Status tmp_s = ReadAsync(opts, reader, read_len2, rounddown_start2, second);
+ if (!tmp_s.ok()) {
+ DestroyAndClearIOHandle(second);
+ bufs_[second].buffer_.Clear();
+ }
+ }
+
+ if (read_len1 > 0) {
+ s = Read(opts, reader, rate_limiter_priority, read_len1, chunk_len1,
+ rounddown_start1, curr_);
+ if (!s.ok()) {
+ if (bufs_[second].io_handle_ != nullptr) {
+ std::vector<void*> handles;
+ handles.emplace_back(bufs_[second].io_handle_);
+ {
+ StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
+ Status status = fs_->AbortIO(handles);
+ assert(status.ok());
+ }
+ }
+ DestroyAndClearIOHandle(second);
+ bufs_[second].buffer_.Clear();
+ bufs_[curr_].buffer_.Clear();
+ return s;
+ }
+ }
+ // Copy remaining requested bytes to third_buffer.
+ if (copy_to_third_buffer && length > 0) {
+ CopyDataToBuffer(curr_, offset, length);
+ }
+ return s;
+}
+
+bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t offset, size_t n,
+ Slice* result, Status* status,
+ Env::IOPriority rate_limiter_priority,
+ bool for_compaction /* = false */) {
+ if (track_min_offset_ && offset < min_offset_read_) {
+ min_offset_read_ = static_cast<size_t>(offset);
+ }
+ if (!enable_ || (offset < bufs_[curr_].offset_)) {
+ return false;
+ }
+
+ // If the buffer contains only a few of the requested bytes:
+ // If readahead is enabled: prefetch the remaining bytes + readahead bytes
+ // and satisfy the request.
+ // If readahead is not enabled: return false.
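+ // On every successful prefetch issued from here, readahead_size_ is doubled
+ // (capped at max_readahead_size_); e.g. an illustrative initial readahead
+ // of 8K grows as 8K -> 16K -> 32K -> ... until it reaches the configured
+ // maximum.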
+ TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
+ &readahead_size_);
+ if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
+ if (readahead_size_ > 0) {
+ Status s;
+ assert(reader != nullptr);
+ assert(max_readahead_size_ >= readahead_size_);
+ if (for_compaction) {
+ s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
+ rate_limiter_priority);
+ } else {
+ if (implicit_auto_readahead_) {
+ if (!IsEligibleForPrefetch(offset, n)) {
+ // Ignore status as Prefetch is not called.
+ s.PermitUncheckedError();
+ return false;
+ }
+ }
+ s = Prefetch(opts, reader, offset, n + readahead_size_,
+ rate_limiter_priority);
+ }
+ if (!s.ok()) {
+ if (status) {
+ *status = s;
+ }
+#ifndef NDEBUG
+ IGNORE_STATUS_IF_ERROR(s);
+#endif
+ return false;
+ }
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ } else {
+ return false;
+ }
+ }
+ UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
+
+ uint64_t offset_in_buffer = offset - bufs_[curr_].offset_;
+ *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n);
+ return true;
+}
+
+bool FilePrefetchBuffer::TryReadFromCacheAsync(
+ const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
+ size_t n, Slice* result, Status* status,
+ Env::IOPriority rate_limiter_priority) {
+ if (track_min_offset_ && offset < min_offset_read_) {
+ min_offset_read_ = static_cast<size_t>(offset);
+ }
+
+ if (!enable_) {
+ return false;
+ }
+
+ if (explicit_prefetch_submitted_) {
+ // explicit_prefetch_submitted_ is a special case where the request
+ // submitted in PrefetchAsync is expected to match this request; otherwise
+ // the buffers will be outdated.
+ // A random offset was requested, so abort the IOs.
+ if (prev_offset_ != offset) {
+ AbortAllIOs();
+ bufs_[curr_].buffer_.Clear();
+ bufs_[curr_ ^ 1].buffer_.Clear();
+ explicit_prefetch_submitted_ = false;
+ return false;
+ }
+ }
+
+ if (!explicit_prefetch_submitted_ && offset < bufs_[curr_].offset_) {
+ return false;
+ }
+
+ bool prefetched = false;
+ bool copy_to_third_buffer = false;
+ // If the buffer contains only a few of the requested bytes:
+ // If readahead is enabled: prefetch the remaining bytes + readahead bytes
+ // and satisfy the request.
+ // If readahead is not enabled: return false.
+ TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
+ &readahead_size_);
+
+ if (explicit_prefetch_submitted_ ||
+ (bufs_[curr_].async_read_in_progress_ ||
+ offset + n >
+ bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
+ if (readahead_size_ > 0) {
+ Status s;
+ assert(reader != nullptr);
+ assert(max_readahead_size_ >= readahead_size_);
+
+ if (implicit_auto_readahead_) {
+ if (!IsEligibleForPrefetch(offset, n)) {
+ // Ignore status as Prefetch is not called.
+ s.PermitUncheckedError();
+ return false;
+ }
+ }
+ // Prefetch n + readahead_size_/2 synchronously as remaining
+ // readahead_size_/2 will be prefetched asynchronously.
+ s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
+ rate_limiter_priority, copy_to_third_buffer);
+ explicit_prefetch_submitted_ = false;
+ if (!s.ok()) {
+ if (status) {
+ *status = s;
+ }
+#ifndef NDEBUG
+ IGNORE_STATUS_IF_ERROR(s);
+#endif
+ return false;
+ }
+ prefetched = explicit_prefetch_submitted_ ? false : true;
+ } else {
+ return false;
+ }
+ }
+
+ UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
+
+ uint32_t index = curr_;
+ if (copy_to_third_buffer) {
+ index = 2;
+ }
+ uint64_t offset_in_buffer = offset - bufs_[index].offset_;
+ *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n);
+ if (prefetched) {
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ }
+ return true;
+}
+
+void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
+ void* cb_arg) {
+ uint32_t index = *(static_cast<uint32_t*>(cb_arg));
+#ifndef NDEBUG
+ if (req.result.size() < req.len) {
+ // Fake an IO error to force db_stress fault injection to ignore
+ // truncated read errors
+ IGNORE_STATUS_IF_ERROR(Status::IOError());
+ }
+ IGNORE_STATUS_IF_ERROR(req.status);
+#endif
+
+ if (req.status.ok()) {
+ if (req.offset + req.result.size() <=
+ bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
+ // All requested bytes are already in the buffer or no data is read
+ // because of EOF. So no need to update.
+ return;
+ }
+ if (req.offset < bufs_[index].offset_) {
+ // Next block to be read has changed (Recent read was not a sequential
+ // read). So ignore this read.
+ return;
+ }
+ size_t current_size = bufs_[index].buffer_.CurrentSize();
+ bufs_[index].buffer_.Size(current_size + req.result.size());
+ }
+}
+
+Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
+ RandomAccessFileReader* reader,
+ uint64_t offset, size_t n,
+ Slice* result) {
+ assert(reader != nullptr);
+ if (!enable_) {
+ return Status::NotSupported();
+ }
+
+ TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start");
+
+ num_file_reads_ = 0;
+ explicit_prefetch_submitted_ = false;
+ bool is_eligible_for_prefetching = false;
+ if (readahead_size_ > 0 &&
+ (!implicit_auto_readahead_ ||
+ num_file_reads_ + 1 >= num_file_reads_for_auto_readahead_)) {
+ is_eligible_for_prefetching = true;
+ }
+
+ // 1. Cancel any pending async reads to keep the code simple, as the buffers
+ // can be out of sync.
+ AbortAllIOs();
+
+ // 2. Clear outdated data.
+ UpdateBuffersIfNeeded(offset);
+ uint32_t second = curr_ ^ 1;
+ // Since PrefetchAsync can be called on non-sequential reads, the offset can
+ // be less than the curr_ buffer's offset. In that case it also clears both
+ // buffers.
+ if (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_)) {
+ bufs_[curr_].buffer_.Clear();
+ bufs_[second].buffer_.Clear();
+ }
+
+ UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
+
+ bool data_found = false;
+
+ // 3. If curr_ has full data.
+ if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) {
+ uint64_t offset_in_buffer = offset - bufs_[curr_].offset_;
+ *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n);
+ data_found = true;
+ // Update num_file_reads_ here, as TryReadFromCacheAsync won't be called to
+ // poll and update num_file_reads_ if the data is found here.
+ num_file_reads_++;
+
+ // 3.1 If second also has some data or is not eligible for prefetching,
+ // return.
+ if (!is_eligible_for_prefetching || DoesBufferContainData(second)) {
+ return Status::OK();
+ }
+ } else {
+ // Partial data in curr_.
+ bufs_[curr_].buffer_.Clear();
+ }
+ bufs_[second].buffer_.Clear();
+
+ Status s;
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ size_t prefetch_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0;
+ size_t offset_to_read = static_cast<size_t>(offset);
+ uint64_t rounddown_start1 = 0;
+ uint64_t roundup_end1 = 0;
+ uint64_t rounddown_start2 = 0;
+ uint64_t roundup_end2 = 0;
+ uint64_t chunk_len1 = 0;
+ uint64_t chunk_len2 = 0;
+ size_t read_len1 = 0;
+ size_t read_len2 = 0;
+
+ // - If curr_ is empty:
+ // - call async read for the full data + prefetch_size on curr_, and
+ // - call async read for prefetch_size on second, if eligible.
+ // - If curr_ is filled:
+ // - call async read for prefetch_size on second.
+ // Calculate the lengths and offsets for reading.
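+ // Illustrative example (assuming alignment = 4K): for a read of n = 8K at
+ // offset 100K with prefetch_size = 32K and an empty curr_, curr_ gets an
+ // async read for [100K, 140K) and second gets an async read for
+ // [140K, 172K).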
+ if (!DoesBufferContainData(curr_)) {
+ // Prefetch full data + prefetch_size in curr_.
+ rounddown_start1 = Rounddown(offset_to_read, alignment);
+ roundup_end1 = Roundup(offset_to_read + n + prefetch_size, alignment);
+ uint64_t roundup_len1 = roundup_end1 - rounddown_start1;
+ assert(roundup_len1 >= alignment);
+ assert(roundup_len1 % alignment == 0);
+
+ CalculateOffsetAndLen(alignment, rounddown_start1, roundup_len1, curr_,
+ false, chunk_len1);
+ assert(chunk_len1 == 0);
+ assert(roundup_len1 >= chunk_len1);
+ read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
+ bufs_[curr_].offset_ = rounddown_start1;
+ }
+
+ if (is_eligible_for_prefetching) {
+ if (DoesBufferContainData(curr_)) {
+ rounddown_start2 =
+ bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
+ } else {
+ rounddown_start2 = roundup_end1;
+ }
+
+ roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
+ uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
+
+ assert(roundup_len2 >= alignment);
+ CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
+ false, chunk_len2);
+ assert(chunk_len2 == 0);
+ assert(roundup_len2 >= chunk_len2);
+ read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
+ // Update the buffer offset.
+ bufs_[second].offset_ = rounddown_start2;
+ }
+
+ if (read_len1) {
+ s = ReadAsync(opts, reader, read_len1, rounddown_start1, curr_);
+ if (!s.ok()) {
+ DestroyAndClearIOHandle(curr_);
+ bufs_[curr_].buffer_.Clear();
+ return s;
+ }
+ explicit_prefetch_submitted_ = true;
+ prev_len_ = 0;
+ }
+ if (read_len2) {
+ s = ReadAsync(opts, reader, read_len2, rounddown_start2, second);
+ if (!s.ok()) {
+ DestroyAndClearIOHandle(second);
+ bufs_[second].buffer_.Clear();
+ return s;
+ }
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ }
+ return (data_found ? Status::OK() : Status::TryAgain());
+}
+
+} // namespace ROCKSDB_NAMESPACE