From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rocksdb/file/sequence_file_reader.cc | 328 +++++++++++++++++++++++++++++++
 1 file changed, 328 insertions(+)
 create mode 100644 src/rocksdb/file/sequence_file_reader.cc

diff --git a/src/rocksdb/file/sequence_file_reader.cc b/src/rocksdb/file/sequence_file_reader.cc
new file mode 100644
index 000000000..d51d5be46
--- /dev/null
+++ b/src/rocksdb/file/sequence_file_reader.cc
@@ -0,0 +1,328 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/sequence_file_reader.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "file/read_write_util.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/aligned_buffer.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+IOStatus SequentialFileReader::Create(
+    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
+    const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
+    IODebugContext* dbg, RateLimiter* rate_limiter) {
+  std::unique_ptr<FSSequentialFile> file;
+  IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
+  if (io_s.ok()) {
+    reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {},
+                                           rate_limiter));
+  }
+  return io_s;
+}
+
+IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
+                                    Env::IOPriority rate_limiter_priority) {
+  IOStatus io_s;
+  if (use_direct_io()) {
+#ifndef ROCKSDB_LITE
+    //
+    //    |-offset_advance-|---bytes returned--|
+    //    |----------------------buf size-------------------------|
+    //    |                |                   |                   |
+    // aligned           offset          offset + n       Roundup(offset + n,
+    // offset                                                     alignment)
+    //
+    size_t offset = offset_.fetch_add(n);
+    size_t alignment = file_->GetRequiredBufferAlignment();
+    size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
+    size_t offset_advance = offset - aligned_offset;
+    size_t size = Roundup(offset + n, alignment) - aligned_offset;
+    size_t r = 0;
+    AlignedBuffer buf;
+    buf.Alignment(alignment);
+    buf.AllocateNewBuffer(size);
+
+    while (buf.CurrentSize() < size) {
+      size_t allowed;
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        allowed = rate_limiter_->RequestToken(
+            buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
+            rate_limiter_priority, nullptr /* stats */,
+            RateLimiter::OpType::kRead);
+      } else {
+        assert(buf.CurrentSize() == 0);
+        allowed = size;
+      }
+
+      Slice tmp;
+      uint64_t orig_offset = 0;
+      FileOperationInfo::StartTimePoint start_ts;
+      if (ShouldNotifyListeners()) {
+        orig_offset = aligned_offset + buf.CurrentSize();
+        start_ts = FileOperationInfo::StartNow();
+      }
+      io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
+                                   IOOptions(), &tmp, buf.Destination(),
+                                   nullptr /* dbg */);
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = FileOperationInfo::FinishNow();
+        NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
+                               io_s);
+      }
+      buf.Size(buf.CurrentSize() + tmp.size());
+      if (!io_s.ok() || tmp.size() < allowed) {
+        break;
+      }
+    }
+
+    if (io_s.ok() && offset_advance < buf.CurrentSize()) {
+      r = buf.Read(scratch, offset_advance,
+                   std::min(buf.CurrentSize() - offset_advance, n));
+    }
+    *result = Slice(scratch, r);
+#endif  // !ROCKSDB_LITE
+  } else {
+    // To be paranoid, modify scratch a little bit, so that if the underlying
+    // FileSystem doesn't fill the buffer but returns success and `scratch`
+    // still contains a previous block, the returned value will not pass the
+    // checksum. It's hard to find a useful byte to modify in the direct I/O
+    // case, so we skip it there.
+    if (n > 0 && scratch != nullptr) {
+      scratch[0]++;
+    }
+
+    size_t read = 0;
+    while (read < n) {
+      size_t allowed;
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        allowed = rate_limiter_->RequestToken(
+            n - read, 0 /* alignment */, rate_limiter_priority,
+            nullptr /* stats */, RateLimiter::OpType::kRead);
+      } else {
+        allowed = n;
+      }
+#ifndef ROCKSDB_LITE
+      FileOperationInfo::StartTimePoint start_ts;
+      if (ShouldNotifyListeners()) {
+        start_ts = FileOperationInfo::StartNow();
+      }
+#endif
+      Slice tmp;
+      io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
+                         nullptr /* dbg */);
+#ifndef ROCKSDB_LITE
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = FileOperationInfo::FinishNow();
+        size_t offset = offset_.fetch_add(tmp.size());
+        NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
+      }
+#endif
+      read += tmp.size();
+      if (!io_s.ok() || tmp.size() < allowed) {
+        break;
+      }
+    }
+    *result = Slice(scratch, read);
+  }
+  IOSTATS_ADD(bytes_read, result->size());
+  return io_s;
+}
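The ASCII diagram in the direct-I/O branch compresses a fair amount of arithmetic. Below is a minimal standalone sketch of that math; the two helpers are re-implemented with the usual power-of-two and round-up definitions, which is an assumption about RocksDB's actual TruncateToPageBoundary and Roundup, not a copy of them:

#include <cstddef>
#include <cstdio>

// Assumed equivalent of RocksDB's helper: round s down to an alignment
// boundary. Requires `alignment` to be a power of two.
static std::size_t TruncateToPageBoundary(std::size_t alignment,
                                          std::size_t s) {
  return s - (s & (alignment - 1));
}

// Assumed equivalent of RocksDB's helper: round x up to a multiple of y.
static std::size_t Roundup(std::size_t x, std::size_t y) {
  return ((x + y - 1) / y) * y;
}

int main() {
  const std::size_t alignment = 4096;  // typical direct-I/O requirement
  const std::size_t offset = 5000;     // logical read position
  const std::size_t n = 10000;         // bytes the caller asked for

  const std::size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
  const std::size_t offset_advance = offset - aligned_offset;
  const std::size_t size = Roundup(offset + n, alignment) - aligned_offset;

  // Prints: aligned_offset=4096 offset_advance=904 buf_size=12288
  // The device is read at aligned_offset for `size` bytes; the caller's data
  // begins offset_advance bytes into that buffer, as in the diagram above.
  std::printf("aligned_offset=%zu offset_advance=%zu buf_size=%zu\n",
              aligned_offset, offset_advance, size);
  return 0;
}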
+
+IOStatus SequentialFileReader::Skip(uint64_t n) {
+#ifndef ROCKSDB_LITE
+  if (use_direct_io()) {
+    offset_ += static_cast<size_t>(n);
+    return IOStatus::OK();
+  }
+#endif  // !ROCKSDB_LITE
+  return file_->Skip(n);
+}
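Both branches of Read above share the same chunking pattern: ask the rate limiter for a token covering at most the remaining bytes, issue one I/O of that size, and stop early on a short read. A simplified sketch of just that loop follows; RateLimiterLike is a hypothetical stand-in (RocksDB's real RateLimiter::RequestToken also takes an alignment, a priority, a stats pointer, and an op type):

#include <algorithm>
#include <cstddef>

// Hypothetical stand-in for the rate limiter: each request is granted at
// most `burst_bytes` of I/O budget.
struct RateLimiterLike {
  std::size_t burst_bytes;
  std::size_t RequestToken(std::size_t wanted) {
    return std::min(wanted, burst_bytes);
  }
};

// Reads up to n bytes in limiter-sized chunks. `read_some(pos, len)` is any
// callable that reads at most `len` bytes at logical position `pos` and
// returns the byte count actually read (a short count means EOF or error).
template <typename ReadFn>
std::size_t RateLimitedRead(RateLimiterLike* limiter, std::size_t n,
                            ReadFn read_some) {
  std::size_t done = 0;
  while (done < n) {
    // Without a limiter, issue the whole remainder in one call, exactly as
    // the `allowed = n` fallback in the patch does.
    std::size_t allowed =
        limiter ? limiter->RequestToken(n - done) : n - done;
    std::size_t got = read_some(done, allowed);
    done += got;
    if (got < allowed) {
      break;  // short read: stop, mirroring `tmp.size() < allowed` above
    }
  }
  return done;
}

A caller would pass a lambda wrapping the underlying file read; the loops in the patch differ only in bookkeeping (listener notification and IOStatus propagation).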
+
+namespace {
+// This class wraps a SequentialFile, exposing the same API, with the
+// difference of being able to prefetch up to readahead_size bytes and then
+// serve them from memory, avoiding the entire round-trip if, for example,
+// the data for the file is actually remote.
+class ReadaheadSequentialFile : public FSSequentialFile {
+ public:
+  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
+                          size_t readahead_size)
+      : file_(std::move(file)),
+        alignment_(file_->GetRequiredBufferAlignment()),
+        readahead_size_(Roundup(readahead_size, alignment_)),
+        buffer_(),
+        buffer_offset_(0),
+        read_offset_(0) {
+    buffer_.Alignment(alignment_);
+    buffer_.AllocateNewBuffer(readahead_size_);
+  }
+
+  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
+
+  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
+
+  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
+                IODebugContext* dbg) override {
+    std::unique_lock<std::mutex> lk(lock_);
+
+    size_t cached_len = 0;
+    // Check if there is a cache hit, meaning that [offset, offset + n) is
+    // either completely or partially in the buffer. If it's completely
+    // cached, including the end-of-file case when offset + n is greater than
+    // EOF, then return.
+    if (TryReadFromCache(n, &cached_len, scratch) &&
+        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
+      // We read exactly what we needed, or we hit end of file - return.
+      *result = Slice(scratch, cached_len);
+      return IOStatus::OK();
+    }
+    n -= cached_len;
+
+    IOStatus s;
+    // Read-ahead only makes sense if we have some slack left after reading.
+    if (n + alignment_ >= readahead_size_) {
+      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
+      if (s.ok()) {
+        read_offset_ += result->size();
+        *result = Slice(scratch, cached_len + result->size());
+      }
+      buffer_.Clear();
+      return s;
+    }
+
+    s = ReadIntoBuffer(readahead_size_, opts, dbg);
+    if (s.ok()) {
+      // The data we need is now in cache, so we can safely read it.
+      size_t remaining_len;
+      TryReadFromCache(n, &remaining_len, scratch + cached_len);
+      *result = Slice(scratch, cached_len + remaining_len);
+    }
+    return s;
+  }
+
+  IOStatus Skip(uint64_t n) override {
+    std::unique_lock<std::mutex> lk(lock_);
+    IOStatus s = IOStatus::OK();
+    // First check if we need to skip already cached data
+    if (buffer_.CurrentSize() > 0) {
+      // Do we need to skip beyond cached data?
+      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
+        // Yes. Skip whatever is in memory and adjust offset accordingly
+        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
+        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
+      } else {
+        // No. The entire section to be skipped is entirely in cache.
+        read_offset_ += n;
+        n = 0;
+      }
+    }
+    if (n > 0) {
+      // We still need to skip more, so call the file API for skipping
+      s = file_->Skip(n);
+      if (s.ok()) {
+        read_offset_ += n;
+      }
+      buffer_.Clear();
+    }
+    return s;
+  }
+
+  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
+                          Slice* result, char* scratch,
+                          IODebugContext* dbg) override {
+    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
+  }
+
+  IOStatus InvalidateCache(size_t offset, size_t length) override {
+    std::unique_lock<std::mutex> lk(lock_);
+    buffer_.Clear();
+    return file_->InvalidateCache(offset, length);
+  }
+
+  bool use_direct_io() const override { return file_->use_direct_io(); }
+
+ private:
+  // Tries to read n bytes from buffer_. If anything was read from the cache,
+  // it sets cached_len to the number of bytes actually read, copies that many
+  // bytes to scratch, and returns true.
+  // If nothing was read, sets cached_len to 0 and returns false.
+  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
+    if (read_offset_ < buffer_offset_ ||
+        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
+      *cached_len = 0;
+      return false;
+    }
+    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
+    *cached_len = std::min(
+        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
+    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
+    read_offset_ += *cached_len;
+    return true;
+  }
+
+  // Reads into buffer_ the next n bytes from file_.
+  // May read less if EOF is reached.
+  // Returns the status of the read operation on the file.
+  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
+                          IODebugContext* dbg) {
+    if (n > buffer_.Capacity()) {
+      n = buffer_.Capacity();
+    }
+    assert(IsFileSectorAligned(n, alignment_));
+    Slice result;
+    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
+    if (s.ok()) {
+      buffer_offset_ = read_offset_;
+      buffer_.Size(result.size());
+      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
+    }
+    return s;
+  }
+
+  const std::unique_ptr<FSSequentialFile> file_;
+  const size_t alignment_;
+  const size_t readahead_size_;
+
+  std::mutex lock_;
+  // The buffer storing the prefetched data
+  AlignedBuffer buffer_;
+  // The offset in file_ corresponding to the data stored in buffer_
+  uint64_t buffer_offset_;
+  // The offset up to which data was read from file_. In fact, it can be
+  // larger than the actual file size, since the file_->Skip(n) call doesn't
+  // return the actual number of bytes that were skipped, which can be less
+  // than n. This is not a problem since read_offset_ is monotonically
+  // increasing and its only use is to figure out if the next piece of data
+  // should be read from buffer_ or file_ directly.
+  uint64_t read_offset_;
+};
+}  // namespace
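The window arithmetic in TryReadFromCache carries the whole class: a read is served from memory only when read_offset_ falls inside [buffer_offset_, buffer_offset_ + buffer_.CurrentSize()), and a partial hit consumes the tail of the window. A toy model of just that logic, where ReadaheadWindow is an invented name and std::vector stands in for AlignedBuffer:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy model of the readahead window: `buf` holds the file bytes
// [buffer_offset, buffer_offset + buf.size()).
struct ReadaheadWindow {
  std::vector<char> buf;
  std::uint64_t buffer_offset = 0;
  std::uint64_t read_offset = 0;

  bool TryReadFromCache(std::size_t n, std::size_t* cached_len,
                        char* scratch) {
    if (read_offset < buffer_offset ||
        read_offset >= buffer_offset + buf.size()) {
      *cached_len = 0;  // current position lies outside the window: miss
      return false;
    }
    const std::size_t in_buf =
        static_cast<std::size_t>(read_offset - buffer_offset);
    *cached_len = std::min(buf.size() - in_buf, n);  // may be a partial hit
    std::memcpy(scratch, buf.data() + in_buf, *cached_len);
    read_offset += *cached_len;  // consume the served bytes from the window
    return true;
  }
};

The real class runs the same check under lock_, with the refill (ReadIntoBuffer) and offset bookkeeping wrapped around it.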
+
+std::unique_ptr<FSSequentialFile>
+SequentialFileReader::NewReadaheadSequentialFile(
+    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
+  if (file->GetRequiredBufferAlignment() >= readahead_size) {
+    // Short-circuit and return the original file if readahead_size is
+    // too small and hence doesn't make sense to be used for prefetching.
+    return std::move(file);
+  }
+  std::unique_ptr<FSSequentialFile> result(
+      new ReadaheadSequentialFile(std::move(file), readahead_size));
+  return result;
+}
+}  // namespace ROCKSDB_NAMESPACE
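To close, a minimal usage sketch of the reader this patch adds. ReadFirstBytes is a hypothetical helper name; the Create and Read signatures are the ones introduced above, and FileSystem::Default() is RocksDB's stock file system:

#include <memory>
#include <string>

#include "file/sequence_file_reader.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"

using ROCKSDB_NAMESPACE::Env;
using ROCKSDB_NAMESPACE::FileOptions;
using ROCKSDB_NAMESPACE::FileSystem;
using ROCKSDB_NAMESPACE::IOStatus;
using ROCKSDB_NAMESPACE::SequentialFileReader;
using ROCKSDB_NAMESPACE::Slice;

// Open `fname` through the default FileSystem and read its first n bytes.
IOStatus ReadFirstBytes(const std::string& fname, size_t n, std::string* out) {
  std::shared_ptr<FileSystem> fs = FileSystem::Default();
  std::unique_ptr<SequentialFileReader> reader;
  // No debug context and no rate limiter: reads are not throttled.
  IOStatus io_s = SequentialFileReader::Create(fs, fname, FileOptions(),
                                               &reader, /*dbg=*/nullptr,
                                               /*rate_limiter=*/nullptr);
  if (!io_s.ok()) {
    return io_s;
  }
  out->resize(n);
  Slice result;
  // IO_TOTAL skips the rate-limiter branch inside Read, as the patch shows.
  io_s = reader->Read(n, &result, &(*out)[0], Env::IO_TOTAL);
  out->resize(result.size());  // shrink to what was actually read (EOF)
  return io_s;
}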