summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/file/random_access_file_reader.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/file/random_access_file_reader.cc')
-rw-r--r--src/rocksdb/file/random_access_file_reader.cc189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/rocksdb/file/random_access_file_reader.cc b/src/rocksdb/file/random_access_file_reader.cc
new file mode 100644
index 000000000..46892360f
--- /dev/null
+++ b/src/rocksdb/file/random_access_file_reader.cc
@@ -0,0 +1,189 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/random_access_file_reader.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch, bool for_compaction) const {
+ Status s;
+ uint64_t elapsed = 0;
+ {
+ StopWatch sw(env_, stats_, hist_type_,
+ (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
+ true /*delay_enabled*/);
+ auto prev_perf_level = GetPerfLevel();
+ IOSTATS_TIMER_GUARD(read_nanos);
+ if (use_direct_io()) {
+#ifndef ROCKSDB_LITE
+ size_t alignment = file_->GetRequiredBufferAlignment();
+ size_t aligned_offset =
+ TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
+ size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
+ size_t read_size =
+ Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
+ AlignedBuffer buf;
+ buf.Alignment(alignment);
+ buf.AllocateNewBuffer(read_size);
+ while (buf.CurrentSize() < read_size) {
+ size_t allowed;
+ if (for_compaction && rate_limiter_ != nullptr) {
+ allowed = rate_limiter_->RequestToken(
+ buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
+ Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
+ } else {
+ assert(buf.CurrentSize() == 0);
+ allowed = read_size;
+ }
+ Slice tmp;
+
+ FileOperationInfo::TimePoint start_ts;
+ uint64_t orig_offset = 0;
+ if (ShouldNotifyListeners()) {
+ start_ts = std::chrono::system_clock::now();
+ orig_offset = aligned_offset + buf.CurrentSize();
+ }
+ {
+ IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
+ s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
+ IOOptions(), &tmp, buf.Destination(), nullptr);
+ }
+ if (ShouldNotifyListeners()) {
+ auto finish_ts = std::chrono::system_clock::now();
+ NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
+ s);
+ }
+
+ buf.Size(buf.CurrentSize() + tmp.size());
+ if (!s.ok() || tmp.size() < allowed) {
+ break;
+ }
+ }
+ size_t res_len = 0;
+ if (s.ok() && offset_advance < buf.CurrentSize()) {
+ res_len = buf.Read(scratch, offset_advance,
+ std::min(buf.CurrentSize() - offset_advance, n));
+ }
+ *result = Slice(scratch, res_len);
+#endif // !ROCKSDB_LITE
+ } else {
+ size_t pos = 0;
+ const char* res_scratch = nullptr;
+ while (pos < n) {
+ size_t allowed;
+ if (for_compaction && rate_limiter_ != nullptr) {
+ if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
+ sw.DelayStart();
+ }
+ allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
+ Env::IOPriority::IO_LOW, stats_,
+ RateLimiter::OpType::kRead);
+ if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
+ sw.DelayStop();
+ }
+ } else {
+ allowed = n;
+ }
+ Slice tmp_result;
+
+#ifndef ROCKSDB_LITE
+ FileOperationInfo::TimePoint start_ts;
+ if (ShouldNotifyListeners()) {
+ start_ts = std::chrono::system_clock::now();
+ }
+#endif
+ {
+ IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
+ s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
+ scratch + pos, nullptr);
+ }
+#ifndef ROCKSDB_LITE
+ if (ShouldNotifyListeners()) {
+ auto finish_ts = std::chrono::system_clock::now();
+ NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
+ finish_ts, s);
+ }
+#endif
+
+ if (res_scratch == nullptr) {
+ // we can't simply use `scratch` because reads of mmap'd files return
+ // data in a different buffer.
+ res_scratch = tmp_result.data();
+ } else {
+ // make sure chunks are inserted contiguously into `res_scratch`.
+ assert(tmp_result.data() == res_scratch + pos);
+ }
+ pos += tmp_result.size();
+ if (!s.ok() || tmp_result.size() < allowed) {
+ break;
+ }
+ }
+ *result = Slice(res_scratch, s.ok() ? pos : 0);
+ }
+ IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
+ SetPerfLevel(prev_perf_level);
+ }
+ if (stats_ != nullptr && file_read_hist_ != nullptr) {
+ file_read_hist_->Add(elapsed);
+ }
+
+ return s;
+}
+
+Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
+ size_t num_reqs) const {
+ Status s;
+ uint64_t elapsed = 0;
+ assert(!use_direct_io());
+ {
+ StopWatch sw(env_, stats_, hist_type_,
+ (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
+ true /*delay_enabled*/);
+ auto prev_perf_level = GetPerfLevel();
+ IOSTATS_TIMER_GUARD(read_nanos);
+
+#ifndef ROCKSDB_LITE
+ FileOperationInfo::TimePoint start_ts;
+ if (ShouldNotifyListeners()) {
+ start_ts = std::chrono::system_clock::now();
+ }
+#endif // ROCKSDB_LITE
+ {
+ IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
+ s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
+ }
+ for (size_t i = 0; i < num_reqs; ++i) {
+#ifndef ROCKSDB_LITE
+ if (ShouldNotifyListeners()) {
+ auto finish_ts = std::chrono::system_clock::now();
+ NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
+ start_ts, finish_ts, read_reqs[i].status);
+ }
+#endif // ROCKSDB_LITE
+ IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
+ }
+ SetPerfLevel(prev_perf_level);
+ }
+ if (stats_ != nullptr && file_read_hist_ != nullptr) {
+ file_read_hist_->Add(elapsed);
+ }
+
+ return s;
+}
+} // namespace ROCKSDB_NAMESPACE