summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/env/composite_env.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/env/composite_env.cc')
-rw-r--r--src/rocksdb/env/composite_env.cc544
1 files changed, 544 insertions, 0 deletions
diff --git a/src/rocksdb/env/composite_env.cc b/src/rocksdb/env/composite_env.cc
new file mode 100644
index 000000000..b93aa9fcb
--- /dev/null
+++ b/src/rocksdb/env/composite_env.cc
@@ -0,0 +1,544 @@
+// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "env/composite_env_wrapper.h"
+#include "rocksdb/utilities/options_type.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+// The CompositeEnvWrapper class provides an interface that is compatible
+// with the old monolithic Env API, and an implementation that wraps around
+// the new Env that provides threading and other OS related functionality, and
+// the new FileSystem API that provides storage functionality. By
+// providing the old Env interface, it allows the rest of RocksDB code to
+// be agnostic of whether the underlying Env implementation is a monolithic
+// Env or an Env + FileSystem. In the former case, the user will specify
+// Options::env only, whereas in the latter case, the user will specify
+// Options::env and Options::file_system.
+
+class CompositeSequentialFileWrapper : public SequentialFile {
+ public:
+ explicit CompositeSequentialFileWrapper(
+ std::unique_ptr<FSSequentialFile>& target)
+ : target_(std::move(target)) {}
+
+ Status Read(size_t n, Slice* result, char* scratch) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Read(n, io_opts, result, scratch, &dbg);
+ }
+ Status Skip(uint64_t n) override { return target_->Skip(n); }
+ bool use_direct_io() const override { return target_->use_direct_io(); }
+ size_t GetRequiredBufferAlignment() const override {
+ return target_->GetRequiredBufferAlignment();
+ }
+ Status InvalidateCache(size_t offset, size_t length) override {
+ return target_->InvalidateCache(offset, length);
+ }
+ Status PositionedRead(uint64_t offset, size_t n, Slice* result,
+ char* scratch) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
+ }
+
+ private:
+ std::unique_ptr<FSSequentialFile> target_;
+};
+
+class CompositeRandomAccessFileWrapper : public RandomAccessFile {
+ public:
+ explicit CompositeRandomAccessFileWrapper(
+ std::unique_ptr<FSRandomAccessFile>& target)
+ : target_(std::move(target)) {}
+
+ Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Read(offset, n, io_opts, result, scratch, &dbg);
+ }
+ Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ std::vector<FSReadRequest> fs_reqs;
+ Status status;
+
+ fs_reqs.resize(num_reqs);
+ for (size_t i = 0; i < num_reqs; ++i) {
+ fs_reqs[i].offset = reqs[i].offset;
+ fs_reqs[i].len = reqs[i].len;
+ fs_reqs[i].scratch = reqs[i].scratch;
+ fs_reqs[i].status = IOStatus::OK();
+ }
+ status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
+ for (size_t i = 0; i < num_reqs; ++i) {
+ reqs[i].result = fs_reqs[i].result;
+ reqs[i].status = fs_reqs[i].status;
+ }
+ return status;
+ }
+ Status Prefetch(uint64_t offset, size_t n) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Prefetch(offset, n, io_opts, &dbg);
+ }
+ size_t GetUniqueId(char* id, size_t max_size) const override {
+ return target_->GetUniqueId(id, max_size);
+ }
+ void Hint(AccessPattern pattern) override {
+ target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
+ }
+ bool use_direct_io() const override { return target_->use_direct_io(); }
+ size_t GetRequiredBufferAlignment() const override {
+ return target_->GetRequiredBufferAlignment();
+ }
+ Status InvalidateCache(size_t offset, size_t length) override {
+ return target_->InvalidateCache(offset, length);
+ }
+
+ private:
+ std::unique_ptr<FSRandomAccessFile> target_;
+};
+
+class CompositeWritableFileWrapper : public WritableFile {
+ public:
+ explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
+ : target_(std::move(t)) {}
+
+ Status Append(const Slice& data) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Append(data, io_opts, &dbg);
+ }
+ Status Append(const Slice& data,
+ const DataVerificationInfo& verification_info) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Append(data, io_opts, verification_info, &dbg);
+ }
+ Status PositionedAppend(const Slice& data, uint64_t offset) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->PositionedAppend(data, offset, io_opts, &dbg);
+ }
+ Status PositionedAppend(
+ const Slice& data, uint64_t offset,
+ const DataVerificationInfo& verification_info) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->PositionedAppend(data, offset, io_opts, verification_info,
+ &dbg);
+ }
+ Status Truncate(uint64_t size) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Truncate(size, io_opts, &dbg);
+ }
+ Status Close() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Close(io_opts, &dbg);
+ }
+ Status Flush() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Flush(io_opts, &dbg);
+ }
+ Status Sync() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Sync(io_opts, &dbg);
+ }
+ Status Fsync() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Fsync(io_opts, &dbg);
+ }
+ bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
+
+ bool use_direct_io() const override { return target_->use_direct_io(); }
+
+ size_t GetRequiredBufferAlignment() const override {
+ return target_->GetRequiredBufferAlignment();
+ }
+
+ void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
+ target_->SetWriteLifeTimeHint(hint);
+ }
+
+ Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
+ return target_->GetWriteLifeTimeHint();
+ }
+
+ uint64_t GetFileSize() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->GetFileSize(io_opts, &dbg);
+ }
+
+ void SetPreallocationBlockSize(size_t size) override {
+ target_->SetPreallocationBlockSize(size);
+ }
+
+ void GetPreallocationStatus(size_t* block_size,
+ size_t* last_allocated_block) override {
+ target_->GetPreallocationStatus(block_size, last_allocated_block);
+ }
+
+ size_t GetUniqueId(char* id, size_t max_size) const override {
+ return target_->GetUniqueId(id, max_size);
+ }
+
+ Status InvalidateCache(size_t offset, size_t length) override {
+ return target_->InvalidateCache(offset, length);
+ }
+
+ Status RangeSync(uint64_t offset, uint64_t nbytes) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->RangeSync(offset, nbytes, io_opts, &dbg);
+ }
+
+ void PrepareWrite(size_t offset, size_t len) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ target_->PrepareWrite(offset, len, io_opts, &dbg);
+ }
+
+ Status Allocate(uint64_t offset, uint64_t len) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Allocate(offset, len, io_opts, &dbg);
+ }
+
+ std::unique_ptr<FSWritableFile>* target() { return &target_; }
+
+ private:
+ std::unique_ptr<FSWritableFile> target_;
+};
+
+class CompositeRandomRWFileWrapper : public RandomRWFile {
+ public:
+ explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
+ : target_(std::move(target)) {}
+
+ bool use_direct_io() const override { return target_->use_direct_io(); }
+ size_t GetRequiredBufferAlignment() const override {
+ return target_->GetRequiredBufferAlignment();
+ }
+ Status Write(uint64_t offset, const Slice& data) override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Write(offset, data, io_opts, &dbg);
+ }
+ Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Read(offset, n, io_opts, result, scratch, &dbg);
+ }
+ Status Flush() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Flush(io_opts, &dbg);
+ }
+ Status Sync() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Sync(io_opts, &dbg);
+ }
+ Status Fsync() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Fsync(io_opts, &dbg);
+ }
+ Status Close() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Close(io_opts, &dbg);
+ }
+
+ private:
+ std::unique_ptr<FSRandomRWFile> target_;
+};
+
+class CompositeDirectoryWrapper : public Directory {
+ public:
+ explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
+ : target_(std::move(target)) {}
+
+ Status Fsync() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
+ }
+
+ Status Close() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Close(io_opts, &dbg);
+ }
+
+ size_t GetUniqueId(char* id, size_t max_size) const override {
+ return target_->GetUniqueId(id, max_size);
+ }
+
+ private:
+ std::unique_ptr<FSDirectory> target_;
+};
+} // namespace
+
+Status CompositeEnv::NewSequentialFile(const std::string& f,
+ std::unique_ptr<SequentialFile>* r,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ std::unique_ptr<FSSequentialFile> file;
+ Status status;
+ status =
+ file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
+ if (status.ok()) {
+ r->reset(new CompositeSequentialFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::NewRandomAccessFile(const std::string& f,
+ std::unique_ptr<RandomAccessFile>* r,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ std::unique_ptr<FSRandomAccessFile> file;
+ Status status;
+ status =
+ file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
+ if (status.ok()) {
+ r->reset(new CompositeRandomAccessFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::NewWritableFile(const std::string& f,
+ std::unique_ptr<WritableFile>* r,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ std::unique_ptr<FSWritableFile> file;
+ Status status;
+ status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
+ if (status.ok()) {
+ r->reset(new CompositeWritableFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::ReopenWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ Status status;
+ std::unique_ptr<FSWritableFile> file;
+ status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
+ &dbg);
+ if (status.ok()) {
+ result->reset(new CompositeWritableFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::ReuseWritableFile(const std::string& fname,
+ const std::string& old_fname,
+ std::unique_ptr<WritableFile>* r,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ Status status;
+ std::unique_ptr<FSWritableFile> file;
+ status = file_system_->ReuseWritableFile(fname, old_fname,
+ FileOptions(options), &file, &dbg);
+ if (status.ok()) {
+ r->reset(new CompositeWritableFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::NewRandomRWFile(const std::string& fname,
+ std::unique_ptr<RandomRWFile>* result,
+ const EnvOptions& options) {
+ IODebugContext dbg;
+ std::unique_ptr<FSRandomRWFile> file;
+ Status status;
+ status =
+ file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
+ if (status.ok()) {
+ result->reset(new CompositeRandomRWFileWrapper(file));
+ }
+ return status;
+}
+
+Status CompositeEnv::NewDirectory(const std::string& name,
+ std::unique_ptr<Directory>* result) {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ std::unique_ptr<FSDirectory> dir;
+ Status status;
+ status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
+ if (status.ok()) {
+ result->reset(new CompositeDirectoryWrapper(dir));
+ }
+ return status;
+}
+
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
+#ifndef ROCKSDB_LITE
+ {"target",
+ OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName,
+ OptionTypeFlags::kDontSerialize)
+ .SetParseFunc([](const ConfigOptions& opts,
+ const std::string& /*name*/, const std::string& value,
+ void* addr) {
+ auto target = static_cast<EnvWrapper::Target*>(addr);
+ return Env::CreateFromString(opts, value, &(target->env),
+ &(target->guard));
+ })
+ .SetEqualsFunc([](const ConfigOptions& opts,
+ const std::string& /*name*/, const void* addr1,
+ const void* addr2, std::string* mismatch) {
+ const auto target1 = static_cast<const EnvWrapper::Target*>(addr1);
+ const auto target2 = static_cast<const EnvWrapper::Target*>(addr2);
+ if (target1->env != nullptr) {
+ return target1->env->AreEquivalent(opts, target2->env, mismatch);
+ } else {
+ return (target2->env == nullptr);
+ }
+ })
+ .SetPrepareFunc([](const ConfigOptions& opts,
+ const std::string& /*name*/, void* addr) {
+ auto target = static_cast<EnvWrapper::Target*>(addr);
+ if (target->guard.get() != nullptr) {
+ target->env = target->guard.get();
+ } else if (target->env == nullptr) {
+ target->env = Env::Default();
+ }
+ return target->env->PrepareOptions(opts);
+ })
+ .SetValidateFunc([](const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts,
+ const std::string& /*name*/, const void* addr) {
+ const auto target = static_cast<const EnvWrapper::Target*>(addr);
+ if (target->env == nullptr) {
+ return Status::InvalidArgument("Target Env not specified");
+ } else {
+ return target->env->ValidateOptions(db_opts, cf_opts);
+ }
+ })},
+#endif // ROCKSDB_LITE
+};
+static std::unordered_map<std::string, OptionTypeInfo>
+ composite_fs_wrapper_type_info = {
+#ifndef ROCKSDB_LITE
+ {"file_system",
+ OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
+#endif // ROCKSDB_LITE
+};
+
+static std::unordered_map<std::string, OptionTypeInfo>
+ composite_clock_wrapper_type_info = {
+#ifndef ROCKSDB_LITE
+ {"clock",
+ OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
+#endif // ROCKSDB_LITE
+};
+
+} // namespace
+
+std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
+ return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
+}
+
+CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
+ const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<SystemClock>& sc)
+ : CompositeEnv(fs, sc), target_(env) {
+ RegisterOptions("", &target_, &env_wrapper_type_info);
+ RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
+ RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
+}
+
+CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
+ const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<SystemClock>& sc)
+ : CompositeEnv(fs, sc), target_(env) {
+ RegisterOptions("", &target_, &env_wrapper_type_info);
+ RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
+ RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
+}
+
+Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
+ target_.Prepare();
+ if (file_system_ == nullptr) {
+ file_system_ = target_.env->GetFileSystem();
+ }
+ if (system_clock_ == nullptr) {
+ system_clock_ = target_.env->GetSystemClock();
+ }
+ return Env::PrepareOptions(options);
+}
+
+#ifndef ROCKSDB_LITE
+std::string CompositeEnvWrapper::SerializeOptions(
+ const ConfigOptions& config_options, const std::string& header) const {
+ auto options = CompositeEnv::SerializeOptions(config_options, header);
+ if (target_.env != nullptr && target_.env != Env::Default()) {
+ options.append("target=");
+ options.append(target_.env->ToString(config_options));
+ }
+ return options;
+}
+#endif // ROCKSDB_LITE
+
+EnvWrapper::EnvWrapper(Env* t) : target_(t) {
+ RegisterOptions("", &target_, &env_wrapper_type_info);
+}
+
+EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) {
+ RegisterOptions("", &target_, &env_wrapper_type_info);
+}
+
+EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) {
+ RegisterOptions("", &target_, &env_wrapper_type_info);
+}
+
+EnvWrapper::~EnvWrapper() {}
+
+Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
+ target_.Prepare();
+ return Env::PrepareOptions(options);
+}
+
+#ifndef ROCKSDB_LITE
+std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
+ const std::string& header) const {
+ auto parent = Env::SerializeOptions(config_options, "");
+ if (config_options.IsShallow() || target_.env == nullptr ||
+ target_.env == Env::Default()) {
+ return parent;
+ } else {
+ std::string result = header;
+ if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
+ result.append(OptionTypeInfo::kIdPropName()).append("=");
+ }
+ result.append(parent);
+ if (!EndsWith(result, config_options.delimiter)) {
+ result.append(config_options.delimiter);
+ }
+ result.append("target=").append(target_.env->ToString(config_options));
+ return result;
+ }
+}
+#endif // ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE