summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/file/sequence_file_reader.h
blob: baea10eb767fc5e72aec4144299948788a8f4210 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <string>

#include "env/file_system_tracer.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"

namespace ROCKSDB_NAMESPACE {

// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles
// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page
// cache disabled) reads appropriately, and also updates the IO stats.
class SequentialFileReader {
 private:
#ifndef ROCKSDB_LITE
  void NotifyOnFileReadFinish(
      uint64_t offset, size_t length,
      const FileOperationInfo::StartTimePoint& start_ts,
      const FileOperationInfo::FinishTimePoint& finish_ts,
      const Status& status) const {
    FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
                           finish_ts, status);
    info.offset = offset;
    info.length = length;

    for (auto& listener : listeners_) {
      listener->OnFileReadFinish(info);
    }
    info.status.PermitUncheckedError();
  }

  void AddFileIOListeners(
      const std::vector<std::shared_ptr<EventListener>>& listeners) {
    std::for_each(listeners.begin(), listeners.end(),
                  [this](const std::shared_ptr<EventListener>& e) {
                    if (e->ShouldBeNotifiedOnFileIO()) {
                      listeners_.emplace_back(e);
                    }
                  });
  }
#endif  // ROCKSDB_LITE

  bool ShouldNotifyListeners() const { return !listeners_.empty(); }

  std::string file_name_;
  FSSequentialFilePtr file_;
  std::atomic<size_t> offset_{0};  // read offset
  std::vector<std::shared_ptr<EventListener>> listeners_{};
  RateLimiter* rate_limiter_;

 public:
  explicit SequentialFileReader(
      std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
      const std::shared_ptr<IOTracer>& io_tracer = nullptr,
      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
      RateLimiter* rate_limiter =
          nullptr)  // TODO: migrate call sites to provide rate limiter
      : file_name_(_file_name),
        file_(std::move(_file), io_tracer, _file_name),
        listeners_(),
        rate_limiter_(rate_limiter) {
#ifndef ROCKSDB_LITE
    AddFileIOListeners(listeners);
#else
    (void)listeners;
#endif
  }

  explicit SequentialFileReader(
      std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
      size_t _readahead_size,
      const std::shared_ptr<IOTracer>& io_tracer = nullptr,
      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
      RateLimiter* rate_limiter =
          nullptr)  // TODO: migrate call sites to provide rate limiter
      : file_name_(_file_name),
        file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
              io_tracer, _file_name),
        listeners_(),
        rate_limiter_(rate_limiter) {
#ifndef ROCKSDB_LITE
    AddFileIOListeners(listeners);
#else
    (void)listeners;
#endif
  }
  static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
                         const std::string& fname, const FileOptions& file_opts,
                         std::unique_ptr<SequentialFileReader>* reader,
                         IODebugContext* dbg, RateLimiter* rate_limiter);

  SequentialFileReader(const SequentialFileReader&) = delete;
  SequentialFileReader& operator=(const SequentialFileReader&) = delete;

  // `rate_limiter_priority` is used to charge the internal rate limiter when
  // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
  // rate limiter. The amount charged to the internal rate limiter is n, even
  // when less than n bytes are actually read (e.g. at end of file). To avoid
  // overcharging the rate limiter, the caller can use file size to cap n to
  // read until end of file.
  IOStatus Read(size_t n, Slice* result, char* scratch,
                Env::IOPriority rate_limiter_priority);

  IOStatus Skip(uint64_t n);

  FSSequentialFile* file() { return file_.get(); }

  std::string file_name() { return file_name_; }

  bool use_direct_io() const { return file_->use_direct_io(); }

 private:
  // NewReadaheadSequentialFile provides a wrapper over SequentialFile to
  // always prefetch additional data with every read.
  static std::unique_ptr<FSSequentialFile> NewReadaheadSequentialFile(
      std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size);
};
}  // namespace ROCKSDB_NAMESPACE