summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/env/mock_env.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/env/mock_env.cc')
-rw-r--r--src/rocksdb/env/mock_env.cc1070
1 files changed, 1070 insertions, 0 deletions
diff --git a/src/rocksdb/env/mock_env.cc b/src/rocksdb/env/mock_env.cc
new file mode 100644
index 000000000..bfa7dc2f4
--- /dev/null
+++ b/src/rocksdb/env/mock_env.cc
@@ -0,0 +1,1070 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "env/mock_env.h"
+
+#include <algorithm>
+#include <chrono>
+
+#include "env/emulated_clock.h"
+#include "file/filename.h"
+#include "port/sys_time.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/utilities/options_type.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
+#include "util/hash.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+int64_t MaybeCurrentTime(const std::shared_ptr<SystemClock>& clock) {
+ int64_t time = 1337346000; // arbitrary fallback default
+ clock->GetCurrentTime(&time).PermitUncheckedError();
+ return time;
+}
+
+static std::unordered_map<std::string, OptionTypeInfo> time_elapse_type_info = {
+#ifndef ROCKSDB_LITE
+ {"time_elapse_only_sleep",
+ {0, OptionType::kBoolean, OptionVerificationType::kNormal,
+ OptionTypeFlags::kCompareNever,
+ [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
+ const std::string& value, void* addr) {
+ auto clock = static_cast<EmulatedSystemClock*>(addr);
+ clock->SetTimeElapseOnlySleep(ParseBoolean("", value));
+ return Status::OK();
+ },
+ [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
+ const void* addr, std::string* value) {
+ const auto clock = static_cast<const EmulatedSystemClock*>(addr);
+ *value = clock->IsTimeElapseOnlySleep() ? "true" : "false";
+ return Status::OK();
+ },
+ nullptr}},
+#endif // ROCKSDB_LITE
+};
+static std::unordered_map<std::string, OptionTypeInfo> mock_sleep_type_info = {
+#ifndef ROCKSDB_LITE
+ {"mock_sleep",
+ {0, OptionType::kBoolean, OptionVerificationType::kNormal,
+ OptionTypeFlags::kCompareNever,
+ [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
+ const std::string& value, void* addr) {
+ auto clock = static_cast<EmulatedSystemClock*>(addr);
+ clock->SetMockSleep(ParseBoolean("", value));
+ return Status::OK();
+ },
+ [](const ConfigOptions& /*opts*/, const std::string& /*name*/,
+ const void* addr, std::string* value) {
+ const auto clock = static_cast<const EmulatedSystemClock*>(addr);
+ *value = clock->IsMockSleepEnabled() ? "true" : "false";
+ return Status::OK();
+ },
+ nullptr}},
+#endif // ROCKSDB_LITE
+};
+} // namespace
+
+EmulatedSystemClock::EmulatedSystemClock(
+ const std::shared_ptr<SystemClock>& base, bool time_elapse_only_sleep)
+ : SystemClockWrapper(base),
+ maybe_starting_time_(MaybeCurrentTime(base)),
+ time_elapse_only_sleep_(time_elapse_only_sleep),
+ no_slowdown_(time_elapse_only_sleep) {
+ RegisterOptions("", this, &time_elapse_type_info);
+ RegisterOptions("", this, &mock_sleep_type_info);
+}
+
+class MemFile {
+ public:
+ explicit MemFile(SystemClock* clock, const std::string& fn,
+ bool _is_lock_file = false)
+ : clock_(clock),
+ fn_(fn),
+ refs_(0),
+ is_lock_file_(_is_lock_file),
+ locked_(false),
+ size_(0),
+ modified_time_(Now()),
+ rnd_(Lower32of64(GetSliceNPHash64(fn))),
+ fsynced_bytes_(0) {}
+ // No copying allowed.
+ MemFile(const MemFile&) = delete;
+ void operator=(const MemFile&) = delete;
+
+ void Ref() {
+ MutexLock lock(&mutex_);
+ ++refs_;
+ }
+
+ bool is_lock_file() const { return is_lock_file_; }
+
+ bool Lock() {
+ assert(is_lock_file_);
+ MutexLock lock(&mutex_);
+ if (locked_) {
+ return false;
+ } else {
+ locked_ = true;
+ return true;
+ }
+ }
+
+ void Unlock() {
+ assert(is_lock_file_);
+ MutexLock lock(&mutex_);
+ locked_ = false;
+ }
+
+ void Unref() {
+ bool do_delete = false;
+ {
+ MutexLock lock(&mutex_);
+ --refs_;
+ assert(refs_ >= 0);
+ if (refs_ <= 0) {
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
+
+ uint64_t Size() const { return size_; }
+
+ void Truncate(size_t size, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ MutexLock lock(&mutex_);
+ if (size < size_) {
+ data_.resize(size);
+ size_ = size;
+ }
+ }
+
+ void CorruptBuffer() {
+ if (fsynced_bytes_ >= size_) {
+ return;
+ }
+ uint64_t buffered_bytes = size_ - fsynced_bytes_;
+ uint64_t start =
+ fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes));
+ uint64_t end = std::min(start + 512, size_.load());
+ MutexLock lock(&mutex_);
+ for (uint64_t pos = start; pos < end; ++pos) {
+ data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256));
+ }
+ }
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
+ Slice* result, char* scratch, IODebugContext* /*dbg*/) const {
+ {
+ IOStatus s;
+ TEST_SYNC_POINT_CALLBACK("MemFile::Read:IOStatus", &s);
+ if (!s.ok()) {
+ // with sync point only
+ *result = Slice();
+ return s;
+ }
+ }
+ MutexLock lock(&mutex_);
+ const uint64_t available = Size() - std::min(Size(), offset);
+ size_t offset_ = static_cast<size_t>(offset);
+ if (n > available) {
+ n = static_cast<size_t>(available);
+ }
+ if (n == 0) {
+ *result = Slice();
+ return IOStatus::OK();
+ }
+ if (scratch) {
+ memcpy(scratch, &(data_[offset_]), n);
+ *result = Slice(scratch, n);
+ } else {
+ *result = Slice(&(data_[offset_]), n);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus Write(uint64_t offset, const Slice& data,
+ const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
+ MutexLock lock(&mutex_);
+ size_t offset_ = static_cast<size_t>(offset);
+ if (offset + data.size() > data_.size()) {
+ data_.resize(offset_ + data.size());
+ }
+ data_.replace(offset_, data.size(), data.data(), data.size());
+ size_ = data_.size();
+ modified_time_ = Now();
+ return IOStatus::OK();
+ }
+
+ IOStatus Append(const Slice& data, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ MutexLock lock(&mutex_);
+ data_.append(data.data(), data.size());
+ size_ = data_.size();
+ modified_time_ = Now();
+ return IOStatus::OK();
+ }
+
+ IOStatus Fsync(const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
+ fsynced_bytes_ = size_.load();
+ return IOStatus::OK();
+ }
+
+ uint64_t ModifiedTime() const { return modified_time_; }
+
+ private:
+ uint64_t Now() {
+ int64_t unix_time = 0;
+ auto s = clock_->GetCurrentTime(&unix_time);
+ assert(s.ok());
+ return static_cast<uint64_t>(unix_time);
+ }
+
+ // Private since only Unref() should be used to delete it.
+ ~MemFile() { assert(refs_ == 0); }
+
+ SystemClock* clock_;
+ const std::string fn_;
+ mutable port::Mutex mutex_;
+ int refs_;
+ bool is_lock_file_;
+ bool locked_;
+
+ // Data written into this file, all bytes before fsynced_bytes are
+ // persistent.
+ std::string data_;
+ std::atomic<uint64_t> size_;
+ std::atomic<uint64_t> modified_time_;
+
+ Random rnd_;
+ std::atomic<uint64_t> fsynced_bytes_;
+};
+
+namespace {
+
+class MockSequentialFile : public FSSequentialFile {
+ public:
+ explicit MockSequentialFile(MemFile* file, const FileOptions& opts)
+ : file_(file),
+ use_direct_io_(opts.use_direct_reads),
+ use_mmap_read_(opts.use_mmap_reads),
+ pos_(0) {
+ file_->Ref();
+ }
+
+ ~MockSequentialFile() override { file_->Unref(); }
+
+ IOStatus Read(size_t n, const IOOptions& options, Slice* result,
+ char* scratch, IODebugContext* dbg) override {
+ IOStatus s = file_->Read(pos_, n, options, result,
+ (use_mmap_read_) ? nullptr : scratch, dbg);
+ if (s.ok()) {
+ pos_ += result->size();
+ }
+ return s;
+ }
+
+ bool use_direct_io() const override { return use_direct_io_; }
+ IOStatus Skip(uint64_t n) override {
+ if (pos_ > file_->Size()) {
+ return IOStatus::IOError("pos_ > file_->Size()");
+ }
+ const uint64_t available = file_->Size() - pos_;
+ if (n > available) {
+ n = available;
+ }
+ pos_ += static_cast<size_t>(n);
+ return IOStatus::OK();
+ }
+
+ private:
+ MemFile* file_;
+ bool use_direct_io_;
+ bool use_mmap_read_;
+ size_t pos_;
+};
+
+class MockRandomAccessFile : public FSRandomAccessFile {
+ public:
+ explicit MockRandomAccessFile(MemFile* file, const FileOptions& opts)
+ : file_(file),
+ use_direct_io_(opts.use_direct_reads),
+ use_mmap_read_(opts.use_mmap_reads) {
+ file_->Ref();
+ }
+
+ ~MockRandomAccessFile() override { file_->Unref(); }
+
+ bool use_direct_io() const override { return use_direct_io_; }
+
+ IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+ Slice* result, char* scratch,
+ IODebugContext* dbg) const override {
+ if (use_mmap_read_) {
+ return file_->Read(offset, n, options, result, nullptr, dbg);
+ } else {
+ return file_->Read(offset, n, options, result, scratch, dbg);
+ }
+ }
+
+ private:
+ MemFile* file_;
+ bool use_direct_io_;
+ bool use_mmap_read_;
+};
+
+class MockRandomRWFile : public FSRandomRWFile {
+ public:
+ explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); }
+
+ ~MockRandomRWFile() override { file_->Unref(); }
+
+ IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
+ IODebugContext* dbg) override {
+ return file_->Write(offset, data, options, dbg);
+ }
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+ Slice* result, char* scratch,
+ IODebugContext* dbg) const override {
+ return file_->Read(offset, n, options, result, scratch, dbg);
+ }
+
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+ return file_->Fsync(options, dbg);
+ }
+
+ IOStatus Flush(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+ return file_->Fsync(options, dbg);
+ }
+
+ private:
+ MemFile* file_;
+};
+
+class MockWritableFile : public FSWritableFile {
+ public:
+ MockWritableFile(MemFile* file, const FileOptions& opts)
+ : file_(file),
+ use_direct_io_(opts.use_direct_writes),
+ rate_limiter_(opts.rate_limiter) {
+ file_->Ref();
+ }
+
+ ~MockWritableFile() override { file_->Unref(); }
+
+ bool use_direct_io() const override { return false && use_direct_io_; }
+
+ using FSWritableFile::Append;
+ IOStatus Append(const Slice& data, const IOOptions& options,
+ IODebugContext* dbg) override {
+ size_t bytes_written = 0;
+ while (bytes_written < data.size()) {
+ auto bytes = RequestToken(data.size() - bytes_written);
+ IOStatus s = file_->Append(Slice(data.data() + bytes_written, bytes),
+ options, dbg);
+ if (!s.ok()) {
+ return s;
+ }
+ bytes_written += bytes;
+ }
+ return IOStatus::OK();
+ }
+
+ using FSWritableFile::PositionedAppend;
+ IOStatus PositionedAppend(const Slice& data, uint64_t /*offset*/,
+ const IOOptions& options,
+ IODebugContext* dbg) override {
+ assert(use_direct_io_);
+ return Append(data, options, dbg);
+ }
+
+ IOStatus Truncate(uint64_t size, const IOOptions& options,
+ IODebugContext* dbg) override {
+ file_->Truncate(static_cast<size_t>(size), options, dbg);
+ return IOStatus::OK();
+ }
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+ return file_->Fsync(options, dbg);
+ }
+
+ IOStatus Flush(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+ return file_->Fsync(options, dbg);
+ }
+
+ uint64_t GetFileSize(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return file_->Size();
+ }
+
+ private:
+ inline size_t RequestToken(size_t bytes) {
+ if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {
+ bytes = std::min(
+ bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+ rate_limiter_->Request(bytes, io_priority_);
+ }
+ return bytes;
+ }
+
+ MemFile* file_;
+ bool use_direct_io_;
+ RateLimiter* rate_limiter_;
+};
+
+class MockEnvDirectory : public FSDirectory {
+ public:
+ IOStatus Fsync(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Close(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+};
+
+class MockEnvFileLock : public FileLock {
+ public:
+ explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {}
+
+ std::string FileName() const { return fname_; }
+
+ private:
+ const std::string fname_;
+};
+
+class TestMemLogger : public Logger {
+ private:
+ std::unique_ptr<FSWritableFile> file_;
+ std::atomic_size_t log_size_;
+ static const uint64_t flush_every_seconds_ = 5;
+ std::atomic_uint_fast64_t last_flush_micros_;
+ SystemClock* clock_;
+ IOOptions options_;
+ IODebugContext* dbg_;
+ std::atomic<bool> flush_pending_;
+
+ public:
+ TestMemLogger(std::unique_ptr<FSWritableFile> f, SystemClock* clock,
+ const IOOptions& options, IODebugContext* dbg,
+ const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
+ : Logger(log_level),
+ file_(std::move(f)),
+ log_size_(0),
+ last_flush_micros_(0),
+ clock_(clock),
+ options_(options),
+ dbg_(dbg),
+ flush_pending_(false) {}
+ ~TestMemLogger() override {}
+
+ void Flush() override {
+ if (flush_pending_) {
+ flush_pending_ = false;
+ }
+ last_flush_micros_ = clock_->NowMicros();
+ }
+
+ using Logger::Logv;
+ void Logv(const char* format, va_list ap) override {
+ // We try twice: the first time with a fixed-size stack allocated buffer,
+ // and the second time with a much larger dynamically allocated buffer.
+ char buffer[500];
+ for (int iter = 0; iter < 2; iter++) {
+ char* base;
+ int bufsize;
+ if (iter == 0) {
+ bufsize = sizeof(buffer);
+ base = buffer;
+ } else {
+ bufsize = 30000;
+ base = new char[bufsize];
+ }
+ char* p = base;
+ char* limit = base + bufsize;
+
+ port::TimeVal now_tv;
+ port::GetTimeOfDay(&now_tv, nullptr);
+ const time_t seconds = now_tv.tv_sec;
+ struct tm t;
+ memset(&t, 0, sizeof(t));
+ struct tm* ret __attribute__((__unused__));
+ ret = port::LocalTimeR(&seconds, &t);
+ assert(ret);
+ p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ",
+ t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
+ t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec));
+
+ // Print the message
+ if (p < limit) {
+ va_list backup_ap;
+ va_copy(backup_ap, ap);
+ p += vsnprintf(p, limit - p, format, backup_ap);
+ va_end(backup_ap);
+ }
+
+ // Truncate to available space if necessary
+ if (p >= limit) {
+ if (iter == 0) {
+ continue; // Try again with larger buffer
+ } else {
+ p = limit - 1;
+ }
+ }
+
+ // Add newline if necessary
+ if (p == base || p[-1] != '\n') {
+ *p++ = '\n';
+ }
+
+ assert(p <= limit);
+ const size_t write_size = p - base;
+
+ Status s = file_->Append(Slice(base, write_size), options_, dbg_);
+ if (s.ok()) {
+ flush_pending_ = true;
+ log_size_ += write_size;
+ }
+ uint64_t now_micros =
+ static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
+ if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+ flush_pending_ = false;
+ last_flush_micros_ = now_micros;
+ }
+ if (base != buffer) {
+ delete[] base;
+ }
+ break;
+ }
+ }
+ size_t GetLogFileSize() const override { return log_size_; }
+};
+
+static std::unordered_map<std::string, OptionTypeInfo> mock_fs_type_info = {
+#ifndef ROCKSDB_LITE
+ {"supports_direct_io",
+ {0, OptionType::kBoolean, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+} // namespace
+
+MockFileSystem::MockFileSystem(const std::shared_ptr<SystemClock>& clock,
+ bool supports_direct_io)
+ : system_clock_(clock), supports_direct_io_(supports_direct_io) {
+ clock_ = system_clock_.get();
+ RegisterOptions("", &supports_direct_io_, &mock_fs_type_info);
+}
+
+MockFileSystem::~MockFileSystem() {
+ for (auto i = file_map_.begin(); i != file_map_.end(); ++i) {
+ i->second->Unref();
+ }
+}
+
+Status MockFileSystem::PrepareOptions(const ConfigOptions& options) {
+ Status s = FileSystem::PrepareOptions(options);
+ if (s.ok() && system_clock_ == SystemClock::Default()) {
+ system_clock_ = options.env->GetSystemClock();
+ clock_ = system_clock_.get();
+ }
+ return s;
+}
+
+IOStatus MockFileSystem::GetAbsolutePath(const std::string& db_path,
+ const IOOptions& /*options*/,
+ std::string* output_path,
+ IODebugContext* /*dbg*/) {
+ *output_path = NormalizeMockPath(db_path);
+ if (output_path->at(0) != '/') {
+ return IOStatus::NotSupported("GetAbsolutePath");
+ } else {
+ return IOStatus::OK();
+ }
+}
+
+std::string MockFileSystem::NormalizeMockPath(const std::string& path) {
+ std::string p = NormalizePath(path);
+ if (p.back() == kFilePathSeparator && p.size() > 1) {
+ p.pop_back();
+ }
+ return p;
+}
+
+// Partial implementation of the FileSystem interface.
+IOStatus MockFileSystem::NewSequentialFile(
+ const std::string& fname, const FileOptions& file_opts,
+ std::unique_ptr<FSSequentialFile>* result, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) == file_map_.end()) {
+ *result = nullptr;
+ return IOStatus::PathNotFound(fn);
+ }
+ auto* f = file_map_[fn];
+ if (f->is_lock_file()) {
+ return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
+ } else if (file_opts.use_direct_reads && !supports_direct_io_) {
+ return IOStatus::NotSupported("Direct I/O Not Supported");
+ } else {
+ result->reset(new MockSequentialFile(f, file_opts));
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::NewRandomAccessFile(
+ const std::string& fname, const FileOptions& file_opts,
+ std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) == file_map_.end()) {
+ *result = nullptr;
+ return IOStatus::PathNotFound(fn);
+ }
+ auto* f = file_map_[fn];
+ if (f->is_lock_file()) {
+ return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
+ } else if (file_opts.use_direct_reads && !supports_direct_io_) {
+ return IOStatus::NotSupported("Direct I/O Not Supported");
+ } else {
+ result->reset(new MockRandomAccessFile(f, file_opts));
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::NewRandomRWFile(
+ const std::string& fname, const FileOptions& /*file_opts*/,
+ std::unique_ptr<FSRandomRWFile>* result, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) == file_map_.end()) {
+ *result = nullptr;
+ return IOStatus::PathNotFound(fn);
+ }
+ auto* f = file_map_[fn];
+ if (f->is_lock_file()) {
+ return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
+ }
+ result->reset(new MockRandomRWFile(f));
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::ReuseWritableFile(
+ const std::string& fname, const std::string& old_fname,
+ const FileOptions& options, std::unique_ptr<FSWritableFile>* result,
+ IODebugContext* dbg) {
+ auto s = RenameFile(old_fname, fname, IOOptions(), dbg);
+ if (!s.ok()) {
+ return s;
+ } else {
+ result->reset();
+ return NewWritableFile(fname, options, result, dbg);
+ }
+}
+
+IOStatus MockFileSystem::NewWritableFile(
+ const std::string& fname, const FileOptions& file_opts,
+ std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) != file_map_.end()) {
+ DeleteFileInternal(fn);
+ }
+ MemFile* file = new MemFile(clock_, fn, false);
+ file->Ref();
+ file_map_[fn] = file;
+ if (file_opts.use_direct_writes && !supports_direct_io_) {
+ return IOStatus::NotSupported("Direct I/O Not Supported");
+ } else {
+ result->reset(new MockWritableFile(file, file_opts));
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::ReopenWritableFile(
+ const std::string& fname, const FileOptions& file_opts,
+ std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ MemFile* file = nullptr;
+ if (file_map_.find(fn) == file_map_.end()) {
+ file = new MemFile(clock_, fn, false);
+ // Only take a reference when we create the file objectt
+ file->Ref();
+ file_map_[fn] = file;
+ } else {
+ file = file_map_[fn];
+ }
+ if (file_opts.use_direct_writes && !supports_direct_io_) {
+ return IOStatus::NotSupported("Direct I/O Not Supported");
+ } else {
+ result->reset(new MockWritableFile(file, file_opts));
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::NewDirectory(const std::string& /*name*/,
+ const IOOptions& /*io_opts*/,
+ std::unique_ptr<FSDirectory>* result,
+ IODebugContext* /*dbg*/) {
+ result->reset(new MockEnvDirectory());
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::FileExists(const std::string& fname,
+ const IOOptions& /*io_opts*/,
+ IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) != file_map_.end()) {
+ // File exists
+ return IOStatus::OK();
+ }
+ // Now also check if fn exists as a dir
+ for (const auto& iter : file_map_) {
+ const std::string& filename = iter.first;
+ if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' &&
+ Slice(filename).starts_with(Slice(fn))) {
+ return IOStatus::OK();
+ }
+ }
+ return IOStatus::NotFound();
+}
+
+bool MockFileSystem::GetChildrenInternal(const std::string& dir,
+ std::vector<std::string>* result) {
+ auto d = NormalizeMockPath(dir);
+ bool found_dir = false;
+ result->clear();
+ for (const auto& iter : file_map_) {
+ const std::string& filename = iter.first;
+
+ if (filename == d) {
+ found_dir = true;
+ } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&
+ Slice(filename).starts_with(Slice(d))) {
+ found_dir = true;
+ size_t next_slash = filename.find('/', d.size() + 1);
+ if (next_slash != std::string::npos) {
+ result->push_back(
+ filename.substr(d.size() + 1, next_slash - d.size() - 1));
+ } else {
+ result->push_back(filename.substr(d.size() + 1));
+ }
+ }
+ }
+ result->erase(std::unique(result->begin(), result->end()), result->end());
+ return found_dir;
+}
+
+IOStatus MockFileSystem::GetChildren(const std::string& dir,
+ const IOOptions& /*options*/,
+ std::vector<std::string>* result,
+ IODebugContext* /*dbg*/) {
+ MutexLock lock(&mutex_);
+ bool found_dir = GetChildrenInternal(dir, result);
+#ifndef __clang_analyzer__
+ return found_dir ? IOStatus::OK() : IOStatus::NotFound(dir);
+#else
+ return found_dir ? IOStatus::OK() : IOStatus::NotFound();
+#endif
+}
+
+void MockFileSystem::DeleteFileInternal(const std::string& fname) {
+ assert(fname == NormalizeMockPath(fname));
+ const auto& pair = file_map_.find(fname);
+ if (pair != file_map_.end()) {
+ pair->second->Unref();
+ file_map_.erase(fname);
+ }
+}
+
+IOStatus MockFileSystem::DeleteFile(const std::string& fname,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) == file_map_.end()) {
+ return IOStatus::PathNotFound(fn);
+ }
+
+ DeleteFileInternal(fn);
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::Truncate(const std::string& fname, size_t size,
+ const IOOptions& options,
+ IODebugContext* dbg) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ auto iter = file_map_.find(fn);
+ if (iter == file_map_.end()) {
+ return IOStatus::PathNotFound(fn);
+ }
+ iter->second->Truncate(size, options, dbg);
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::CreateDir(const std::string& dirname,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ auto dn = NormalizeMockPath(dirname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(dn) == file_map_.end()) {
+ MemFile* file = new MemFile(clock_, dn, false);
+ file->Ref();
+ file_map_[dn] = file;
+ } else {
+ return IOStatus::IOError();
+ }
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::CreateDirIfMissing(const std::string& dirname,
+ const IOOptions& options,
+ IODebugContext* dbg) {
+ CreateDir(dirname, options, dbg).PermitUncheckedError();
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::DeleteDir(const std::string& dirname,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ auto dir = NormalizeMockPath(dirname);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(dir) == file_map_.end()) {
+ return IOStatus::PathNotFound(dir);
+ } else {
+ std::vector<std::string> children;
+ if (GetChildrenInternal(dir, &children)) {
+ for (const auto& child : children) {
+ DeleteFileInternal(child);
+ }
+ }
+ DeleteFileInternal(dir);
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::GetFileSize(const std::string& fname,
+ const IOOptions& /*options*/,
+ uint64_t* file_size,
+ IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ TEST_SYNC_POINT_CALLBACK("MockFileSystem::GetFileSize:CheckFileType", &fn);
+ MutexLock lock(&mutex_);
+ auto iter = file_map_.find(fn);
+ if (iter == file_map_.end()) {
+ return IOStatus::PathNotFound(fn);
+ }
+
+ *file_size = iter->second->Size();
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::GetFileModificationTime(const std::string& fname,
+ const IOOptions& /*options*/,
+ uint64_t* time,
+ IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ auto iter = file_map_.find(fn);
+ if (iter == file_map_.end()) {
+ return IOStatus::PathNotFound(fn);
+ }
+ *time = iter->second->ModifiedTime();
+ return IOStatus::OK();
+}
+
+bool MockFileSystem::RenameFileInternal(const std::string& src,
+ const std::string& dest) {
+ if (file_map_.find(src) == file_map_.end()) {
+ return false;
+ } else {
+ std::vector<std::string> children;
+ if (GetChildrenInternal(src, &children)) {
+ for (const auto& child : children) {
+ RenameFileInternal(src + "/" + child, dest + "/" + child);
+ }
+ }
+ DeleteFileInternal(dest);
+ file_map_[dest] = file_map_[src];
+ file_map_.erase(src);
+ return true;
+ }
+}
+
+IOStatus MockFileSystem::RenameFile(const std::string& src,
+ const std::string& dest,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ auto s = NormalizeMockPath(src);
+ auto t = NormalizeMockPath(dest);
+ MutexLock lock(&mutex_);
+ bool found = RenameFileInternal(s, t);
+ if (!found) {
+ return IOStatus::PathNotFound(s);
+ } else {
+ return IOStatus::OK();
+ }
+}
+
+IOStatus MockFileSystem::LinkFile(const std::string& src,
+ const std::string& dest,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ auto s = NormalizeMockPath(src);
+ auto t = NormalizeMockPath(dest);
+ MutexLock lock(&mutex_);
+ if (file_map_.find(s) == file_map_.end()) {
+ return IOStatus::PathNotFound(s);
+ }
+
+ DeleteFileInternal(t);
+ file_map_[t] = file_map_[s];
+ file_map_[t]->Ref(); // Otherwise it might get deleted when noone uses s
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::NewLogger(const std::string& fname,
+ const IOOptions& io_opts,
+ std::shared_ptr<Logger>* result,
+ IODebugContext* dbg) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ auto iter = file_map_.find(fn);
+ MemFile* file = nullptr;
+ if (iter == file_map_.end()) {
+ file = new MemFile(clock_, fn, false);
+ file->Ref();
+ file_map_[fn] = file;
+ } else {
+ file = iter->second;
+ }
+ std::unique_ptr<FSWritableFile> f(new MockWritableFile(file, FileOptions()));
+ result->reset(new TestMemLogger(std::move(f), clock_, io_opts, dbg));
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::LockFile(const std::string& fname,
+ const IOOptions& /*options*/,
+ FileLock** flock, IODebugContext* /*dbg*/) {
+ auto fn = NormalizeMockPath(fname);
+ {
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) != file_map_.end()) {
+ if (!file_map_[fn]->is_lock_file()) {
+ return IOStatus::InvalidArgument(fname, "Not a lock file.");
+ }
+ if (!file_map_[fn]->Lock()) {
+ return IOStatus::IOError(fn, "lock is already held.");
+ }
+ } else {
+ auto* file = new MemFile(clock_, fn, true);
+ file->Ref();
+ file->Lock();
+ file_map_[fn] = file;
+ }
+ }
+ *flock = new MockEnvFileLock(fn);
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::UnlockFile(FileLock* flock,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ std::string fn = static_cast_with_check<MockEnvFileLock>(flock)->FileName();
+ {
+ MutexLock lock(&mutex_);
+ if (file_map_.find(fn) != file_map_.end()) {
+ if (!file_map_[fn]->is_lock_file()) {
+ return IOStatus::InvalidArgument(fn, "Not a lock file.");
+ }
+ file_map_[fn]->Unlock();
+ }
+ }
+ delete flock;
+ return IOStatus::OK();
+}
+
+IOStatus MockFileSystem::GetTestDirectory(const IOOptions& /*options*/,
+ std::string* path,
+ IODebugContext* /*dbg*/) {
+ *path = "/test";
+ return IOStatus::OK();
+}
+
+Status MockFileSystem::CorruptBuffer(const std::string& fname) {
+ auto fn = NormalizeMockPath(fname);
+ MutexLock lock(&mutex_);
+ auto iter = file_map_.find(fn);
+ if (iter == file_map_.end()) {
+ return Status::IOError(fn, "File not found");
+ }
+ iter->second->CorruptBuffer();
+ return Status::OK();
+}
+
+MockEnv::MockEnv(Env* env, const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<SystemClock>& clock)
+ : CompositeEnvWrapper(env, fs, clock) {}
+
+MockEnv* MockEnv::Create(Env* env) {
+ auto clock =
+ std::make_shared<EmulatedSystemClock>(env->GetSystemClock(), true);
+ return MockEnv::Create(env, clock);
+}
+
+MockEnv* MockEnv::Create(Env* env, const std::shared_ptr<SystemClock>& clock) {
+ auto fs = std::make_shared<MockFileSystem>(clock);
+ return new MockEnv(env, fs, clock);
+}
+
+Status MockEnv::CorruptBuffer(const std::string& fname) {
+ auto mock = static_cast_with_check<MockFileSystem>(GetFileSystem().get());
+ return mock->CorruptBuffer(fname);
+}
+
+#ifndef ROCKSDB_LITE
+// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnv
+Env* NewMemEnv(Env* base_env) { return MockEnv::Create(base_env); }
+
+#else // ROCKSDB_LITE
+
+Env* NewMemEnv(Env* /*base_env*/) { return nullptr; }
+
+#endif // !ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE