diff options
Diffstat (limited to 'src/rocksdb/file/sequence_file_reader.cc')
-rw-r--r-- | src/rocksdb/file/sequence_file_reader.cc | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/src/rocksdb/file/sequence_file_reader.cc b/src/rocksdb/file/sequence_file_reader.cc new file mode 100644 index 000000000..81c5e5d1d --- /dev/null +++ b/src/rocksdb/file/sequence_file_reader.cc @@ -0,0 +1,237 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/sequence_file_reader.h" + +#include <algorithm> +#include <mutex> + +#include "file/read_write_util.h" +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/aligned_buffer.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace ROCKSDB_NAMESPACE { +Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { + Status s; + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + size_t offset = offset_.fetch_add(n); + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + size_t r = 0; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp, + buf.BufferStart(), nullptr); + if (s.ok() && offset_advance < tmp.size()) { + buf.Size(tmp.size()); + r = buf.Read(scratch, offset_advance, + std::min(tmp.size() - offset_advance, n)); + } + *result = Slice(scratch, r); +#endif // !ROCKSDB_LITE + } else { + s = file_->Read(n, IOOptions(), result, scratch, nullptr); + } + IOSTATS_ADD(bytes_read, result->size()); + return s; +} + +Status SequentialFileReader::Skip(uint64_t n) { +#ifndef ROCKSDB_LITE + if (use_direct_io()) { + offset_ += static_cast<size_t>(n); + return Status::OK(); + } +#endif // !ROCKSDB_LITE + return file_->Skip(n); +} + +namespace { +// This class wraps a SequentialFile, exposing same API, with the differenece +// of being able to prefetch up to readahead_size bytes and then serve them +// from memory, avoiding the entire round-trip if, for example, the data for the +// file is actually remote. +class ReadaheadSequentialFile : public FSSequentialFile { + public: + ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file, + size_t readahead_size) + : file_(std::move(file)), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), + buffer_(), + buffer_offset_(0), + read_offset_(0) { + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_); + } + + ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; + + ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; + + IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch, + IODebugContext* dbg) override { + std::unique_lock<std::mutex> lk(lock_); + + size_t cached_len = 0; + // Check if there is a cache hit, meaning that [offset, offset + n) is + // either completely or partially in the buffer. If it's completely cached, + // including end of file case when offset + n is greater than EOF, then + // return. + if (TryReadFromCache(n, &cached_len, scratch) && + (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { + // We read exactly what we needed, or we hit end of file - return. + *result = Slice(scratch, cached_len); + return IOStatus::OK(); + } + n -= cached_len; + + IOStatus s; + // Read-ahead only make sense if we have some slack left after reading + if (n + alignment_ >= readahead_size_) { + s = file_->Read(n, opts, result, scratch + cached_len, dbg); + if (s.ok()) { + read_offset_ += result->size(); + *result = Slice(scratch, cached_len + result->size()); + } + buffer_.Clear(); + return s; + } + + s = ReadIntoBuffer(readahead_size_, opts, dbg); + if (s.ok()) { + // The data we need is now in cache, so we can safely read it + size_t remaining_len; + TryReadFromCache(n, &remaining_len, scratch + cached_len); + *result = Slice(scratch, cached_len + remaining_len); + } + return s; + } + + IOStatus Skip(uint64_t n) override { + std::unique_lock<std::mutex> lk(lock_); + IOStatus s = IOStatus::OK(); + // First check if we need to skip already cached data + if (buffer_.CurrentSize() > 0) { + // Do we need to skip beyond cached data? + if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { + // Yes. Skip whaterver is in memory and adjust offset accordingly + n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; + read_offset_ = buffer_offset_ + buffer_.CurrentSize(); + } else { + // No. The entire section to be skipped is entirely i cache. + read_offset_ += n; + n = 0; + } + } + if (n > 0) { + // We still need to skip more, so call the file API for skipping + s = file_->Skip(n); + if (s.ok()) { + read_offset_ += n; + } + buffer_.Clear(); + } + return s; + } + + IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts, + Slice* result, char* scratch, + IODebugContext* dbg) override { + return file_->PositionedRead(offset, n, opts, result, scratch, dbg); + } + + IOStatus InvalidateCache(size_t offset, size_t length) override { + std::unique_lock<std::mutex> lk(lock_); + buffer_.Clear(); + return file_->InvalidateCache(offset, length); + } + + bool use_direct_io() const override { return file_->use_direct_io(); } + + private: + // Tries to read from buffer_ n bytes. If anything was read from the cache, it + // sets cached_len to the number of bytes actually read, copies these number + // of bytes to scratch and returns true. + // If nothing was read sets cached_len to 0 and returns false. + bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { + if (read_offset_ < buffer_offset_ || + read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = read_offset_ - buffer_offset_; + *cached_len = std::min( + buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + read_offset_ += *cached_len; + return true; + } + + // Reads into buffer_ the next n bytes from file_. + // Can actually read less if EOF was reached. + // Returns the status of the read operastion on the file. + IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts, + IODebugContext* dbg) { + if (n > buffer_.Capacity()) { + n = buffer_.Capacity(); + } + assert(IsFileSectorAligned(n, alignment_)); + Slice result; + IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg); + if (s.ok()) { + buffer_offset_ = read_offset_; + buffer_.Size(result.size()); + assert(result.size() == 0 || buffer_.BufferStart() == result.data()); + } + return s; + } + + const std::unique_ptr<FSSequentialFile> file_; + const size_t alignment_; + const size_t readahead_size_; + + std::mutex lock_; + // The buffer storing the prefetched data + AlignedBuffer buffer_; + // The offset in file_, corresponding to data stored in buffer_ + uint64_t buffer_offset_; + // The offset up to which data was read from file_. In fact, it can be larger + // than the actual file size, since the file_->Skip(n) call doesn't return the + // actual number of bytes that were skipped, which can be less than n. + // This is not a problemm since read_offset_ is monotonically increasing and + // its only use is to figure out if next piece of data should be read from + // buffer_ or file_ directly. + uint64_t read_offset_; +}; +} // namespace + +std::unique_ptr<FSSequentialFile> +SequentialFileReader::NewReadaheadSequentialFile( + std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) { + if (file->GetRequiredBufferAlignment() >= readahead_size) { + // Short-circuit and return the original file if readahead_size is + // too small and hence doesn't make sense to be used for prefetching. + return std::move(file); + } + std::unique_ptr<FSSequentialFile> result( + new ReadaheadSequentialFile(std::move(file), readahead_size)); + return result; +} +} // namespace ROCKSDB_NAMESPACE |