Diffstat (limited to 'src/rocksdb/file/file_prefetch_buffer.cc')
-rw-r--r--  src/rocksdb/file/file_prefetch_buffer.cc  136
1 file changed, 136 insertions, 0 deletions
diff --git a/src/rocksdb/file/file_prefetch_buffer.cc b/src/rocksdb/file/file_prefetch_buffer.cc
new file mode 100644
index 000000000..7b55bd397
--- /dev/null
+++ b/src/rocksdb/file/file_prefetch_buffer.cc
@@ -0,0 +1,136 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/file_prefetch_buffer.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "file/random_access_file_reader.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
+ uint64_t offset, size_t n,
+ bool for_compaction) {
+ if (!enable_ || reader == nullptr) {
+ return Status::OK();
+ }
+ size_t alignment = reader->file()->GetRequiredBufferAlignment();
+ size_t offset_ = static_cast<size_t>(offset);
+ uint64_t rounddown_offset = Rounddown(offset_, alignment);
+ uint64_t roundup_end = Roundup(offset_ + n, alignment);
+ uint64_t roundup_len = roundup_end - rounddown_offset;
+ assert(roundup_len >= alignment);
+ assert(roundup_len % alignment == 0);
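+  // As a worked example (hypothetical values): with alignment = 4096,
+  // offset = 5000, and n = 3000, rounddown_offset = 4096, roundup_end = 8192,
+  // and roundup_len = 4096; the request is widened to whole aligned blocks.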
+
+ // Check if requested bytes are in the existing buffer_.
+ // If all bytes exist -- return.
+ // If only a few bytes exist -- reuse them & read only what is really needed.
+  // This is typically the case when data is read incrementally.
+ // If no bytes exist in buffer -- full pread.
+
+ Status s;
+ uint64_t chunk_offset_in_buffer = 0;
+ uint64_t chunk_len = 0;
+ bool copy_data_to_new_buffer = false;
+ if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
+ offset <= buffer_offset_ + buffer_.CurrentSize()) {
+ if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
+ // All requested bytes are already in the buffer. So no need to Read
+ // again.
+ return s;
+ } else {
+      // Only a few of the requested bytes are in the buffer: memmove that
+      // chunk of bytes to the beginning of the buffer, or memcpy it into the
+      // new buffer if a new buffer is created.
+ chunk_offset_in_buffer =
+ Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
+ chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
+ assert(chunk_offset_in_buffer % alignment == 0);
+ assert(chunk_len % alignment == 0);
+ assert(chunk_offset_in_buffer + chunk_len <=
+ buffer_offset_ + buffer_.CurrentSize());
+ if (chunk_len > 0) {
+ copy_data_to_new_buffer = true;
+ } else {
+        // This reset is not necessary, but do it just to be safe.
+ chunk_offset_in_buffer = 0;
+ }
+ }
+ }
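+  // Hypothetical example of the partial-hit case above: with
+  // buffer_offset_ = 0, a current buffer size of 8192, alignment = 4096, and
+  // a request at offset = 6000 for n = 4096 bytes, chunk_offset_in_buffer =
+  // 4096 and chunk_len = 4096, so one aligned block is reused and only the
+  // bytes past the existing buffer are read from the file.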
+
+  // Create a new buffer only if the current capacity is not sufficient, and
+  // memcpy bytes from the old buffer if needed (i.e., if chunk_len is
+  // greater than 0).
+ if (buffer_.Capacity() < roundup_len) {
+ buffer_.Alignment(alignment);
+ buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
+ copy_data_to_new_buffer, chunk_offset_in_buffer,
+ static_cast<size_t>(chunk_len));
+ } else if (chunk_len > 0) {
+    // A new buffer is not needed, but memmove the reusable bytes from the
+    // tail to the beginning since chunk_len is greater than 0.
+ buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
+ static_cast<size_t>(chunk_len));
+ }
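+  // At this point the reusable chunk, if any, occupies bytes [0, chunk_len)
+  // of buffer_, so the read below can append the remaining bytes directly
+  // after it.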
+
+ Slice result;
+ s = reader->Read(rounddown_offset + chunk_len,
+ static_cast<size_t>(roundup_len - chunk_len), &result,
+ buffer_.BufferStart() + chunk_len, for_compaction);
+ if (s.ok()) {
+ buffer_offset_ = rounddown_offset;
+ buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+ }
+ return s;
+}
+
+bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
+ Slice* result, bool for_compaction) {
+ if (track_min_offset_ && offset < min_offset_read_) {
+ min_offset_read_ = static_cast<size_t>(offset);
+ }
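+  // If the buffer is disabled, or the read starts before the buffered
+  // range, the request cannot be served from the buffer.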
+ if (!enable_ || offset < buffer_offset_) {
+ return false;
+ }
+
+  // If the buffer contains only a few of the requested bytes:
+  //   If readahead is enabled: prefetch the remaining bytes + readahead
+  //   bytes and satisfy the request.
+  //   If readahead is not enabled: return false.
+ if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
+ if (readahead_size_ > 0) {
+ assert(file_reader_ != nullptr);
+ assert(max_readahead_size_ >= readahead_size_);
+ Status s;
+ if (for_compaction) {
+ s = Prefetch(file_reader_, offset, std::max(n, readahead_size_),
+ for_compaction);
+ } else {
+ s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction);
+ }
+ if (!s.ok()) {
+ return false;
+ }
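+      // Double the readahead window for the next prefetch, capped at
+      // max_readahead_size_, so sequential scans ramp up quickly.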
+ readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+ } else {
+ return false;
+ }
+ }
+
+ uint64_t offset_in_buffer = offset - buffer_offset_;
+ *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
+ return true;
+}
+} // namespace ROCKSDB_NAMESPACE
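
For reference, a minimal usage sketch of the API added above. The constructor
arguments are assumptions inferred from the members this file references
(file_reader_, readahead_size_, max_readahead_size_); see
file/file_prefetch_buffer.h for the authoritative signature.

  // reader, offset and n are assumed to be set up by the caller.
  FilePrefetchBuffer prefetch_buffer(reader, /*readahead_size=*/8 * 1024,
                                     /*max_readahead_size=*/256 * 1024);
  Slice result;
  if (prefetch_buffer.TryReadFromCache(offset, n, &result,
                                       /*for_compaction=*/false)) {
    // result points into the internal buffer and is only valid until the
    // next call reallocates or refits that buffer.
  } else {
    // Miss with readahead disabled (or a failed prefetch); read directly
    // through the RandomAccessFileReader instead.
  }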