diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/util/file_reader_writer.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/util/file_reader_writer.cc | 867 |
1 files changed, 867 insertions, 0 deletions
diff --git a/src/rocksdb/util/file_reader_writer.cc b/src/rocksdb/util/file_reader_writer.cc new file mode 100644 index 00000000..9a818cb0 --- /dev/null +++ b/src/rocksdb/util/file_reader_writer.cc @@ -0,0 +1,867 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/file_reader_writer.h" + +#include <algorithm> +#include <mutex> + +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "util/random.h" +#include "util/rate_limiter.h" +#include "util/sync_point.h" + +namespace rocksdb { + +#ifndef NDEBUG +namespace { +bool IsFileSectorAligned(const size_t off, size_t sector_size) { + return off % sector_size == 0; +} +} +#endif + +Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { + Status s; + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + size_t offset = offset_.fetch_add(n); + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + size_t r = 0; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok() && offset_advance < tmp.size()) { + buf.Size(tmp.size()); + r = buf.Read(scratch, offset_advance, + std::min(tmp.size() - offset_advance, n)); + } + *result = Slice(scratch, r); +#endif // !ROCKSDB_LITE + } else { + s = file_->Read(n, result, scratch); + } + IOSTATS_ADD(bytes_read, result->size()); + return s; +} + + +Status SequentialFileReader::Skip(uint64_t n) { +#ifndef ROCKSDB_LITE + if (use_direct_io()) { + offset_ += static_cast<size_t>(n); + return Status::OK(); + } +#endif // !ROCKSDB_LITE + return file_->Skip(n); +} + +Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + uint64_t elapsed = 0; + { + StopWatch sw(env_, stats_, hist_type_, + (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, + true /*delay_enabled*/); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_TIMER_GUARD(read_nanos); + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, static_cast<size_t>(offset)); + size_t offset_advance = static_cast<size_t>(offset) - aligned_offset; + size_t read_size = Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(read_size); + while (buf.CurrentSize() < read_size) { + size_t allowed; + if (for_compaction_ && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + buf.Capacity() - buf.CurrentSize(), buf.Alignment(), + Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); + } else { + assert(buf.CurrentSize() == 0); + allowed = read_size; + } + Slice tmp; + + FileOperationInfo::TimePoint start_ts; + uint64_t orig_offset = 0; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + orig_offset = aligned_offset + buf.CurrentSize(); + } + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, + buf.Destination()); + } + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, + s); + } + + buf.Size(buf.CurrentSize() + tmp.size()); + if (!s.ok() || tmp.size() < allowed) { + break; + } + } + size_t res_len = 0; + if (s.ok() && offset_advance < buf.CurrentSize()) { + res_len = buf.Read(scratch, offset_advance, + std::min(buf.CurrentSize() - offset_advance, n)); + } + *result = Slice(scratch, res_len); +#endif // !ROCKSDB_LITE + } else { + size_t pos = 0; + const char* res_scratch = nullptr; + while (pos < n) { + size_t allowed; + if (for_compaction_ && rate_limiter_ != nullptr) { + if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { + sw.DelayStart(); + } + allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, + Env::IOPriority::IO_LOW, stats_, + RateLimiter::OpType::kRead); + if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { + sw.DelayStop(); + } + } else { + allowed = n; + } + Slice tmp_result; + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, + finish_ts, s); + } +#endif + + if (res_scratch == nullptr) { + // we can't simply use `scratch` because reads of mmap'd files return + // data in a different buffer. + res_scratch = tmp_result.data(); + } else { + // make sure chunks are inserted contiguously into `res_scratch`. + assert(tmp_result.data() == res_scratch + pos); + } + pos += tmp_result.size(); + if (!s.ok() || tmp_result.size() < allowed) { + break; + } + } + *result = Slice(res_scratch, s.ok() ? pos : 0); + } + IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); + SetPerfLevel(prev_perf_level); + } + if (stats_ != nullptr && file_read_hist_ != nullptr) { + file_read_hist_->Add(elapsed); + } + + return s; +} + +Status WritableFileWriter::Append(const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + Status s; + pending_sync_ = true; + + TEST_KILL_RANDOM("WritableFileWriter::Append:0", + rocksdb_kill_odds * REDUCE_ODDS2); + + { + IOSTATS_TIMER_GUARD(prepare_write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); + writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left); + } + + // See whether we need to enlarge the buffer to avoid the flush + if (buf_.Capacity() - buf_.CurrentSize() < left) { + for (size_t cap = buf_.Capacity(); + cap < max_buffer_size_; // There is still room to increase + cap *= 2) { + // See whether the next available size is large enough. + // Buffer will never be increased to more than max_buffer_size_. + size_t desired_capacity = std::min(cap * 2, max_buffer_size_); + if (desired_capacity - buf_.CurrentSize() >= left || + (use_direct_io() && desired_capacity == max_buffer_size_)) { + buf_.AllocateNewBuffer(desired_capacity, true); + break; + } + } + } + + // Flush only when buffered I/O + if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { + if (buf_.CurrentSize() > 0) { + s = Flush(); + if (!s.ok()) { + return s; + } + } + assert(buf_.CurrentSize() == 0); + } + + // We never write directly to disk with direct I/O on. + // or we simply use it for its original purpose to accumulate many small + // chunks + if (use_direct_io() || (buf_.Capacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } + } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.CurrentSize() == 0); + s = WriteBuffered(src, left); + } + + TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds); + if (s.ok()) { + filesize_ += data.size(); + } + return s; +} + +Status WritableFileWriter::Pad(const size_t pad_bytes) { + assert(pad_bytes < kDefaultPageSize); + size_t left = pad_bytes; + size_t cap = buf_.Capacity() - buf_.CurrentSize(); + + // Assume pad_bytes is small compared to buf_ capacity. So we always + // use buf_ rather than write directly to file in certain cases like + // Append() does. + while (left) { + size_t append_bytes = std::min(cap, left); + buf_.PadWith(append_bytes, 0); + left -= append_bytes; + if (left > 0) { + Status s = Flush(); + if (!s.ok()) { + return s; + } + } + cap = buf_.Capacity() - buf_.CurrentSize(); + } + pending_sync_ = true; + filesize_ += pad_bytes; + return Status::OK(); +} + +Status WritableFileWriter::Close() { + + // Do not quit immediately on failure the file MUST be closed + Status s; + + // Possible to close it twice now as we MUST close + // in __dtor, simply flushing is not enough + // Windows when pre-allocating does not fill with zeros + // also with unbuffered access we also set the end of data. + if (!writable_file_) { + return s; + } + + s = Flush(); // flush cache to OS + + Status interim; + // In direct I/O mode we write whole pages so + // we need to let the file know where data ends. + if (use_direct_io()) { + interim = writable_file_->Truncate(filesize_); + if (interim.ok()) { + interim = writable_file_->Fsync(); + } + if (!interim.ok() && s.ok()) { + s = interim; + } + } + + TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); + interim = writable_file_->Close(); + if (!interim.ok() && s.ok()) { + s = interim; + } + + writable_file_.reset(); + TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds); + + return s; +} + +// write out the cached data to the OS cache or storage if direct I/O +// enabled +Status WritableFileWriter::Flush() { + Status s; + TEST_KILL_RANDOM("WritableFileWriter::Flush:0", + rocksdb_kill_odds * REDUCE_ODDS2); + + if (buf_.CurrentSize() > 0) { + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + if (pending_sync_) { + s = WriteDirect(); + } +#endif // !ROCKSDB_LITE + } else { + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + } + if (!s.ok()) { + return s; + } + } + + s = writable_file_->Flush(); + + if (!s.ok()) { + return s; + } + + // sync OS cache to disk for every bytes_per_sync_ + // TODO: give log file and sst file different options (log + // files could be potentially cached in OS for their whole + // life time, thus we might not want to flush at all). + + // We try to avoid sync to the last 1MB of data. For two reasons: + // (1) avoid rewrite the same page that is modified later. + // (2) for older version of OS, write can block while writing out + // the page. + // Xfs does neighbor page flushing outside of the specified ranges. We + // need to make sure sync range is far from the write offset. + if (!use_direct_io() && bytes_per_sync_) { + const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. + const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. + if (filesize_ > kBytesNotSyncRange) { + uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; + offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; + assert(offset_sync_to >= last_sync_size_); + if (offset_sync_to > 0 && + offset_sync_to - last_sync_size_ >= bytes_per_sync_) { + s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + last_sync_size_ = offset_sync_to; + } + } + } + + return s; +} + +Status WritableFileWriter::Sync(bool use_fsync) { + Status s = Flush(); + if (!s.ok()) { + return s; + } + TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds); + if (!use_direct_io() && pending_sync_) { + s = SyncInternal(use_fsync); + if (!s.ok()) { + return s; + } + } + TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); + pending_sync_ = false; + return Status::OK(); +} + +Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { + if (!writable_file_->IsSyncThreadSafe()) { + return Status::NotSupported( + "Can't WritableFileWriter::SyncWithoutFlush() because " + "WritableFile::IsSyncThreadSafe() is false"); + } + TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); + Status s = SyncInternal(use_fsync); + TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); + return s; +} + +Status WritableFileWriter::SyncInternal(bool use_fsync) { + Status s; + IOSTATS_TIMER_GUARD(fsync_nanos); + TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); + if (use_fsync) { + s = writable_file_->Fsync(); + } else { + s = writable_file_->Sync(); + } + SetPerfLevel(prev_perf_level); + return s; +} + +Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { + IOSTATS_TIMER_GUARD(range_sync_nanos); + TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); + return writable_file_->RangeSync(offset, nbytes); +} + +// This method writes to disk the specified data and makes use of the rate +// limiter if available +Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { + Status s; + assert(!use_direct_io()); + const char* src = data; + size_t left = size; + + while (left > 0) { + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + } else { + allowed = left; + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + uint64_t old_size = writable_file_->GetFileSize(); + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + old_size = next_write_offset_; + } +#endif + { + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); + s = writable_file_->Append(Slice(src, allowed)); + SetPerfLevel(prev_perf_level); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s); + } +#endif + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, allowed); + TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); + + left -= allowed; + src += allowed; + } + buf_.Size(0); + return s; +} + + +// This flushes the accumulated data in the buffer. We pad data with zeros if +// necessary to the whole page. +// However, during automatic flushes padding would not be necessary. +// We always use RateLimiter if available. We move (Refit) any buffer bytes +// that are left over the +// whole number of pages to be written again on the next flush because we can +// only write on aligned +// offsets. +#ifndef ROCKSDB_LITE +Status WritableFileWriter::WriteDirect() { + assert(use_direct_io()); + Status s; + const size_t alignment = buf_.Alignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes succeed + size_t file_advance = + TruncateToPageBoundary(alignment, buf_.CurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.CurrentSize() - file_advance; + + // Round up and pad + buf_.PadToAlignmentWith(0); + + const char* src = buf_.BufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.CurrentSize(); + + while (left > 0) { + // Check how much is allowed + size_t size; + if (rate_limiter_ != nullptr) { + size = rate_limiter_->RequestToken(left, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + } else { + size = left; + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } + // direct writes must be positional + s = writable_file_->PositionedAppend(Slice(src, size), write_offset); + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s); + } + if (!s.ok()) { + buf_.Size(file_advance + leftover_tail); + return s; + } + } + + IOSTATS_ADD(bytes_written, size); + left -= size; + src += size; + write_offset += size; + assert((next_write_offset_ % alignment) == 0); + } + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close() + buf_.RefitTail(file_advance, leftover_tail); + // This is where we start writing next time which may or not be + // the actual file size on disk. They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} +#endif // !ROCKSDB_LITE + +namespace { +class ReadaheadRandomAccessFile : public RandomAccessFile { + public: + ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file, + size_t readahead_size) + : file_(std::move(file)), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), + buffer_(), + buffer_offset_(0) { + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_); + } + + ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; + + ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + if (n + alignment_ >= readahead_size_) { + return file_->Read(offset, n, result, scratch); + } + + std::unique_lock<std::mutex> lk(lock_); + + size_t cached_len = 0; + // Check if there is a cache hit, means that [offset, offset + n) is either + // completely or partially in the buffer + // If it's completely cached, including end of file case when offset + n is + // greater than EOF, return + if (TryReadFromCache(offset, n, &cached_len, scratch) && + (cached_len == n || + // End of file + buffer_.CurrentSize() < readahead_size_)) { + *result = Slice(scratch, cached_len); + return Status::OK(); + } + size_t advanced_offset = static_cast<size_t>(offset + cached_len); + // In the case of cache hit advanced_offset is already aligned, means that + // chunk_offset equals to advanced_offset + size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); + Slice readahead_result; + + Status s = ReadIntoBuffer(chunk_offset, readahead_size_); + if (s.ok()) { + // In the case of cache miss, i.e. when cached_len equals 0, an offset can + // exceed the file end position, so the following check is required + if (advanced_offset < chunk_offset + buffer_.CurrentSize()) { + // In the case of cache miss, the first chunk_padding bytes in buffer_ + // are + // stored for alignment only and must be skipped + size_t chunk_padding = advanced_offset - chunk_offset; + auto remaining_len = + std::min(buffer_.CurrentSize() - chunk_padding, n - cached_len); + memcpy(scratch + cached_len, buffer_.BufferStart() + chunk_padding, + remaining_len); + *result = Slice(scratch, cached_len + remaining_len); + } else { + *result = Slice(scratch, cached_len); + } + } + return s; + } + + Status Prefetch(uint64_t offset, size_t n) override { + if (n < readahead_size_) { + // Don't allow smaller prefetches than the configured `readahead_size_`. + // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. + return Status::OK(); + } + size_t offset_ = static_cast<size_t>(offset); + size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); + if (prefetch_offset == buffer_offset_) { + return Status::OK(); + } + return ReadIntoBuffer(prefetch_offset, + Roundup(offset_ + n, alignment_) - prefetch_offset); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + return file_->GetUniqueId(id, max_size); + } + + void Hint(AccessPattern pattern) override { file_->Hint(pattern); } + + Status InvalidateCache(size_t offset, size_t length) override { + return file_->InvalidateCache(offset, length); + } + + bool use_direct_io() const override { return file_->use_direct_io(); } + +private: + bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len, + char* scratch) const { + if (offset < buffer_offset_ || + offset >= buffer_offset_ + buffer_.CurrentSize()) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = offset - buffer_offset_; + *cached_len = std::min( + buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + return true; + } + + Status ReadIntoBuffer(uint64_t offset, size_t n) const { + if (n > buffer_.Capacity()) { + n = buffer_.Capacity(); + } + assert(IsFileSectorAligned(offset, alignment_)); + assert(IsFileSectorAligned(n, alignment_)); + Slice result; + Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); + if (s.ok()) { + buffer_offset_ = offset; + buffer_.Size(result.size()); + assert(buffer_.BufferStart() == result.data()); + } + return s; + } + + std::unique_ptr<RandomAccessFile> file_; + const size_t alignment_; + size_t readahead_size_; + + mutable std::mutex lock_; + mutable AlignedBuffer buffer_; + mutable uint64_t buffer_offset_; +}; +} // namespace + +Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, + uint64_t offset, size_t n) { + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t offset_ = static_cast<size_t>(offset); + uint64_t rounddown_offset = Rounddown(offset_, alignment); + uint64_t roundup_end = Roundup(offset_ + n, alignment); + uint64_t roundup_len = roundup_end - rounddown_offset; + assert(roundup_len >= alignment); + assert(roundup_len % alignment == 0); + + // Check if requested bytes are in the existing buffer_. + // If all bytes exist -- return. + // If only a few bytes exist -- reuse them & read only what is really needed. + // This is typically the case of incremental reading of data. + // If no bytes exist in buffer -- full pread. + + Status s; + uint64_t chunk_offset_in_buffer = 0; + uint64_t chunk_len = 0; + bool copy_data_to_new_buffer = false; + if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ && + offset <= buffer_offset_ + buffer_.CurrentSize()) { + if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) { + // All requested bytes are already in the buffer. So no need to Read + // again. + return s; + } else { + // Only a few requested bytes are in the buffer. memmove those chunk of + // bytes to the beginning, and memcpy them back into the new buffer if a + // new buffer is created. + chunk_offset_in_buffer = Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment); + chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer; + assert(chunk_offset_in_buffer % alignment == 0); + assert(chunk_len % alignment == 0); + assert(chunk_offset_in_buffer + chunk_len <= + buffer_offset_ + buffer_.CurrentSize()); + if (chunk_len > 0) { + copy_data_to_new_buffer = true; + } else { + // this reset is not necessary, but just to be safe. + chunk_offset_in_buffer = 0; + } + } + } + + // Create a new buffer only if current capacity is not sufficient, and memcopy + // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). + if (buffer_.Capacity() < roundup_len) { + buffer_.Alignment(alignment); + buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len), + copy_data_to_new_buffer, chunk_offset_in_buffer, + static_cast<size_t>(chunk_len)); + } else if (chunk_len > 0) { + // New buffer not needed. But memmove bytes from tail to the beginning since + // chunk_len is greater than 0. + buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer), static_cast<size_t>(chunk_len)); + } + + Slice result; + s = reader->Read(rounddown_offset + chunk_len, + static_cast<size_t>(roundup_len - chunk_len), &result, + buffer_.BufferStart() + chunk_len); + if (s.ok()) { + buffer_offset_ = rounddown_offset; + buffer_.Size(static_cast<size_t>(chunk_len) + result.size()); + } + return s; +} + +bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, + Slice* result) { + if (track_min_offset_ && offset < min_offset_read_) { + min_offset_read_ = static_cast<size_t>(offset); + } + if (!enable_ || offset < buffer_offset_) { + return false; + } + + // If the buffer contains only a few of the requested bytes: + // If readahead is enabled: prefetch the remaining bytes + readadhead bytes + // and satisfy the request. + // If readahead is not enabled: return false. + if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { + if (readahead_size_ > 0) { + assert(file_reader_ != nullptr); + assert(max_readahead_size_ >= readahead_size_); + + Status s = Prefetch(file_reader_, offset, n + readahead_size_); + if (!s.ok()) { + return false; + } + readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + } else { + return false; + } + } + + uint64_t offset_in_buffer = offset - buffer_offset_; + *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); + return true; +} + +std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( + std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { + std::unique_ptr<RandomAccessFile> result( + new ReadaheadRandomAccessFile(std::move(file), readahead_size)); + return result; +} + +Status NewWritableFile(Env* env, const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) { + Status s = env->NewWritableFile(fname, result, options); + TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2); + return s; +} + +bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, + std::string* output, bool* has_data, Status* result) { + const int kBufferSize = 8192; + char buffer[kBufferSize + 1]; + Slice input_slice; + + std::string line; + bool has_complete_line = false; + while (!has_complete_line) { + if (std::getline(*iss, line)) { + has_complete_line = !iss->eof(); + } else { + has_complete_line = false; + } + if (!has_complete_line) { + // if we're not sure whether we have a complete line, + // further read from the file. + if (*has_data) { + *result = seq_file->Read(kBufferSize, &input_slice, buffer); + } + if (input_slice.size() == 0) { + // meaning we have read all the data + *has_data = false; + break; + } else { + iss->str(line + input_slice.ToString()); + // reset the internal state of iss so that we can keep reading it. + iss->clear(); + *has_data = (input_slice.size() == kBufferSize); + continue; + } + } + } + *output = line; + return *has_data || has_complete_line; +} + +} // namespace rocksdb |