diff options
Diffstat (limited to 'src/rocksdb/env/mock_env.cc')
-rw-r--r-- | src/rocksdb/env/mock_env.cc | 775 |
1 files changed, 775 insertions, 0 deletions
diff --git a/src/rocksdb/env/mock_env.cc b/src/rocksdb/env/mock_env.cc new file mode 100644 index 00000000..793a0837 --- /dev/null +++ b/src/rocksdb/env/mock_env.cc @@ -0,0 +1,775 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "env/mock_env.h" +#include <algorithm> +#include <chrono> +#include "port/sys_time.h" +#include "util/cast_util.h" +#include "util/murmurhash.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { + +class MemFile { + public: + explicit MemFile(Env* env, const std::string& fn, bool _is_lock_file = false) + : env_(env), + fn_(fn), + refs_(0), + is_lock_file_(_is_lock_file), + locked_(false), + size_(0), + modified_time_(Now()), + rnd_(static_cast<uint32_t>( + MurmurHash(fn.data(), static_cast<int>(fn.size()), 0))), + fsynced_bytes_(0) {} + + void Ref() { + MutexLock lock(&mutex_); + ++refs_; + } + + bool is_lock_file() const { return is_lock_file_; } + + bool Lock() { + assert(is_lock_file_); + MutexLock lock(&mutex_); + if (locked_) { + return false; + } else { + locked_ = true; + return true; + } + } + + void Unlock() { + assert(is_lock_file_); + MutexLock lock(&mutex_); + locked_ = false; + } + + void Unref() { + bool do_delete = false; + { + MutexLock lock(&mutex_); + --refs_; + assert(refs_ >= 0); + if (refs_ <= 0) { + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } + + uint64_t Size() const { return size_; } + + void Truncate(size_t size) { + MutexLock lock(&mutex_); + if (size < size_) { + data_.resize(size); + size_ = size; + } + } + + void CorruptBuffer() { + if (fsynced_bytes_ >= size_) { + return; + } + uint64_t buffered_bytes = size_ - fsynced_bytes_; + uint64_t start = + fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes)); + uint64_t end = std::min(start + 512, size_.load()); + MutexLock lock(&mutex_); + for (uint64_t pos = start; pos < end; ++pos) { + data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256)); + } + } + + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { + MutexLock lock(&mutex_); + const uint64_t available = Size() - std::min(Size(), offset); + size_t offset_ = static_cast<size_t>(offset); + if (n > available) { + n = static_cast<size_t>(available); + } + if (n == 0) { + *result = Slice(); + return Status::OK(); + } + if (scratch) { + memcpy(scratch, &(data_[offset_]), n); + *result = Slice(scratch, n); + } else { + *result = Slice(&(data_[offset_]), n); + } + return Status::OK(); + } + + Status Write(uint64_t offset, const Slice& data) { + MutexLock lock(&mutex_); + size_t offset_ = static_cast<size_t>(offset); + if (offset + data.size() > data_.size()) { + data_.resize(offset_ + data.size()); + } + data_.replace(offset_, data.size(), data.data(), data.size()); + size_ = data_.size(); + modified_time_ = Now(); + return Status::OK(); + } + + Status Append(const Slice& data) { + MutexLock lock(&mutex_); + data_.append(data.data(), data.size()); + size_ = data_.size(); + modified_time_ = Now(); + return Status::OK(); + } + + Status Fsync() { + fsynced_bytes_ = size_.load(); + return Status::OK(); + } + + uint64_t ModifiedTime() const { return modified_time_; } + + private: + uint64_t Now() { + int64_t unix_time = 0; + auto s = env_->GetCurrentTime(&unix_time); + assert(s.ok()); + return static_cast<uint64_t>(unix_time); + } + + // Private since only Unref() should be used to delete it. + ~MemFile() { assert(refs_ == 0); } + + // No copying allowed. + MemFile(const MemFile&); + void operator=(const MemFile&); + + Env* env_; + const std::string fn_; + mutable port::Mutex mutex_; + int refs_; + bool is_lock_file_; + bool locked_; + + // Data written into this file, all bytes before fsynced_bytes are + // persistent. + std::string data_; + std::atomic<uint64_t> size_; + std::atomic<uint64_t> modified_time_; + + Random rnd_; + std::atomic<uint64_t> fsynced_bytes_; +}; + +namespace { + +class MockSequentialFile : public SequentialFile { + public: + explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) { + file_->Ref(); + } + + ~MockSequentialFile() override { file_->Unref(); } + + Status Read(size_t n, Slice* result, char* scratch) override { + Status s = file_->Read(pos_, n, result, scratch); + if (s.ok()) { + pos_ += result->size(); + } + return s; + } + + Status Skip(uint64_t n) override { + if (pos_ > file_->Size()) { + return Status::IOError("pos_ > file_->Size()"); + } + const uint64_t available = file_->Size() - pos_; + if (n > available) { + n = available; + } + pos_ += static_cast<size_t>(n); + return Status::OK(); + } + + private: + MemFile* file_; + size_t pos_; +}; + +class MockRandomAccessFile : public RandomAccessFile { + public: + explicit MockRandomAccessFile(MemFile* file) : file_(file) { file_->Ref(); } + + ~MockRandomAccessFile() override { file_->Unref(); } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return file_->Read(offset, n, result, scratch); + } + + private: + MemFile* file_; +}; + +class MockRandomRWFile : public RandomRWFile { + public: + explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); } + + ~MockRandomRWFile() override { file_->Unref(); } + + Status Write(uint64_t offset, const Slice& data) override { + return file_->Write(offset, data); + } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return file_->Read(offset, n, result, scratch); + } + + Status Close() override { return file_->Fsync(); } + + Status Flush() override { return Status::OK(); } + + Status Sync() override { return file_->Fsync(); } + + private: + MemFile* file_; +}; + +class MockWritableFile : public WritableFile { + public: + MockWritableFile(MemFile* file, RateLimiter* rate_limiter) + : file_(file), rate_limiter_(rate_limiter) { + file_->Ref(); + } + + ~MockWritableFile() override { file_->Unref(); } + + Status Append(const Slice& data) override { + size_t bytes_written = 0; + while (bytes_written < data.size()) { + auto bytes = RequestToken(data.size() - bytes_written); + Status s = file_->Append(Slice(data.data() + bytes_written, bytes)); + if (!s.ok()) { + return s; + } + bytes_written += bytes; + } + return Status::OK(); + } + Status Truncate(uint64_t size) override { + file_->Truncate(static_cast<size_t>(size)); + return Status::OK(); + } + Status Close() override { return file_->Fsync(); } + + Status Flush() override { return Status::OK(); } + + Status Sync() override { return file_->Fsync(); } + + uint64_t GetFileSize() override { return file_->Size(); } + + private: + inline size_t RequestToken(size_t bytes) { + if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { + bytes = std::min( + bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes())); + rate_limiter_->Request(bytes, io_priority_); + } + return bytes; + } + + MemFile* file_; + RateLimiter* rate_limiter_; +}; + +class MockEnvDirectory : public Directory { + public: + Status Fsync() override { return Status::OK(); } +}; + +class MockEnvFileLock : public FileLock { + public: + explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {} + + std::string FileName() const { return fname_; } + + private: + const std::string fname_; +}; + +class TestMemLogger : public Logger { + private: + std::unique_ptr<WritableFile> file_; + std::atomic_size_t log_size_; + static const uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + std::atomic<bool> flush_pending_; + + public: + TestMemLogger(std::unique_ptr<WritableFile> f, Env* env, + const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) + : Logger(log_level), + file_(std::move(f)), + log_size_(0), + last_flush_micros_(0), + env_(env), + flush_pending_(false) {} + ~TestMemLogger() override {} + + void Flush() override { + if (flush_pending_) { + flush_pending_ = false; + } + last_flush_micros_ = env_->NowMicros(); + } + + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 30000; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + memset(&t, 0, sizeof(t)); + struct tm* ret __attribute__((__unused__)); + ret = localtime_r(&seconds, &t); + assert(ret); + p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + const size_t write_size = p - base; + + file_->Append(Slice(base, write_size)); + flush_pending_ = true; + log_size_ += write_size; + uint64_t now_micros = + static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + flush_pending_ = false; + last_flush_micros_ = now_micros; + } + if (base != buffer) { + delete[] base; + } + break; + } + } + size_t GetLogFileSize() const override { return log_size_; } +}; + +} // Anonymous namespace + +MockEnv::MockEnv(Env* base_env) : EnvWrapper(base_env), fake_sleep_micros_(0) {} + +MockEnv::~MockEnv() { + for (FileSystem::iterator i = file_map_.begin(); i != file_map_.end(); ++i) { + i->second->Unref(); + } +} + +// Partial implementation of the Env interface. +Status MockEnv::NewSequentialFile(const std::string& fname, + std::unique_ptr<SequentialFile>* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockSequentialFile(f)); + return Status::OK(); +} + +Status MockEnv::NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockRandomAccessFile(f)); + return Status::OK(); +} + +Status MockEnv::NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockRandomRWFile(f)); + return Status::OK(); +} + +Status MockEnv::ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& options) { + auto s = RenameFile(old_fname, fname); + if (!s.ok()) { + return s; + } + result->reset(); + return NewWritableFile(fname, result, options); +} + +Status MockEnv::NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& env_options) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + DeleteFileInternal(fn); + } + MemFile* file = new MemFile(this, fn, false); + file->Ref(); + file_map_[fn] = file; + + result->reset(new MockWritableFile(file, env_options.rate_limiter)); + return Status::OK(); +} + +Status MockEnv::NewDirectory(const std::string& /*name*/, + std::unique_ptr<Directory>* result) { + result->reset(new MockEnvDirectory()); + return Status::OK(); +} + +Status MockEnv::FileExists(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + // File exists + return Status::OK(); + } + // Now also check if fn exists as a dir + for (const auto& iter : file_map_) { + const std::string& filename = iter.first; + if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' && + Slice(filename).starts_with(Slice(fn))) { + return Status::OK(); + } + } + return Status::NotFound(); +} + +Status MockEnv::GetChildren(const std::string& dir, + std::vector<std::string>* result) { + auto d = NormalizePath(dir); + bool found_dir = false; + { + MutexLock lock(&mutex_); + result->clear(); + for (const auto& iter : file_map_) { + const std::string& filename = iter.first; + + if (filename == d) { + found_dir = true; + } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' && + Slice(filename).starts_with(Slice(d))) { + found_dir = true; + size_t next_slash = filename.find('/', d.size() + 1); + if (next_slash != std::string::npos) { + result->push_back( + filename.substr(d.size() + 1, next_slash - d.size() - 1)); + } else { + result->push_back(filename.substr(d.size() + 1)); + } + } + } + } + result->erase(std::unique(result->begin(), result->end()), result->end()); + return found_dir ? Status::OK() : Status::NotFound(); +} + +void MockEnv::DeleteFileInternal(const std::string& fname) { + assert(fname == NormalizePath(fname)); + const auto& pair = file_map_.find(fname); + if (pair != file_map_.end()) { + pair->second->Unref(); + file_map_.erase(fname); + } +} + +Status MockEnv::DeleteFile(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + + DeleteFileInternal(fn); + return Status::OK(); +} + +Status MockEnv::Truncate(const std::string& fname, size_t size) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + iter->second->Truncate(size); + return Status::OK(); +} + +Status MockEnv::CreateDir(const std::string& dirname) { + auto dn = NormalizePath(dirname); + if (file_map_.find(dn) == file_map_.end()) { + MemFile* file = new MemFile(this, dn, false); + file->Ref(); + file_map_[dn] = file; + } else { + return Status::IOError(); + } + return Status::OK(); +} + +Status MockEnv::CreateDirIfMissing(const std::string& dirname) { + CreateDir(dirname); + return Status::OK(); +} + +Status MockEnv::DeleteDir(const std::string& dirname) { + return DeleteFile(dirname); +} + +Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + + *file_size = iter->second->Size(); + return Status::OK(); +} + +Status MockEnv::GetFileModificationTime(const std::string& fname, + uint64_t* time) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + *time = iter->second->ModifiedTime(); + return Status::OK(); +} + +Status MockEnv::RenameFile(const std::string& src, const std::string& dest) { + auto s = NormalizePath(src); + auto t = NormalizePath(dest); + MutexLock lock(&mutex_); + if (file_map_.find(s) == file_map_.end()) { + return Status::IOError(s, "File not found"); + } + + DeleteFileInternal(t); + file_map_[t] = file_map_[s]; + file_map_.erase(s); + return Status::OK(); +} + +Status MockEnv::LinkFile(const std::string& src, const std::string& dest) { + auto s = NormalizePath(src); + auto t = NormalizePath(dest); + MutexLock lock(&mutex_); + if (file_map_.find(s) == file_map_.end()) { + return Status::IOError(s, "File not found"); + } + + DeleteFileInternal(t); + file_map_[t] = file_map_[s]; + file_map_[t]->Ref(); // Otherwise it might get deleted when noone uses s + return Status::OK(); +} + +Status MockEnv::NewLogger(const std::string& fname, + std::shared_ptr<Logger>* result) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + MemFile* file = nullptr; + if (iter == file_map_.end()) { + file = new MemFile(this, fn, false); + file->Ref(); + file_map_[fn] = file; + } else { + file = iter->second; + } + std::unique_ptr<WritableFile> f(new MockWritableFile(file, nullptr)); + result->reset(new TestMemLogger(std::move(f), this)); + return Status::OK(); +} + +Status MockEnv::LockFile(const std::string& fname, FileLock** flock) { + auto fn = NormalizePath(fname); + { + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + if (!file_map_[fn]->is_lock_file()) { + return Status::InvalidArgument(fname, "Not a lock file."); + } + if (!file_map_[fn]->Lock()) { + return Status::IOError(fn, "Lock is already held."); + } + } else { + auto* file = new MemFile(this, fn, true); + file->Ref(); + file->Lock(); + file_map_[fn] = file; + } + } + *flock = new MockEnvFileLock(fn); + return Status::OK(); +} + +Status MockEnv::UnlockFile(FileLock* flock) { + std::string fn = + static_cast_with_check<MockEnvFileLock, FileLock>(flock)->FileName(); + { + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + if (!file_map_[fn]->is_lock_file()) { + return Status::InvalidArgument(fn, "Not a lock file."); + } + file_map_[fn]->Unlock(); + } + } + delete flock; + return Status::OK(); +} + +Status MockEnv::GetTestDirectory(std::string* path) { + *path = "/test"; + return Status::OK(); +} + +Status MockEnv::GetCurrentTime(int64_t* unix_time) { + auto s = EnvWrapper::GetCurrentTime(unix_time); + if (s.ok()) { + *unix_time += fake_sleep_micros_.load() / (1000 * 1000); + } + return s; +} + +uint64_t MockEnv::NowMicros() { + return EnvWrapper::NowMicros() + fake_sleep_micros_.load(); +} + +uint64_t MockEnv::NowNanos() { + return EnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000; +} + +Status MockEnv::CorruptBuffer(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + iter->second->CorruptBuffer(); + return Status::OK(); +} + +std::string MockEnv::NormalizePath(const std::string path) { + std::string dst; + for (auto c : path) { + if (!dst.empty() && c == '/' && dst.back() == '/') { + continue; + } + dst.push_back(c); + } + return dst; +} + +void MockEnv::FakeSleepForMicroseconds(int64_t micros) { + fake_sleep_micros_.fetch_add(micros); +} + +#ifndef ROCKSDB_LITE +// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnv +Env* NewMemEnv(Env* base_env) { return new MockEnv(base_env); } + +#else // ROCKSDB_LITE + +Env* NewMemEnv(Env* /*base_env*/) { return nullptr; } + +#endif // !ROCKSDB_LITE + +} // namespace rocksdb |