diff options
Diffstat (limited to 'src/rocksdb/env/composite_env.cc')
-rw-r--r-- | src/rocksdb/env/composite_env.cc | 544 |
1 files changed, 544 insertions, 0 deletions
diff --git a/src/rocksdb/env/composite_env.cc b/src/rocksdb/env/composite_env.cc new file mode 100644 index 000000000..b93aa9fcb --- /dev/null +++ b/src/rocksdb/env/composite_env.cc @@ -0,0 +1,544 @@ +// Copyright (c) 2019-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "env/composite_env_wrapper.h" +#include "rocksdb/utilities/options_type.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +// The CompositeEnvWrapper class provides an interface that is compatible +// with the old monolithic Env API, and an implementation that wraps around +// the new Env that provides threading and other OS related functionality, and +// the new FileSystem API that provides storage functionality. By +// providing the old Env interface, it allows the rest of RocksDB code to +// be agnostic of whether the underlying Env implementation is a monolithic +// Env or an Env + FileSystem. In the former case, the user will specify +// Options::env only, whereas in the latter case, the user will specify +// Options::env and Options::file_system. + +class CompositeSequentialFileWrapper : public SequentialFile { + public: + explicit CompositeSequentialFileWrapper( + std::unique_ptr<FSSequentialFile>& target) + : target_(std::move(target)) {} + + Status Read(size_t n, Slice* result, char* scratch) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Read(n, io_opts, result, scratch, &dbg); + } + Status Skip(uint64_t n) override { return target_->Skip(n); } + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + Status InvalidateCache(size_t offset, size_t length) override { + return target_->InvalidateCache(offset, length); + } + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg); + } + + private: + std::unique_ptr<FSSequentialFile> target_; +}; + +class CompositeRandomAccessFileWrapper : public RandomAccessFile { + public: + explicit CompositeRandomAccessFileWrapper( + std::unique_ptr<FSRandomAccessFile>& target) + : target_(std::move(target)) {} + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Read(offset, n, io_opts, result, scratch, &dbg); + } + Status MultiRead(ReadRequest* reqs, size_t num_reqs) override { + IOOptions io_opts; + IODebugContext dbg; + std::vector<FSReadRequest> fs_reqs; + Status status; + + fs_reqs.resize(num_reqs); + for (size_t i = 0; i < num_reqs; ++i) { + fs_reqs[i].offset = reqs[i].offset; + fs_reqs[i].len = reqs[i].len; + fs_reqs[i].scratch = reqs[i].scratch; + fs_reqs[i].status = IOStatus::OK(); + } + status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg); + for (size_t i = 0; i < num_reqs; ++i) { + reqs[i].result = fs_reqs[i].result; + reqs[i].status = fs_reqs[i].status; + } + return status; + } + Status Prefetch(uint64_t offset, size_t n) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Prefetch(offset, n, io_opts, &dbg); + } + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + void Hint(AccessPattern pattern) override { + target_->Hint((FSRandomAccessFile::AccessPattern)pattern); + } + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + Status InvalidateCache(size_t offset, size_t length) override { + return target_->InvalidateCache(offset, length); + } + + private: + std::unique_ptr<FSRandomAccessFile> target_; +}; + +class CompositeWritableFileWrapper : public WritableFile { + public: + explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t) + : target_(std::move(t)) {} + + Status Append(const Slice& data) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Append(data, io_opts, &dbg); + } + Status Append(const Slice& data, + const DataVerificationInfo& verification_info) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Append(data, io_opts, verification_info, &dbg); + } + Status PositionedAppend(const Slice& data, uint64_t offset) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->PositionedAppend(data, offset, io_opts, &dbg); + } + Status PositionedAppend( + const Slice& data, uint64_t offset, + const DataVerificationInfo& verification_info) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->PositionedAppend(data, offset, io_opts, verification_info, + &dbg); + } + Status Truncate(uint64_t size) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Truncate(size, io_opts, &dbg); + } + Status Close() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Close(io_opts, &dbg); + } + Status Flush() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Flush(io_opts, &dbg); + } + Status Sync() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Sync(io_opts, &dbg); + } + Status Fsync() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Fsync(io_opts, &dbg); + } + bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); } + + bool use_direct_io() const override { return target_->use_direct_io(); } + + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + + void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override { + target_->SetWriteLifeTimeHint(hint); + } + + Env::WriteLifeTimeHint GetWriteLifeTimeHint() override { + return target_->GetWriteLifeTimeHint(); + } + + uint64_t GetFileSize() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->GetFileSize(io_opts, &dbg); + } + + void SetPreallocationBlockSize(size_t size) override { + target_->SetPreallocationBlockSize(size); + } + + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override { + target_->GetPreallocationStatus(block_size, last_allocated_block); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + + Status InvalidateCache(size_t offset, size_t length) override { + return target_->InvalidateCache(offset, length); + } + + Status RangeSync(uint64_t offset, uint64_t nbytes) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->RangeSync(offset, nbytes, io_opts, &dbg); + } + + void PrepareWrite(size_t offset, size_t len) override { + IOOptions io_opts; + IODebugContext dbg; + target_->PrepareWrite(offset, len, io_opts, &dbg); + } + + Status Allocate(uint64_t offset, uint64_t len) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Allocate(offset, len, io_opts, &dbg); + } + + std::unique_ptr<FSWritableFile>* target() { return &target_; } + + private: + std::unique_ptr<FSWritableFile> target_; +}; + +class CompositeRandomRWFileWrapper : public RandomRWFile { + public: + explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target) + : target_(std::move(target)) {} + + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + Status Write(uint64_t offset, const Slice& data) override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Write(offset, data, io_opts, &dbg); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Read(offset, n, io_opts, result, scratch, &dbg); + } + Status Flush() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Flush(io_opts, &dbg); + } + Status Sync() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Sync(io_opts, &dbg); + } + Status Fsync() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Fsync(io_opts, &dbg); + } + Status Close() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Close(io_opts, &dbg); + } + + private: + std::unique_ptr<FSRandomRWFile> target_; +}; + +class CompositeDirectoryWrapper : public Directory { + public: + explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target) + : target_(std::move(target)) {} + + Status Fsync() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions()); + } + + Status Close() override { + IOOptions io_opts; + IODebugContext dbg; + return target_->Close(io_opts, &dbg); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + + private: + std::unique_ptr<FSDirectory> target_; +}; +} // namespace + +Status CompositeEnv::NewSequentialFile(const std::string& f, + std::unique_ptr<SequentialFile>* r, + const EnvOptions& options) { + IODebugContext dbg; + std::unique_ptr<FSSequentialFile> file; + Status status; + status = + file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg); + if (status.ok()) { + r->reset(new CompositeSequentialFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::NewRandomAccessFile(const std::string& f, + std::unique_ptr<RandomAccessFile>* r, + const EnvOptions& options) { + IODebugContext dbg; + std::unique_ptr<FSRandomAccessFile> file; + Status status; + status = + file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg); + if (status.ok()) { + r->reset(new CompositeRandomAccessFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::NewWritableFile(const std::string& f, + std::unique_ptr<WritableFile>* r, + const EnvOptions& options) { + IODebugContext dbg; + std::unique_ptr<FSWritableFile> file; + Status status; + status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg); + if (status.ok()) { + r->reset(new CompositeWritableFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::ReopenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) { + IODebugContext dbg; + Status status; + std::unique_ptr<FSWritableFile> file; + status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file, + &dbg); + if (status.ok()) { + result->reset(new CompositeWritableFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr<WritableFile>* r, + const EnvOptions& options) { + IODebugContext dbg; + Status status; + std::unique_ptr<FSWritableFile> file; + status = file_system_->ReuseWritableFile(fname, old_fname, + FileOptions(options), &file, &dbg); + if (status.ok()) { + r->reset(new CompositeWritableFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& options) { + IODebugContext dbg; + std::unique_ptr<FSRandomRWFile> file; + Status status; + status = + file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg); + if (status.ok()) { + result->reset(new CompositeRandomRWFileWrapper(file)); + } + return status; +} + +Status CompositeEnv::NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) { + IOOptions io_opts; + IODebugContext dbg; + std::unique_ptr<FSDirectory> dir; + Status status; + status = file_system_->NewDirectory(name, io_opts, &dir, &dbg); + if (status.ok()) { + result->reset(new CompositeDirectoryWrapper(dir)); + } + return status; +} + +namespace { +static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = { +#ifndef ROCKSDB_LITE + {"target", + OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName, + OptionTypeFlags::kDontSerialize) + .SetParseFunc([](const ConfigOptions& opts, + const std::string& /*name*/, const std::string& value, + void* addr) { + auto target = static_cast<EnvWrapper::Target*>(addr); + return Env::CreateFromString(opts, value, &(target->env), + &(target->guard)); + }) + .SetEqualsFunc([](const ConfigOptions& opts, + const std::string& /*name*/, const void* addr1, + const void* addr2, std::string* mismatch) { + const auto target1 = static_cast<const EnvWrapper::Target*>(addr1); + const auto target2 = static_cast<const EnvWrapper::Target*>(addr2); + if (target1->env != nullptr) { + return target1->env->AreEquivalent(opts, target2->env, mismatch); + } else { + return (target2->env == nullptr); + } + }) + .SetPrepareFunc([](const ConfigOptions& opts, + const std::string& /*name*/, void* addr) { + auto target = static_cast<EnvWrapper::Target*>(addr); + if (target->guard.get() != nullptr) { + target->env = target->guard.get(); + } else if (target->env == nullptr) { + target->env = Env::Default(); + } + return target->env->PrepareOptions(opts); + }) + .SetValidateFunc([](const DBOptions& db_opts, + const ColumnFamilyOptions& cf_opts, + const std::string& /*name*/, const void* addr) { + const auto target = static_cast<const EnvWrapper::Target*>(addr); + if (target->env == nullptr) { + return Status::InvalidArgument("Target Env not specified"); + } else { + return target->env->ValidateOptions(db_opts, cf_opts); + } + })}, +#endif // ROCKSDB_LITE +}; +static std::unordered_map<std::string, OptionTypeInfo> + composite_fs_wrapper_type_info = { +#ifndef ROCKSDB_LITE + {"file_system", + OptionTypeInfo::AsCustomSharedPtr<FileSystem>( + 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}, +#endif // ROCKSDB_LITE +}; + +static std::unordered_map<std::string, OptionTypeInfo> + composite_clock_wrapper_type_info = { +#ifndef ROCKSDB_LITE + {"clock", + OptionTypeInfo::AsCustomSharedPtr<SystemClock>( + 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}, +#endif // ROCKSDB_LITE +}; + +} // namespace + +std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) { + return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs)); +} + +CompositeEnvWrapper::CompositeEnvWrapper(Env* env, + const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<SystemClock>& sc) + : CompositeEnv(fs, sc), target_(env) { + RegisterOptions("", &target_, &env_wrapper_type_info); + RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info); + RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info); +} + +CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env, + const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<SystemClock>& sc) + : CompositeEnv(fs, sc), target_(env) { + RegisterOptions("", &target_, &env_wrapper_type_info); + RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info); + RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info); +} + +Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) { + target_.Prepare(); + if (file_system_ == nullptr) { + file_system_ = target_.env->GetFileSystem(); + } + if (system_clock_ == nullptr) { + system_clock_ = target_.env->GetSystemClock(); + } + return Env::PrepareOptions(options); +} + +#ifndef ROCKSDB_LITE +std::string CompositeEnvWrapper::SerializeOptions( + const ConfigOptions& config_options, const std::string& header) const { + auto options = CompositeEnv::SerializeOptions(config_options, header); + if (target_.env != nullptr && target_.env != Env::Default()) { + options.append("target="); + options.append(target_.env->ToString(config_options)); + } + return options; +} +#endif // ROCKSDB_LITE + +EnvWrapper::EnvWrapper(Env* t) : target_(t) { + RegisterOptions("", &target_, &env_wrapper_type_info); +} + +EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) { + RegisterOptions("", &target_, &env_wrapper_type_info); +} + +EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) { + RegisterOptions("", &target_, &env_wrapper_type_info); +} + +EnvWrapper::~EnvWrapper() {} + +Status EnvWrapper::PrepareOptions(const ConfigOptions& options) { + target_.Prepare(); + return Env::PrepareOptions(options); +} + +#ifndef ROCKSDB_LITE +std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options, + const std::string& header) const { + auto parent = Env::SerializeOptions(config_options, ""); + if (config_options.IsShallow() || target_.env == nullptr || + target_.env == Env::Default()) { + return parent; + } else { + std::string result = header; + if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) { + result.append(OptionTypeInfo::kIdPropName()).append("="); + } + result.append(parent); + if (!EndsWith(result, config_options.delimiter)) { + result.append(config_options.delimiter); + } + result.append("target=").append(target_.env->ToString(config_options)); + return result; + } +} +#endif // ROCKSDB_LITE + +} // namespace ROCKSDB_NAMESPACE |