Diffstat
-rw-r--r-- | src/rocksdb/test_util/fault_injection_test_env.cc | 437
-rw-r--r-- | src/rocksdb/test_util/fault_injection_test_env.h | 225
-rw-r--r-- | src/rocksdb/test_util/mock_time_env.h | 45
-rw-r--r-- | src/rocksdb/test_util/sync_point.cc | 66
-rw-r--r-- | src/rocksdb/test_util/sync_point.h | 144
-rw-r--r-- | src/rocksdb/test_util/sync_point_impl.cc | 129
-rw-r--r-- | src/rocksdb/test_util/sync_point_impl.h | 74
-rw-r--r-- | src/rocksdb/test_util/testharness.cc | 56
-rw-r--r-- | src/rocksdb/test_util/testharness.h | 47
-rw-r--r-- | src/rocksdb/test_util/testutil.cc | 454
-rw-r--r-- | src/rocksdb/test_util/testutil.h | 802
-rw-r--r-- | src/rocksdb/test_util/transaction_test_util.cc | 387
-rw-r--r-- | src/rocksdb/test_util/transaction_test_util.h | 132
13 files changed, 2998 insertions(+), 0 deletions(-)
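As context for the files listed above, here is a minimal sketch of how a test might drive the fault-injection Env introduced by this patch. It uses only methods declared in fault_injection_test_env.h and testharness.h below; the test name, file name, and payload are illustrative assumptions, not part of the patch.

#include <memory>
#include <string>

#include "rocksdb/env.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical test: write data that is never synced, simulate a crash,
// drop the unsynced bytes, then reactivate the wrapped Env.
TEST(FaultInjectionEnvSketch, DropUnsyncedData) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));

  std::string fname = test::TmpDir(fault_env.get()) + "/fault_sketch";
  std::unique_ptr<WritableFile> file;
  ASSERT_OK(fault_env->NewWritableFile(fname, &file, EnvOptions()));
  ASSERT_OK(file->Append("written but never synced"));  // tracked in FileState
  ASSERT_OK(file->Close());

  fault_env->SetFilesystemActive(false);         // simulate a crash
  ASSERT_OK(fault_env->DropUnsyncedFileData());  // truncate to last synced offset
  fault_env->ResetState();                       // clear tracking, reactivate Env

  uint64_t size = 0;
  ASSERT_OK(fault_env->GetFileSize(fname, &size));
  ASSERT_EQ(0u, size);  // nothing was synced, so nothing survives the "crash"
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

The same pattern extends to DropRandomUnsyncedFileData() and DeleteFilesCreatedAfterLastDirSync(), also declared in the header below, for partial-write and directory-sync crash scenarios.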
diff --git a/src/rocksdb/test_util/fault_injection_test_env.cc b/src/rocksdb/test_util/fault_injection_test_env.cc new file mode 100644 index 000000000..8bbd2692e --- /dev/null +++ b/src/rocksdb/test_util/fault_injection_test_env.cc @@ -0,0 +1,437 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright 2014 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// This test uses a custom Env to keep track of the state of a filesystem as of +// the last "sync". It then checks for data loss errors by purposely dropping +// file data (or entire files) not protected by a "sync". + +#include "test_util/fault_injection_test_env.h" +#include <functional> +#include <utility> + +namespace ROCKSDB_NAMESPACE { + +// Assume a filename, and not a directory name like "/foo/bar/" +std::string GetDirName(const std::string filename) { + size_t found = filename.find_last_of("/\\"); + if (found == std::string::npos) { + return ""; + } else { + return filename.substr(0, found); + } +} + +// A basic file truncation function suitable for this test. +Status Truncate(Env* env, const std::string& filename, uint64_t length) { + std::unique_ptr<SequentialFile> orig_file; + const EnvOptions options; + Status s = env->NewSequentialFile(filename, &orig_file, options); + if (!s.ok()) { + fprintf(stderr, "Cannot open file %s for truncation: %s\n", + filename.c_str(), s.ToString().c_str()); + return s; + } + + std::unique_ptr<char[]> scratch(new char[length]); + ROCKSDB_NAMESPACE::Slice result; + s = orig_file->Read(length, &result, scratch.get()); +#ifdef OS_WIN + orig_file.reset(); +#endif + if (s.ok()) { + std::string tmp_name = GetDirName(filename) + "/truncate.tmp"; + std::unique_ptr<WritableFile> tmp_file; + s = env->NewWritableFile(tmp_name, &tmp_file, options); + if (s.ok()) { + s = tmp_file->Append(result); + if (s.ok()) { + s = env->RenameFile(tmp_name, filename); + } else { + fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(), + filename.c_str(), s.ToString().c_str()); + env->DeleteFile(tmp_name); + } + } + } + if (!s.ok()) { + fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), + s.ToString().c_str()); + } + + return s; +} + +// Trim the tailing "/" in the end of `str` +std::string TrimDirname(const std::string& str) { + size_t found = str.find_last_not_of("/"); + if (found == std::string::npos) { + return str; + } + return str.substr(0, found + 1); +} + +// Return pair <parent directory name, file name> of a full path. +std::pair<std::string, std::string> GetDirAndName(const std::string& name) { + std::string dirname = GetDirName(name); + std::string fname = name.substr(dirname.size() + 1); + return std::make_pair(dirname, fname); +} + +Status FileState::DropUnsyncedData(Env* env) const { + ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; + return Truncate(env, filename_, sync_pos); +} + +Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { + ssize_t sync_pos = pos_at_last_sync_ == -1 ? 
0 : pos_at_last_sync_; + assert(pos_ >= sync_pos); + int range = static_cast<int>(pos_ - sync_pos); + uint64_t truncated_size = + static_cast<uint64_t>(sync_pos) + rand->Uniform(range); + return Truncate(env, filename_, truncated_size); +} + +Status TestDirectory::Fsync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + env_->SyncDir(dirname_); + return dir_->Fsync(); +} + +TestWritableFile::TestWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>&& f, + FaultInjectionTestEnv* env) + : state_(fname), + target_(std::move(f)), + writable_file_opened_(true), + env_(env) { + assert(target_ != nullptr); + state_.pos_ = 0; +} + +TestWritableFile::~TestWritableFile() { + if (writable_file_opened_) { + Close(); + } +} + +Status TestWritableFile::Append(const Slice& data) { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + Status s = target_->Append(data); + if (s.ok()) { + state_.pos_ += data.size(); + env_->WritableFileAppended(state_); + } + return s; +} + +Status TestWritableFile::Close() { + writable_file_opened_ = false; + Status s = target_->Close(); + if (s.ok()) { + env_->WritableFileClosed(state_); + } + return s; +} + +Status TestWritableFile::Flush() { + Status s = target_->Flush(); + if (s.ok() && env_->IsFilesystemActive()) { + state_.pos_at_last_flush_ = state_.pos_; + } + return s; +} + +Status TestWritableFile::Sync() { + if (!env_->IsFilesystemActive()) { + return Status::IOError("FaultInjectionTestEnv: not active"); + } + // No need to actual sync. + state_.pos_at_last_sync_ = state_.pos_; + env_->WritableFileSynced(state_); + return Status::OK(); +} + +TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/, + std::unique_ptr<RandomRWFile>&& f, + FaultInjectionTestEnv* env) + : target_(std::move(f)), file_opened_(true), env_(env) { + assert(target_ != nullptr); +} + +TestRandomRWFile::~TestRandomRWFile() { + if (file_opened_) { + Close(); + } +} + +Status TestRandomRWFile::Write(uint64_t offset, const Slice& data) { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Write(offset, data); +} + +Status TestRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Read(offset, n, result, scratch); +} + +Status TestRandomRWFile::Close() { + file_opened_ = false; + return target_->Close(); +} + +Status TestRandomRWFile::Flush() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Flush(); +} + +Status TestRandomRWFile::Sync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Sync(); +} + +Status FaultInjectionTestEnv::NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) { + std::unique_ptr<Directory> r; + Status s = target()->NewDirectory(name, &r); + assert(s.ok()); + if (!s.ok()) { + return s; + } + result->reset(new TestDirectory(this, TrimDirname(name), r.release())); + return Status::OK(); +} + +Status FaultInjectionTestEnv::NewWritableFile( + const std::string& fname, std::unique_ptr<WritableFile>* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + // Not allow overwriting files + Status s = target()->FileExists(fname); + if (s.ok()) { + return Status::Corruption("File already exists."); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + s = target()->NewWritableFile(fname, result, soptions); + if 
(s.ok()) { + result->reset(new TestWritableFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + +Status FaultInjectionTestEnv::ReopenWritableFile( + const std::string& fname, std::unique_ptr<WritableFile>* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = target()->ReopenWritableFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestWritableFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + +Status FaultInjectionTestEnv::NewRandomRWFile( + const std::string& fname, std::unique_ptr<RandomRWFile>* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = target()->NewRandomRWFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestRandomRWFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + +Status FaultInjectionTestEnv::NewRandomAccessFile( + const std::string& fname, std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + return target()->NewRandomAccessFile(fname, result, soptions); +} + +Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = EnvWrapper::DeleteFile(f); + if (!s.ok()) { + fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), + s.ToString().c_str()); + } + if (s.ok()) { + UntrackFile(f); + } + return s; +} + +Status FaultInjectionTestEnv::RenameFile(const std::string& s, + const std::string& t) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status ret = EnvWrapper::RenameFile(s, t); + + if (ret.ok()) { + MutexLock l(&mutex_); + if (db_file_state_.find(s) != db_file_state_.end()) { + db_file_state_[t] = db_file_state_[s]; + db_file_state_.erase(s); + } + + auto sdn = GetDirAndName(s); + auto tdn = GetDirAndName(t); + if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) { + auto& tlist = dir_to_new_files_since_last_sync_[tdn.first]; + assert(tlist.find(tdn.second) == tlist.end()); + tlist.insert(tdn.second); + } + } + + return ret; +} + +void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + db_file_state_[state.filename_] = state; + open_files_.erase(state.filename_); + } +} + +void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + if 
(db_file_state_.find(state.filename_) == db_file_state_.end()) { + db_file_state_.insert(std::make_pair(state.filename_, state)); + } else { + db_file_state_[state.filename_] = state; + } + } +} + +void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + if (db_file_state_.find(state.filename_) == db_file_state_.end()) { + db_file_state_.insert(std::make_pair(state.filename_, state)); + } else { + db_file_state_[state.filename_] = state; + } + } +} + +// For every file that is not fully synced, make a call to `func` with +// FileState of the file as the parameter. +Status FaultInjectionTestEnv::DropFileData( + std::function<Status(Env*, FileState)> func) { + Status s; + MutexLock l(&mutex_); + for (std::map<std::string, FileState>::const_iterator it = + db_file_state_.begin(); + s.ok() && it != db_file_state_.end(); ++it) { + const FileState& state = it->second; + if (!state.IsFullySynced()) { + s = func(target(), state); + } + } + return s; +} + +Status FaultInjectionTestEnv::DropUnsyncedFileData() { + return DropFileData([&](Env* env, const FileState& state) { + return state.DropUnsyncedData(env); + }); +} + +Status FaultInjectionTestEnv::DropRandomUnsyncedFileData(Random* rnd) { + return DropFileData([&](Env* env, const FileState& state) { + return state.DropRandomUnsyncedData(env, rnd); + }); +} + +Status FaultInjectionTestEnv::DeleteFilesCreatedAfterLastDirSync() { + // Because DeleteFile access this container make a copy to avoid deadlock + std::map<std::string, std::set<std::string>> map_copy; + { + MutexLock l(&mutex_); + map_copy.insert(dir_to_new_files_since_last_sync_.begin(), + dir_to_new_files_since_last_sync_.end()); + } + + for (auto& pair : map_copy) { + for (std::string name : pair.second) { + Status s = DeleteFile(pair.first + "/" + name); + if (!s.ok()) { + return s; + } + } + } + return Status::OK(); +} +void FaultInjectionTestEnv::ResetState() { + MutexLock l(&mutex_); + db_file_state_.clear(); + dir_to_new_files_since_last_sync_.clear(); + SetFilesystemActiveNoLock(true); +} + +void FaultInjectionTestEnv::UntrackFile(const std::string& f) { + MutexLock l(&mutex_); + auto dir_and_name = GetDirAndName(f); + dir_to_new_files_since_last_sync_[dir_and_name.first].erase( + dir_and_name.second); + db_file_state_.erase(f); + open_files_.erase(f); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/fault_injection_test_env.h b/src/rocksdb/test_util/fault_injection_test_env.h new file mode 100644 index 000000000..9cc33a8d3 --- /dev/null +++ b/src/rocksdb/test_util/fault_injection_test_env.h @@ -0,0 +1,225 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright 2014 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// This test uses a custom Env to keep track of the state of a filesystem as of +// the last "sync". It then checks for data loss errors by purposely dropping +// file data (or entire files) not protected by a "sync". 
+ +#pragma once + +#include <map> +#include <set> +#include <string> + +#include "db/version_set.h" +#include "env/mock_env.h" +#include "file/filename.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "util/mutexlock.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +class TestWritableFile; +class FaultInjectionTestEnv; + +struct FileState { + std::string filename_; + ssize_t pos_; + ssize_t pos_at_last_sync_; + ssize_t pos_at_last_flush_; + + explicit FileState(const std::string& filename) + : filename_(filename), + pos_(-1), + pos_at_last_sync_(-1), + pos_at_last_flush_(-1) {} + + FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} + + bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } + + Status DropUnsyncedData(Env* env) const; + + Status DropRandomUnsyncedData(Env* env, Random* rand) const; +}; + +// A wrapper around WritableFileWriter* file +// is written to or sync'ed. +class TestWritableFile : public WritableFile { + public: + explicit TestWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>&& f, + FaultInjectionTestEnv* env); + virtual ~TestWritableFile(); + virtual Status Append(const Slice& data) override; + virtual Status Truncate(uint64_t size) override { + return target_->Truncate(size); + } + virtual Status Close() override; + virtual Status Flush() override; + virtual Status Sync() override; + virtual bool IsSyncThreadSafe() const override { return true; } + virtual Status PositionedAppend(const Slice& data, + uint64_t offset) override { + return target_->PositionedAppend(data, offset); + } + virtual bool use_direct_io() const override { + return target_->use_direct_io(); + }; + + private: + FileState state_; + std::unique_ptr<WritableFile> target_; + bool writable_file_opened_; + FaultInjectionTestEnv* env_; +}; + +// A wrapper around WritableFileWriter* file +// is written to or sync'ed. 
+class TestRandomRWFile : public RandomRWFile { + public: + explicit TestRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>&& f, + FaultInjectionTestEnv* env); + virtual ~TestRandomRWFile(); + Status Write(uint64_t offset, const Slice& data) override; + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + Status Close() override; + Status Flush() override; + Status Sync() override; + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + bool use_direct_io() const override { return target_->use_direct_io(); }; + + private: + std::unique_ptr<RandomRWFile> target_; + bool file_opened_; + FaultInjectionTestEnv* env_; +}; + +class TestDirectory : public Directory { + public: + explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, + Directory* dir) + : env_(env), dirname_(dirname), dir_(dir) {} + ~TestDirectory() {} + + virtual Status Fsync() override; + + private: + FaultInjectionTestEnv* env_; + std::string dirname_; + std::unique_ptr<Directory> dir_; +}; + +class FaultInjectionTestEnv : public EnvWrapper { + public: + explicit FaultInjectionTestEnv(Env* base) + : EnvWrapper(base), filesystem_active_(true) {} + virtual ~FaultInjectionTestEnv() {} + + Status NewDirectory(const std::string& name, + std::unique_ptr<Directory>* result) override; + + Status NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& soptions) override; + + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& soptions) override; + + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr<RandomRWFile>* result, + const EnvOptions& soptions) override; + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr<RandomAccessFile>* result, + const EnvOptions& soptions) override; + + virtual Status DeleteFile(const std::string& f) override; + + virtual Status RenameFile(const std::string& s, + const std::string& t) override; + +// Undef to eliminate clash on Windows +#undef GetFreeSpace + virtual Status GetFreeSpace(const std::string& path, + uint64_t* disk_free) override { + if (!IsFilesystemActive() && error_ == Status::NoSpace()) { + *disk_free = 0; + return Status::OK(); + } else { + return target()->GetFreeSpace(path, disk_free); + } + } + + void WritableFileClosed(const FileState& state); + + void WritableFileSynced(const FileState& state); + + void WritableFileAppended(const FileState& state); + + // For every file that is not fully synced, make a call to `func` with + // FileState of the file as the parameter. + Status DropFileData(std::function<Status(Env*, FileState)> func); + + Status DropUnsyncedFileData(); + + Status DropRandomUnsyncedFileData(Random* rnd); + + Status DeleteFilesCreatedAfterLastDirSync(); + + void ResetState(); + + void UntrackFile(const std::string& f); + + void SyncDir(const std::string& dirname) { + MutexLock l(&mutex_); + dir_to_new_files_since_last_sync_.erase(dirname); + } + + // Setting the filesystem to inactive is the test equivalent to simulating a + // system reset. Setting to inactive will freeze our saved filesystem state so + // that it will stop being recorded. It can then be reset back to the state at + // the time of the reset. 
+ bool IsFilesystemActive() { + MutexLock l(&mutex_); + return filesystem_active_; + } + void SetFilesystemActiveNoLock(bool active, + Status error = Status::Corruption("Not active")) { + filesystem_active_ = active; + if (!active) { + error_ = error; + } + } + void SetFilesystemActive(bool active, + Status error = Status::Corruption("Not active")) { + MutexLock l(&mutex_); + SetFilesystemActiveNoLock(active, error); + } + void AssertNoOpenFile() { assert(open_files_.empty()); } + Status GetError() { return error_; } + + private: + port::Mutex mutex_; + std::map<std::string, FileState> db_file_state_; + std::set<std::string> open_files_; + std::unordered_map<std::string, std::set<std::string>> + dir_to_new_files_since_last_sync_; + bool filesystem_active_; // Record flushes, syncs, writes + Status error_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/mock_time_env.h b/src/rocksdb/test_util/mock_time_env.h new file mode 100644 index 000000000..32b40f79c --- /dev/null +++ b/src/rocksdb/test_util/mock_time_env.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +class MockTimeEnv : public EnvWrapper { + public: + explicit MockTimeEnv(Env* base) : EnvWrapper(base) {} + + virtual Status GetCurrentTime(int64_t* time) override { + assert(time != nullptr); + assert(current_time_ <= + static_cast<uint64_t>(std::numeric_limits<int64_t>::max())); + *time = static_cast<int64_t>(current_time_); + return Status::OK(); + } + + virtual uint64_t NowMicros() override { + assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000); + return current_time_ * 1000000; + } + + virtual uint64_t NowNanos() override { + assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000000); + return current_time_ * 1000000000; + } + + uint64_t RealNowMicros() { return target()->NowMicros(); } + + void set_current_time(uint64_t time) { + assert(time >= current_time_); + current_time_ = time; + } + + private: + std::atomic<uint64_t> current_time_{0}; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/sync_point.cc b/src/rocksdb/test_util/sync_point.cc new file mode 100644 index 000000000..d969ae3c8 --- /dev/null +++ b/src/rocksdb/test_util/sync_point.cc @@ -0,0 +1,66 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "test_util/sync_point.h" +#include "test_util/sync_point_impl.h" + +int rocksdb_kill_odds = 0; +std::vector<std::string> rocksdb_kill_prefix_blacklist; + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { + +SyncPoint* SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +SyncPoint::SyncPoint() : impl_(new Data) {} + +SyncPoint:: ~SyncPoint() { + delete impl_; +} + +void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) { + impl_->LoadDependency(dependencies); +} + +void SyncPoint::LoadDependencyAndMarkers( + const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers) { + impl_->LoadDependencyAndMarkers(dependencies, markers); +} + +void SyncPoint::SetCallBack(const std::string& point, + const std::function<void(void*)>& callback) { + impl_->SetCallBack(point, callback); +} + +void SyncPoint::ClearCallBack(const std::string& point) { + impl_->ClearCallBack(point); +} + +void SyncPoint::ClearAllCallBacks() { + impl_->ClearAllCallBacks(); +} + +void SyncPoint::EnableProcessing() { + impl_->EnableProcessing(); +} + +void SyncPoint::DisableProcessing() { + impl_->DisableProcessing(); +} + +void SyncPoint::ClearTrace() { + impl_->ClearTrace(); +} + +void SyncPoint::Process(const std::string& point, void* cb_arg) { + impl_->Process(point, cb_arg); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // NDEBUG diff --git a/src/rocksdb/test_util/sync_point.h b/src/rocksdb/test_util/sync_point.h new file mode 100644 index 000000000..46bfd50d7 --- /dev/null +++ b/src/rocksdb/test_util/sync_point.h @@ -0,0 +1,144 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include <assert.h> +#include <functional> +#include <mutex> +#include <string> +#include <thread> +#include <vector> + +#include "rocksdb/rocksdb_namespace.h" + +// This is only set from db_stress.cc and for testing only. +// If non-zero, kill at various points in source code with probability 1/this +extern int rocksdb_kill_odds; +// If kill point has a prefix on this list, will skip killing. +extern std::vector<std::string> rocksdb_kill_prefix_blacklist; + +#ifdef NDEBUG +// empty in release build +#define TEST_KILL_RANDOM(kill_point, rocksdb_kill_odds) +#else + +namespace ROCKSDB_NAMESPACE { +// Kill the process with probability 1/odds for testing. +extern void TestKillRandom(std::string kill_point, int odds, + const std::string& srcfile, int srcline); + +// To avoid crashing always at some frequently executed codepaths (during +// kill random test), use this factor to reduce odds +#define REDUCE_ODDS 2 +#define REDUCE_ODDS2 4 + +#define TEST_KILL_RANDOM(kill_point, rocksdb_kill_odds) \ + { \ + if (rocksdb_kill_odds > 0) { \ + TestKillRandom(kill_point, rocksdb_kill_odds, __FILE__, __LINE__); \ + } \ + } +} // namespace ROCKSDB_NAMESPACE +#endif + +#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#define TEST_IDX_SYNC_POINT(x, index) +#define TEST_SYNC_POINT_CALLBACK(x, y) +#define INIT_SYNC_POINT_SINGLETONS() +#else + +namespace ROCKSDB_NAMESPACE { + +// This class provides facility to reproduce race conditions deterministically +// in unit tests. +// Developer could specify sync points in the codebase via TEST_SYNC_POINT. +// Each sync point represents a position in the execution stream of a thread. 
+// In the unit test, 'Happens After' relationship among sync points could be +// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of +// threads execution. +// Refer to (DBTest,TransactionLogIteratorRace), for an example use case. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + SyncPoint(const SyncPoint&) = delete; + SyncPoint& operator=(const SyncPoint&) = delete; + ~SyncPoint(); + + struct SyncPointPair { + std::string predecessor; + std::string successor; + }; + + // call once at the beginning of a test to setup the dependency between + // sync points + void LoadDependency(const std::vector<SyncPointPair>& dependencies); + + // call once at the beginning of a test to setup the dependency between + // sync points and setup markers indicating the successor is only enabled + // when it is processed on the same thread as the predecessor. + // When adding a marker, it implicitly adds a dependency for the marker pair. + void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers); + + // The argument to the callback is passed through from + // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or + // TEST_IDX_SYNC_POINT was used. + void SetCallBack(const std::string& point, + const std::function<void(void*)>& callback); + + // Clear callback function by point + void ClearCallBack(const std::string& point); + + // Clear all call back functions. + void ClearAllCallBacks(); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + // And/or call registered callback function, with argument `cb_arg` + void Process(const std::string& point, void* cb_arg = nullptr); + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + // We want this to be public so we can + // subclass the implementation + struct Data; + + private: + // Singleton + SyncPoint(); + Data* impl_; +}; + +} // namespace ROCKSDB_NAMESPACE + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after depedency on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to re-produce race conditions between threads. +// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is no op in release build. +#define TEST_SYNC_POINT(x) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x) +#define TEST_IDX_SYNC_POINT(x, index) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x + \ + std::to_string(index)) +#define TEST_SYNC_POINT_CALLBACK(x, y) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x, y) +#define INIT_SYNC_POINT_SINGLETONS() \ + (void)ROCKSDB_NAMESPACE::SyncPoint::GetInstance(); +#endif // NDEBUG diff --git a/src/rocksdb/test_util/sync_point_impl.cc b/src/rocksdb/test_util/sync_point_impl.cc new file mode 100644 index 000000000..7b939c5f4 --- /dev/null +++ b/src/rocksdb/test_util/sync_point_impl.cc @@ -0,0 +1,129 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point_impl.h" + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { + +void TestKillRandom(std::string kill_point, int odds, + const std::string& srcfile, int srcline) { + for (auto& p : rocksdb_kill_prefix_blacklist) { + if (kill_point.substr(0, p.length()) == p) { + return; + } + } + + assert(odds > 0); + if (odds % 7 == 0) { + // class Random uses multiplier 16807, which is 7^5. If odds are + // multiplier of 7, there might be limited values generated. + odds++; + } + auto* r = Random::GetTLSInstance(); + bool crash = r->OneIn(odds); + if (crash) { + port::Crash(srcfile, srcline); + } +} + + +void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) { + std::lock_guard<std::mutex> lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + cv_.notify_all(); +} + +void SyncPoint::Data::LoadDependencyAndMarkers( + const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers) { + std::lock_guard<std::mutex> lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::ClearCallBack(const std::string& point) { + std::unique_lock<std::mutex> lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + +void SyncPoint::Data::ClearAllCallBacks() { + std::unique_lock<std::mutex> lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + +void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { + if (!enabled_) { + return; + } + + std::unique_lock<std::mutex> lock(mutex_); + auto thread_id = std::this_thread::get_id(); + + auto marker_iter = markers_.find(point); + if (marker_iter != markers_.end()) { + for (auto& marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + } + } + + if (DisabledByMarker(point, thread_id)) { + return; + } + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + if (DisabledByMarker(point, thread_id)) { + return; + } + } + + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(cb_arg); + mutex_.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point); + cv_.notify_all(); +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git 
a/src/rocksdb/test_util/sync_point_impl.h b/src/rocksdb/test_util/sync_point_impl.h new file mode 100644 index 000000000..b246c0198 --- /dev/null +++ b/src/rocksdb/test_util/sync_point_impl.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point.h" + +#include <assert.h> +#include <atomic> +#include <condition_variable> +#include <functional> +#include <mutex> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> + +#include "port/port.h" +#include "util/random.h" + +#pragma once + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { +struct SyncPoint::Data { + Data() : enabled_(false) {} + // Enable proper deletion by subclasses + virtual ~Data() {} + // successor/predecessor map loaded from LoadDependency + std::unordered_map<std::string, std::vector<std::string>> successors_; + std::unordered_map<std::string, std::vector<std::string>> predecessors_; + std::unordered_map<std::string, std::function<void(void*)> > callbacks_; + std::unordered_map<std::string, std::vector<std::string> > markers_; + std::unordered_map<std::string, std::thread::id> marked_thread_id_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set<std::string> cleared_points_; + std::atomic<bool> enabled_; + int num_callbacks_running_ = 0; + + void LoadDependency(const std::vector<SyncPointPair>& dependencies); + void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers); + bool PredecessorsAllCleared(const std::string& point); + void SetCallBack(const std::string& point, + const std::function<void(void*)>& callback) { + std::lock_guard<std::mutex> lock(mutex_); + callbacks_[point] = callback; +} + + void ClearCallBack(const std::string& point); + void ClearAllCallBacks(); + void EnableProcessing() { + enabled_ = true; + } + void DisableProcessing() { + enabled_ = false; + } + void ClearTrace() { + std::lock_guard<std::mutex> lock(mutex_); + cleared_points_.clear(); + } + bool DisabledByMarker(const std::string& point, + std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; + } + void Process(const std::string& point, void* cb_arg); +}; +} // namespace ROCKSDB_NAMESPACE +#endif // NDEBUG diff --git a/src/rocksdb/test_util/testharness.cc b/src/rocksdb/test_util/testharness.cc new file mode 100644 index 000000000..d38d43080 --- /dev/null +++ b/src/rocksdb/test_util/testharness.cc @@ -0,0 +1,56 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "test_util/testharness.h" +#include <string> +#include <thread> + +namespace ROCKSDB_NAMESPACE { +namespace test { + +::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) { + if (s.ok()) { + return ::testing::AssertionSuccess(); + } else { + return ::testing::AssertionFailure() << s_expr << std::endl + << s.ToString(); + } +} + +std::string TmpDir(Env* env) { + std::string dir; + Status s = env->GetTestDirectory(&dir); + EXPECT_TRUE(s.ok()) << s.ToString(); + return dir; +} + +std::string PerThreadDBPath(std::string dir, std::string name) { + size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id()); + return dir + "/" + name + "_" + std::to_string(tid); +} + +std::string PerThreadDBPath(std::string name) { + return PerThreadDBPath(test::TmpDir(), name); +} + +std::string PerThreadDBPath(Env* env, std::string name) { + return PerThreadDBPath(test::TmpDir(env), name); +} + +int RandomSeed() { + const char* env = getenv("TEST_RANDOM_SEED"); + int result = (env != nullptr ? atoi(env) : 301); + if (result <= 0) { + result = 301; + } + return result; +} + +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testharness.h b/src/rocksdb/test_util/testharness.h new file mode 100644 index 000000000..11f40ff83 --- /dev/null +++ b/src/rocksdb/test_util/testharness.h @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#ifdef OS_AIX +#include "gtest/gtest.h" +#else +#include <gtest/gtest.h> +#endif + +#include <string> +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { +namespace test { + +// Return the directory to use for temporary storage. +std::string TmpDir(Env* env = Env::Default()); + +// A path unique within the thread +std::string PerThreadDBPath(std::string name); +std::string PerThreadDBPath(Env* env, std::string name); +std::string PerThreadDBPath(std::string dir, std::string name); + +// Return a randomization seed for this run. Typically returns the +// same number on repeated invocations of this binary, but automated +// runs may be able to vary the seed. +int RandomSeed(); + +::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s); + +#define ASSERT_OK(s) \ + ASSERT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s) +#define ASSERT_NOK(s) ASSERT_FALSE((s).ok()) +#define EXPECT_OK(s) \ + EXPECT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s) +#define EXPECT_NOK(s) EXPECT_FALSE((s).ok()) + +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testutil.cc b/src/rocksdb/test_util/testutil.cc new file mode 100644 index 000000000..2969a140a --- /dev/null +++ b/src/rocksdb/test_util/testutil.cc @@ -0,0 +1,454 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "test_util/testutil.h" + +#include <array> +#include <cctype> +#include <fstream> +#include <sstream> + +#include "db/memtable_list.h" +#include "env/composite_env_wrapper.h" +#include "file/random_access_file_reader.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { +namespace test { + +const uint32_t kDefaultFormatVersion = BlockBasedTableOptions().format_version; +const uint32_t kLatestFormatVersion = 5u; + +Slice RandomString(Random* rnd, int len, std::string* dst) { + dst->resize(len); + for (int i = 0; i < len; i++) { + (*dst)[i] = static_cast<char>(' ' + rnd->Uniform(95)); // ' ' .. '~' + } + return Slice(*dst); +} + +extern std::string RandomHumanReadableString(Random* rnd, int len) { + std::string ret; + ret.resize(len); + for (int i = 0; i < len; ++i) { + ret[i] = static_cast<char>('a' + rnd->Uniform(26)); + } + return ret; +} + +std::string RandomKey(Random* rnd, int len, RandomKeyType type) { + // Make sure to generate a wide variety of characters so we + // test the boundary conditions for short-key optimizations. + static const char kTestChars[] = {'\0', '\1', 'a', 'b', 'c', + 'd', 'e', '\xfd', '\xfe', '\xff'}; + std::string result; + for (int i = 0; i < len; i++) { + std::size_t indx = 0; + switch (type) { + case RandomKeyType::RANDOM: + indx = rnd->Uniform(sizeof(kTestChars)); + break; + case RandomKeyType::LARGEST: + indx = sizeof(kTestChars) - 1; + break; + case RandomKeyType::MIDDLE: + indx = sizeof(kTestChars) / 2; + break; + case RandomKeyType::SMALLEST: + indx = 0; + break; + } + result += kTestChars[indx]; + } + return result; +} + +extern Slice CompressibleString(Random* rnd, double compressed_fraction, + int len, std::string* dst) { + int raw = static_cast<int>(len * compressed_fraction); + if (raw < 1) raw = 1; + std::string raw_data; + RandomString(rnd, raw, &raw_data); + + // Duplicate the random data until we have filled "len" bytes + dst->clear(); + while (dst->size() < (unsigned int)len) { + dst->append(raw_data); + } + dst->resize(len); + return Slice(*dst); +} + +namespace { +class Uint64ComparatorImpl : public Comparator { + public: + Uint64ComparatorImpl() {} + + const char* Name() const override { return "rocksdb.Uint64Comparator"; } + + int Compare(const Slice& a, const Slice& b) const override { + assert(a.size() == sizeof(uint64_t) && b.size() == sizeof(uint64_t)); + const uint64_t* left = reinterpret_cast<const uint64_t*>(a.data()); + const uint64_t* right = reinterpret_cast<const uint64_t*>(b.data()); + uint64_t leftValue; + uint64_t rightValue; + GetUnaligned(left, &leftValue); + GetUnaligned(right, &rightValue); + if (leftValue == rightValue) { + return 0; + } else if (leftValue < rightValue) { + return -1; + } else { + return 1; + } + } + + void FindShortestSeparator(std::string* /*start*/, + const Slice& /*limit*/) const override { + return; + } + + void FindShortSuccessor(std::string* /*key*/) const override { return; } +}; +} // namespace + +const Comparator* Uint64Comparator() { + static Uint64ComparatorImpl uint64comp; + return &uint64comp; +} + +WritableFileWriter* GetWritableFileWriter(WritableFile* wf, + const std::string& fname) { + std::unique_ptr<WritableFile> file(wf); + return new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), + fname, EnvOptions()); +} + 
+RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) { + std::unique_ptr<RandomAccessFile> file(raf); + return new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), + "[test RandomAccessFileReader]"); +} + +SequentialFileReader* GetSequentialFileReader(SequentialFile* se, + const std::string& fname) { + std::unique_ptr<SequentialFile> file(se); + return new SequentialFileReader(NewLegacySequentialFileWrapper(file), fname); +} + +void CorruptKeyType(InternalKey* ikey) { + std::string keystr = ikey->Encode().ToString(); + keystr[keystr.size() - 8] = kTypeLogData; + ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); +} + +std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, + const ValueType& t, bool corrupt) { + InternalKey k(user_key, seq, t); + if (corrupt) { + CorruptKeyType(&k); + } + return k.Encode().ToString(); +} + +std::string RandomName(Random* rnd, const size_t len) { + std::stringstream ss; + for (size_t i = 0; i < len; ++i) { + ss << static_cast<char>(rnd->Uniform(26) + 'a'); + } + return ss.str(); +} + +CompressionType RandomCompressionType(Random* rnd) { + auto ret = static_cast<CompressionType>(rnd->Uniform(6)); + while (!CompressionTypeSupported(ret)) { + ret = static_cast<CompressionType>((static_cast<int>(ret) + 1) % 6); + } + return ret; +} + +void RandomCompressionTypeVector(const size_t count, + std::vector<CompressionType>* types, + Random* rnd) { + types->clear(); + for (size_t i = 0; i < count; ++i) { + types->emplace_back(RandomCompressionType(rnd)); + } +} + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) { + int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewFixedPrefixTransform(rnd->Uniform(20) + 1); + case 1: + return NewCappedPrefixTransform(rnd->Uniform(20) + 1); + case 2: + return NewNoopTransform(); + default: + return nullptr; + } +} + +BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) { + BlockBasedTableOptions opt; + opt.cache_index_and_filter_blocks = rnd->Uniform(2); + opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2); + opt.pin_top_level_index_and_filter = rnd->Uniform(2); + using IndexType = BlockBasedTableOptions::IndexType; + const std::array<IndexType, 4> index_types = { + {IndexType::kBinarySearch, IndexType::kHashSearch, + IndexType::kTwoLevelIndexSearch, IndexType::kBinarySearchWithFirstKey}}; + opt.index_type = + index_types[rnd->Uniform(static_cast<int>(index_types.size()))]; + opt.hash_index_allow_collision = rnd->Uniform(2); + opt.checksum = static_cast<ChecksumType>(rnd->Uniform(3)); + opt.block_size = rnd->Uniform(10000000); + opt.block_size_deviation = rnd->Uniform(100); + opt.block_restart_interval = rnd->Uniform(100); + opt.index_block_restart_interval = rnd->Uniform(100); + opt.whole_key_filtering = rnd->Uniform(2); + + return opt; +} + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined) { +#ifndef ROCKSDB_LITE + int random_num = pre_defined >= 0 ? 
pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewPlainTableFactory(); + case 1: + return NewCuckooTableFactory(); + default: + return NewBlockBasedTableFactory(); + } +#else + (void)rnd; + (void)pre_defined; + return NewBlockBasedTableFactory(); +#endif // !ROCKSDB_LITE +} + +MergeOperator* RandomMergeOperator(Random* rnd) { + return new ChanglingMergeOperator(RandomName(rnd, 10)); +} + +CompactionFilter* RandomCompactionFilter(Random* rnd) { + return new ChanglingCompactionFilter(RandomName(rnd, 10)); +} + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) { + return new ChanglingCompactionFilterFactory(RandomName(rnd, 10)); +} + +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { + // boolean options + db_opt->advise_random_on_open = rnd->Uniform(2); + db_opt->allow_mmap_reads = rnd->Uniform(2); + db_opt->allow_mmap_writes = rnd->Uniform(2); + db_opt->use_direct_reads = rnd->Uniform(2); + db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2); + db_opt->create_if_missing = rnd->Uniform(2); + db_opt->create_missing_column_families = rnd->Uniform(2); + db_opt->enable_thread_tracking = rnd->Uniform(2); + db_opt->error_if_exists = rnd->Uniform(2); + db_opt->is_fd_close_on_exec = rnd->Uniform(2); + db_opt->paranoid_checks = rnd->Uniform(2); + db_opt->skip_log_error_on_recovery = rnd->Uniform(2); + db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); + db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2); + db_opt->use_adaptive_mutex = rnd->Uniform(2); + db_opt->use_fsync = rnd->Uniform(2); + db_opt->recycle_log_file_num = rnd->Uniform(2); + db_opt->avoid_flush_during_recovery = rnd->Uniform(2); + db_opt->avoid_flush_during_shutdown = rnd->Uniform(2); + + // int options + db_opt->max_background_compactions = rnd->Uniform(100); + db_opt->max_background_flushes = rnd->Uniform(100); + db_opt->max_file_opening_threads = rnd->Uniform(100); + db_opt->max_open_files = rnd->Uniform(100); + db_opt->table_cache_numshardbits = rnd->Uniform(100); + + // size_t options + db_opt->db_write_buffer_size = rnd->Uniform(10000); + db_opt->keep_log_file_num = rnd->Uniform(10000); + db_opt->log_file_time_to_roll = rnd->Uniform(10000); + db_opt->manifest_preallocation_size = rnd->Uniform(10000); + db_opt->max_log_file_size = rnd->Uniform(10000); + + // std::string options + db_opt->db_log_dir = "path/to/db_log_dir"; + db_opt->wal_dir = "path/to/wal_dir"; + + // uint32_t options + db_opt->max_subcompactions = rnd->Uniform(100000); + + // uint64_t options + static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); + db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000); + db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000); + db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000); + db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000); + db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000); + db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000); + db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000); + db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000); + + // unsigned int options + db_opt->stats_dump_period_sec = rnd->Uniform(100000); +} + +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, + Random* rnd) { + cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4)); + + // boolean options + cf_opt->report_bg_io_stats = rnd->Uniform(2); + cf_opt->disable_auto_compactions = rnd->Uniform(2); + cf_opt->inplace_update_support = 
rnd->Uniform(2); + cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2); + cf_opt->optimize_filters_for_hits = rnd->Uniform(2); + cf_opt->paranoid_file_checks = rnd->Uniform(2); + cf_opt->purge_redundant_kvs_while_flush = rnd->Uniform(2); + cf_opt->force_consistency_checks = rnd->Uniform(2); + cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2); + cf_opt->memtable_whole_key_filtering = rnd->Uniform(2); + + // double options + cf_opt->hard_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13; + cf_opt->soft_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13; + cf_opt->memtable_prefix_bloom_size_ratio = + static_cast<double>(rnd->Uniform(10000)) / 20000.0; + + // int options + cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100); + cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100); + cf_opt->level0_stop_writes_trigger = rnd->Uniform(100); + cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100); + cf_opt->max_mem_compaction_level = rnd->Uniform(100); + cf_opt->max_write_buffer_number = rnd->Uniform(100); + cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); + cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000); + cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); + cf_opt->num_levels = rnd->Uniform(100); + cf_opt->target_file_size_multiplier = rnd->Uniform(100); + + // vector int options + cf_opt->max_bytes_for_level_multiplier_additional.resize(cf_opt->num_levels); + for (int i = 0; i < cf_opt->num_levels; i++) { + cf_opt->max_bytes_for_level_multiplier_additional[i] = rnd->Uniform(100); + } + + // size_t options + cf_opt->arena_block_size = rnd->Uniform(10000); + cf_opt->inplace_update_num_locks = rnd->Uniform(10000); + cf_opt->max_successive_merges = rnd->Uniform(10000); + cf_opt->memtable_huge_page_size = rnd->Uniform(10000); + cf_opt->write_buffer_size = rnd->Uniform(10000); + + // uint32_t options + cf_opt->bloom_locality = rnd->Uniform(10000); + cf_opt->max_bytes_for_level_base = rnd->Uniform(10000); + + // uint64_t options + static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); + cf_opt->ttl = + db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0; + cf_opt->periodic_compaction_seconds = + db_options.max_open_files == -1 ? 
uint_max + rnd->Uniform(10000) : 0; + cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); + cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); + cf_opt->max_compaction_bytes = + cf_opt->target_file_size_base * rnd->Uniform(100); + cf_opt->compaction_options_fifo.max_table_files_size = + uint_max + rnd->Uniform(10000); + + // unsigned int options + cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); + + // pointer typed options + cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd)); + cf_opt->table_factory.reset(RandomTableFactory(rnd)); + cf_opt->merge_operator.reset(RandomMergeOperator(rnd)); + if (cf_opt->compaction_filter) { + delete cf_opt->compaction_filter; + } + cf_opt->compaction_filter = RandomCompactionFilter(rnd); + cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd)); + + // custom typed options + cf_opt->compression = RandomCompressionType(rnd); + RandomCompressionTypeVector(cf_opt->num_levels, + &cf_opt->compression_per_level, rnd); +} + +Status DestroyDir(Env* env, const std::string& dir) { + Status s; + if (env->FileExists(dir).IsNotFound()) { + return s; + } + std::vector<std::string> files_in_dir; + s = env->GetChildren(dir, &files_in_dir); + if (s.ok()) { + for (auto& file_in_dir : files_in_dir) { + if (file_in_dir == "." || file_in_dir == "..") { + continue; + } + s = env->DeleteFile(dir + "/" + file_in_dir); + if (!s.ok()) { + break; + } + } + } + + if (s.ok()) { + s = env->DeleteDir(dir); + } + return s; +} + +bool IsDirectIOSupported(Env* env, const std::string& dir) { + EnvOptions env_options; + env_options.use_mmap_writes = false; + env_options.use_direct_writes = true; + std::string tmp = TempFileName(dir, 999); + Status s; + { + std::unique_ptr<WritableFile> file; + s = env->NewWritableFile(tmp, &file, env_options); + } + if (s.ok()) { + s = env->DeleteFile(tmp); + } + return s.ok(); +} + +size_t GetLinesCount(const std::string& fname, const std::string& pattern) { + std::stringstream ssbuf; + std::string line; + size_t count = 0; + + std::ifstream inFile(fname.c_str()); + ssbuf << inFile.rdbuf(); + + while (getline(ssbuf, line)) { + if (line.find(pattern) != std::string::npos) { + count++; + } + } + + return count; +} + +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testutil.h b/src/rocksdb/test_util/testutil.h new file mode 100644 index 000000000..3f6b47929 --- /dev/null +++ b/src/rocksdb/test_util/testutil.h @@ -0,0 +1,802 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once +#include <algorithm> +#include <deque> +#include <string> +#include <vector> + +#include "env/composite_env_wrapper.h" +#include "file/writable_file_writer.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/table.h" +#include "table/block_based/block_based_table_factory.h" +#include "table/internal_iterator.h" +#include "table/plain/plain_table_factory.h" +#include "util/mutexlock.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { +class SequentialFile; +class SequentialFileReader; + +namespace test { + +extern const uint32_t kDefaultFormatVersion; +extern const uint32_t kLatestFormatVersion; + +// Store in *dst a random string of length "len" and return a Slice that +// references the generated data. +extern Slice RandomString(Random* rnd, int len, std::string* dst); + +extern std::string RandomHumanReadableString(Random* rnd, int len); + +// Return a random key with the specified length that may contain interesting +// characters (e.g. \x00, \xff, etc.). +enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE }; +extern std::string RandomKey(Random* rnd, int len, + RandomKeyType type = RandomKeyType::RANDOM); + +// Store in *dst a string of length "len" that will compress to +// "N*compressed_fraction" bytes and return a Slice that references +// the generated data. +extern Slice CompressibleString(Random* rnd, double compressed_fraction, + int len, std::string* dst); + +// A wrapper that allows injection of errors. +class ErrorEnv : public EnvWrapper { + public: + bool writable_file_error_; + int num_writable_file_errors_; + + ErrorEnv() : EnvWrapper(Env::Default()), + writable_file_error_(false), + num_writable_file_errors_(0) { } + + virtual Status NewWritableFile(const std::string& fname, + std::unique_ptr<WritableFile>* result, + const EnvOptions& soptions) override { + result->reset(); + if (writable_file_error_) { + ++num_writable_file_errors_; + return Status::IOError(fname, "fake error"); + } + return target()->NewWritableFile(fname, result, soptions); + } +}; + +#ifndef NDEBUG +// An internal comparator that just forward comparing results from the +// user comparator in it. Can be used to test entities that have no dependency +// on internal key structure but consumes InternalKeyComparator, like +// BlockBasedTable. +class PlainInternalKeyComparator : public InternalKeyComparator { + public: + explicit PlainInternalKeyComparator(const Comparator* c) + : InternalKeyComparator(c) {} + + virtual ~PlainInternalKeyComparator() {} + + virtual int Compare(const Slice& a, const Slice& b) const override { + return user_comparator()->Compare(a, b); + } +}; +#endif + +// A test comparator which compare two strings in this way: +// (1) first compare prefix of 8 bytes in alphabet order, +// (2) if two strings share the same prefix, sort the other part of the string +// in the reverse alphabet order. +// This helps simulate the case of compounded key of [entity][timestamp] and +// latest timestamp first. 
+class SimpleSuffixReverseComparator : public Comparator {
+ public:
+  SimpleSuffixReverseComparator() {}
+
+  virtual const char* Name() const override {
+    return "SimpleSuffixReverseComparator";
+  }
+
+  virtual int Compare(const Slice& a, const Slice& b) const override {
+    Slice prefix_a = Slice(a.data(), 8);
+    Slice prefix_b = Slice(b.data(), 8);
+    int prefix_comp = prefix_a.compare(prefix_b);
+    if (prefix_comp != 0) {
+      return prefix_comp;
+    } else {
+      Slice suffix_a = Slice(a.data() + 8, a.size() - 8);
+      Slice suffix_b = Slice(b.data() + 8, b.size() - 8);
+      return -(suffix_a.compare(suffix_b));
+    }
+  }
+  virtual void FindShortestSeparator(std::string* /*start*/,
+                                     const Slice& /*limit*/) const override {}
+
+  virtual void FindShortSuccessor(std::string* /*key*/) const override {}
+};
+
+// Returns a user key comparator that can be used for comparing two uint64_t
+// slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
+// at once. Assumes same endian-ness is used through the database's lifetime.
+// Semantics of comparison would differ from Bytewise comparator in little
+// endian machines.
+extern const Comparator* Uint64Comparator();
+
+// Iterator over a vector of keys/values
+class VectorIterator : public InternalIterator {
+ public:
+  explicit VectorIterator(const std::vector<std::string>& keys)
+      : keys_(keys), current_(keys.size()) {
+    std::sort(keys_.begin(), keys_.end());
+    values_.resize(keys.size());
+  }
+
+  VectorIterator(const std::vector<std::string>& keys,
+                 const std::vector<std::string>& values)
+      : keys_(keys), values_(values), current_(keys.size()) {
+    assert(keys_.size() == values_.size());
+  }
+
+  virtual bool Valid() const override { return current_ < keys_.size(); }
+
+  virtual void SeekToFirst() override { current_ = 0; }
+  virtual void SeekToLast() override { current_ = keys_.size() - 1; }
+
+  virtual void Seek(const Slice& target) override {
+    current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
+               keys_.begin();
+  }
+
+  virtual void SeekForPrev(const Slice& target) override {
+    current_ = std::upper_bound(keys_.begin(), keys_.end(), target.ToString()) -
+               keys_.begin();
+    if (!Valid()) {
+      SeekToLast();
+    } else {
+      Prev();
+    }
+  }
+
+  virtual void Next() override { current_++; }
+  virtual void Prev() override { current_--; }
+
+  virtual Slice key() const override { return Slice(keys_[current_]); }
+  virtual Slice value() const override { return Slice(values_[current_]); }
+
+  virtual Status status() const override { return Status::OK(); }
+
+  virtual bool IsKeyPinned() const override { return true; }
+  virtual bool IsValuePinned() const override { return true; }
+
+ private:
+  std::vector<std::string> keys_;
+  std::vector<std::string> values_;
+  size_t current_;
+};
+extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
+                                                 const std::string& fname);
+
+extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf);
+
+extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
+                                                     const std::string& fname);
+
+class StringSink: public WritableFile {
+ public:
+  std::string contents_;
+
+  explicit StringSink(Slice* reader_contents = nullptr) :
+      WritableFile(),
+      contents_(""),
+      reader_contents_(reader_contents),
+      last_flush_(0) {
+    if (reader_contents_ != nullptr) {
+      *reader_contents_ = Slice(contents_.data(), 0);
+    }
+  }
+
+  const std::string& contents() const { return contents_; }
+
+  virtual Status Truncate(uint64_t size) override {
+    contents_.resize(static_cast<size_t>(size));
+    return Status::OK();
+  }
+  virtual Status Close() override { return Status::OK(); }
+  virtual Status Flush() override {
+    if (reader_contents_ != nullptr) {
+      assert(reader_contents_->size() <= last_flush_);
+      size_t offset = last_flush_ - reader_contents_->size();
+      *reader_contents_ = Slice(
+          contents_.data() + offset,
+          contents_.size() - offset);
+      last_flush_ = contents_.size();
+    }
+
+    return Status::OK();
+  }
+  virtual Status Sync() override { return Status::OK(); }
+  virtual Status Append(const Slice& slice) override {
+    contents_.append(slice.data(), slice.size());
+    return Status::OK();
+  }
+  void Drop(size_t bytes) {
+    if (reader_contents_ != nullptr) {
+      contents_.resize(contents_.size() - bytes);
+      *reader_contents_ = Slice(
+          reader_contents_->data(), reader_contents_->size() - bytes);
+      last_flush_ = contents_.size();
+    }
+  }
+
+ private:
+  Slice* reader_contents_;
+  size_t last_flush_;
+};
+
+// A wrapper around a StringSink to give it a RandomRWFile interface
+class RandomRWStringSink : public RandomRWFile {
+ public:
+  explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    if (offset + data.size() > ss_->contents_.size()) {
+      ss_->contents_.resize(static_cast<size_t>(offset) + data.size(), '\0');
+    }
+
+    char* pos = const_cast<char*>(ss_->contents_.data() + offset);
+    memcpy(pos, data.data(), data.size());
+    return Status::OK();
+  }
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* /*scratch*/) const override {
+    *result = Slice(nullptr, 0);
+    if (offset < ss_->contents_.size()) {
+      size_t str_res_sz =
+          std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
+      *result = Slice(ss_->contents_.data() + offset, str_res_sz);
+    }
+    return Status::OK();
+  }
+
+  Status Flush() override { return Status::OK(); }
+
+  Status Sync() override { return Status::OK(); }
+
+  Status Close() override { return Status::OK(); }
+
+  const std::string& contents() const { return ss_->contents(); }
+
+ private:
+  StringSink* ss_;
+};
+
+// Like StringSink, this writes into a string. Unlike StringSink, it
+// has some initial content and overwrites it, just like a recycled
+// log file.
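+// Note: Flush() copies newly appended bytes into *reader_contents_, so the
+// buffer backing reader_contents_ must already be at least as large as the
+// final contents (see the assert in Flush()).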
+class OverwritingStringSink : public WritableFile { + public: + explicit OverwritingStringSink(Slice* reader_contents) + : WritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) {} + + const std::string& contents() const { return contents_; } + + virtual Status Truncate(uint64_t size) override { + contents_.resize(static_cast<size_t>(size)); + return Status::OK(); + } + virtual Status Close() override { return Status::OK(); } + virtual Status Flush() override { + if (last_flush_ < contents_.size()) { + assert(reader_contents_->size() >= contents_.size()); + memcpy((char*)reader_contents_->data() + last_flush_, + contents_.data() + last_flush_, contents_.size() - last_flush_); + last_flush_ = contents_.size(); + } + return Status::OK(); + } + virtual Status Sync() override { return Status::OK(); } + virtual Status Append(const Slice& slice) override { + contents_.append(slice.data(), slice.size()); + return Status::OK(); + } + void Drop(size_t bytes) { + contents_.resize(contents_.size() - bytes); + if (last_flush_ > contents_.size()) last_flush_ = contents_.size(); + } + + private: + std::string contents_; + Slice* reader_contents_; + size_t last_flush_; +}; + +class StringSource: public RandomAccessFile { + public: + explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, + bool mmap = false) + : contents_(contents.data(), contents.size()), + uniq_id_(uniq_id), + mmap_(mmap), + total_reads_(0) {} + + virtual ~StringSource() { } + + uint64_t Size() const { return contents_.size(); } + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + total_reads_++; + if (offset > contents_.size()) { + return Status::InvalidArgument("invalid Read offset"); + } + if (offset + n > contents_.size()) { + n = contents_.size() - static_cast<size_t>(offset); + } + if (!mmap_) { + memcpy(scratch, &contents_[static_cast<size_t>(offset)], n); + *result = Slice(scratch, n); + } else { + *result = Slice(&contents_[static_cast<size_t>(offset)], n); + } + return Status::OK(); + } + + virtual size_t GetUniqueId(char* id, size_t max_size) const override { + if (max_size < 20) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, uniq_id_); + rid = EncodeVarint64(rid, 0); + return static_cast<size_t>(rid-id); + } + + int total_reads() const { return total_reads_; } + + void set_total_reads(int tr) { total_reads_ = tr; } + + private: + std::string contents_; + uint64_t uniq_id_; + bool mmap_; + mutable int total_reads_; +}; + +inline StringSink* GetStringSinkFromLegacyWriter( + const WritableFileWriter* writer) { + LegacyWritableFileWrapper* file = + static_cast<LegacyWritableFileWrapper*>(writer->writable_file()); + return static_cast<StringSink*>(file->target()); +} + +class NullLogger : public Logger { + public: + using Logger::Logv; + virtual void Logv(const char* /*format*/, va_list /*ap*/) override {} + virtual size_t GetLogFileSize() const override { return 0; } +}; + +// Corrupts key by changing the type +extern void CorruptKeyType(InternalKey* ikey); + +extern std::string KeyStr(const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + +class SleepingBackgroundTask { + public: + SleepingBackgroundTask() + : bg_cv_(&mutex_), + should_sleep_(true), + done_with_sleep_(false), + sleeping_(false) {} + + bool IsSleeping() { + MutexLock l(&mutex_); + return sleeping_; + } + void DoSleep() { + MutexLock l(&mutex_); + sleeping_ = true; + bg_cv_.SignalAll(); + while 
(should_sleep_) { + bg_cv_.Wait(); + } + sleeping_ = false; + done_with_sleep_ = true; + bg_cv_.SignalAll(); + } + void WaitUntilSleeping() { + MutexLock l(&mutex_); + while (!sleeping_ || !should_sleep_) { + bg_cv_.Wait(); + } + } + // Waits for the status to change to sleeping, + // otherwise times out. + // wait_time is in microseconds. + // Returns true when times out, false otherwise. + bool TimedWaitUntilSleeping(uint64_t wait_time) { + auto abs_time = Env::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!sleeping_ || !should_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; + } + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + void WaitUntilDone() { + MutexLock l(&mutex_); + while (!done_with_sleep_) { + bg_cv_.Wait(); + } + } + // Similar to TimedWaitUntilSleeping. + // Waits until the task is done. + bool TimedWaitUntilDone(uint64_t wait_time) { + auto abs_time = Env::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!done_with_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; + } + bool WokenUp() { + MutexLock l(&mutex_); + return should_sleep_ == false; + } + + void Reset() { + MutexLock l(&mutex_); + should_sleep_ = true; + done_with_sleep_ = false; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; + bool done_with_sleep_; + bool sleeping_; +}; + +// Filters merge operands and values that are equal to `num`. +class FilterNumber : public CompactionFilter { + public: + explicit FilterNumber(uint64_t num) : num_(num) {} + + std::string last_merge_operand_key() { return last_merge_operand_key_; } + + bool Filter(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/, + const ROCKSDB_NAMESPACE::Slice& value, std::string* /*new_value*/, + bool* /*value_changed*/) const override { + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + bool FilterMergeOperand( + int /*level*/, const ROCKSDB_NAMESPACE::Slice& key, + const ROCKSDB_NAMESPACE::Slice& value) const override { + last_merge_operand_key_ = key.ToString(); + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + const char* Name() const override { return "FilterBadMergeOperand"; } + + private: + mutable std::string last_merge_operand_key_; + uint64_t num_; +}; + +inline std::string EncodeInt(uint64_t x) { + std::string result; + PutFixed64(&result, x); + return result; +} + + class SeqStringSource : public SequentialFile { + public: + SeqStringSource(const std::string& data, std::atomic<int>* read_count) + : data_(data), offset_(0), read_count_(read_count) {} + ~SeqStringSource() override {} + Status Read(size_t n, Slice* result, char* scratch) override { + std::string output; + if (offset_ < data_.size()) { + n = std::min(data_.size() - offset_, n); + memcpy(scratch, data_.data() + offset_, n); + offset_ += n; + *result = Slice(scratch, n); + } else { + return Status::InvalidArgument( + "Attemp to read when it already reached eof."); + } + (*read_count_)++; + return Status::OK(); + } + Status Skip(uint64_t n) override { + if (offset_ >= data_.size()) { + return Status::InvalidArgument( + "Attemp to read when it already reached eof."); + } + // TODO(yhchiang): Currently doesn't 
handle the overflow case. + offset_ += static_cast<size_t>(n); + return Status::OK(); + } + + private: + std::string data_; + size_t offset_; + std::atomic<int>* read_count_; + }; + + class StringEnv : public EnvWrapper { + public: + class StringSink : public WritableFile { + public: + explicit StringSink(std::string* contents) + : WritableFile(), contents_(contents) {} + virtual Status Truncate(uint64_t size) override { + contents_->resize(static_cast<size_t>(size)); + return Status::OK(); + } + virtual Status Close() override { return Status::OK(); } + virtual Status Flush() override { return Status::OK(); } + virtual Status Sync() override { return Status::OK(); } + virtual Status Append(const Slice& slice) override { + contents_->append(slice.data(), slice.size()); + return Status::OK(); + } + + private: + std::string* contents_; + }; + + explicit StringEnv(Env* t) : EnvWrapper(t) {} + ~StringEnv() override {} + + const std::string& GetContent(const std::string& f) { return files_[f]; } + + const Status WriteToNewFile(const std::string& file_name, + const std::string& content) { + std::unique_ptr<WritableFile> r; + auto s = NewWritableFile(file_name, &r, EnvOptions()); + if (!s.ok()) { + return s; + } + r->Append(content); + r->Flush(); + r->Close(); + assert(files_[file_name] == content); + return Status::OK(); + } + + // The following text is boilerplate that forwards all methods to target() + Status NewSequentialFile(const std::string& f, + std::unique_ptr<SequentialFile>* r, + const EnvOptions& /*options*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist", f); + } + r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); + return Status::OK(); + } + Status NewRandomAccessFile(const std::string& /*f*/, + std::unique_ptr<RandomAccessFile>* /*r*/, + const EnvOptions& /*options*/) override { + return Status::NotSupported(); + } + Status NewWritableFile(const std::string& f, + std::unique_ptr<WritableFile>* r, + const EnvOptions& /*options*/) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return Status::IOError("The specified file already exists", f); + } + r->reset(new StringSink(&files_[f])); + return Status::OK(); + } + virtual Status NewDirectory( + const std::string& /*name*/, + std::unique_ptr<Directory>* /*result*/) override { + return Status::NotSupported(); + } + Status FileExists(const std::string& f) override { + if (files_.find(f) == files_.end()) { + return Status::NotFound(); + } + return Status::OK(); + } + Status GetChildren(const std::string& /*dir*/, + std::vector<std::string>* /*r*/) override { + return Status::NotSupported(); + } + Status DeleteFile(const std::string& f) override { + files_.erase(f); + return Status::OK(); + } + Status CreateDir(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status CreateDirIfMissing(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status DeleteDir(const std::string& /*d*/) override { + return Status::NotSupported(); + } + Status GetFileSize(const std::string& f, uint64_t* s) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist:", f); + } + *s = iter->second.size(); + return Status::OK(); + } + + Status GetFileModificationTime(const std::string& /*fname*/, + uint64_t* /*file_mtime*/) override { + return Status::NotSupported(); + } + + Status RenameFile(const std::string& /*s*/, + 
const std::string& /*t*/) override { + return Status::NotSupported(); + } + + Status LinkFile(const std::string& /*s*/, + const std::string& /*t*/) override { + return Status::NotSupported(); + } + + Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override { + return Status::NotSupported(); + } + + Status UnlockFile(FileLock* /*l*/) override { + return Status::NotSupported(); + } + + std::atomic<int> num_seq_file_read_; + + protected: + std::unordered_map<std::string, std::string> files_; + }; + +// Randomly initialize the given DBOptions +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); + +// Randomly initialize the given ColumnFamilyOptions +// Note that the caller is responsible for releasing non-null +// cf_opt->compaction_filter. +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions&, Random* rnd); + +// A dummy merge operator which can change its name +class ChanglingMergeOperator : public MergeOperator { + public: + explicit ChanglingMergeOperator(const std::string& name) + : name_(name + "MergeOperator") {} + ~ChanglingMergeOperator() {} + + void SetName(const std::string& name) { name_ = name; } + + virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/, + MergeOperationOutput* /*merge_out*/) const override { + return false; + } + virtual bool PartialMergeMulti(const Slice& /*key*/, + const std::deque<Slice>& /*operand_list*/, + std::string* /*new_value*/, + Logger* /*logger*/) const override { + return false; + } + virtual const char* Name() const override { return name_.c_str(); } + + protected: + std::string name_; +}; + +// Returns a dummy merge operator with random name. +MergeOperator* RandomMergeOperator(Random* rnd); + +// A dummy compaction filter which can change its name +class ChanglingCompactionFilter : public CompactionFilter { + public: + explicit ChanglingCompactionFilter(const std::string& name) + : name_(name + "CompactionFilter") {} + ~ChanglingCompactionFilter() {} + + void SetName(const std::string& name) { name_ = name; } + + bool Filter(int /*level*/, const Slice& /*key*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + bool* /*value_changed*/) const override { + return false; + } + + const char* Name() const override { return name_.c_str(); } + + private: + std::string name_; +}; + +// Returns a dummy compaction filter with a random name. +CompactionFilter* RandomCompactionFilter(Random* rnd); + +// A dummy compaction filter factory which can change its name +class ChanglingCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit ChanglingCompactionFilterFactory(const std::string& name) + : name_(name + "CompactionFilterFactory") {} + ~ChanglingCompactionFilterFactory() {} + + void SetName(const std::string& name) { name_ = name; } + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr<CompactionFilter>(); + } + + // Returns a name that identifies this compaction filter factory. 
+ const char* Name() const override { return name_.c_str(); } + + protected: + std::string name_; +}; + +CompressionType RandomCompressionType(Random* rnd); + +void RandomCompressionTypeVector(const size_t count, + std::vector<CompressionType>* types, + Random* rnd); + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd); + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1); + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1); + +std::string RandomName(Random* rnd, const size_t len); + +Status DestroyDir(Env* env, const std::string& dir); + +bool IsDirectIOSupported(Env* env, const std::string& dir); + +// Return the number of lines where a given pattern was found in a file. +size_t GetLinesCount(const std::string& fname, const std::string& pattern); + +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/transaction_test_util.cc b/src/rocksdb/test_util/transaction_test_util.cc new file mode 100644 index 000000000..736707595 --- /dev/null +++ b/src/rocksdb/test_util/transaction_test_util.cc @@ -0,0 +1,387 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#ifndef ROCKSDB_LITE + +#include "test_util/transaction_test_util.h" + +#include <algorithm> +#include <cinttypes> +#include <numeric> +#include <random> +#include <string> +#include <thread> + +#include "rocksdb/db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" + +#include "db/dbformat.h" +#include "db/snapshot_impl.h" +#include "logging/logging.h" +#include "util/random.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +RandomTransactionInserter::RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options, + const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets, + const uint64_t cmt_delay_ms, const uint64_t first_id) + : rand_(rand), + write_options_(write_options), + read_options_(read_options), + num_keys_(num_keys), + num_sets_(num_sets), + txn_id_(first_id), + cmt_delay_ms_(cmt_delay_ms) {} + +RandomTransactionInserter::~RandomTransactionInserter() { + if (txn_ != nullptr) { + delete txn_; + } + if (optimistic_txn_ != nullptr) { + delete optimistic_txn_; + } +} + +bool RandomTransactionInserter::TransactionDBInsert( + TransactionDB* db, const TransactionOptions& txn_options) { + txn_ = db->BeginTransaction(write_options_, txn_options, txn_); + + std::hash<std::thread::id> hasher; + char name[64]; + snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64, + hasher(std::this_thread::get_id()), txn_id_++); + assert(strlen(name) < 64 - 1); + assert(txn_->SetName(name).ok()); + + // Take a snapshot if set_snapshot was not set or with 50% change otherwise + bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2); + if (take_snapshot) { + txn_->SetSnapshot(); + read_options_.snapshot = txn_->GetSnapshot(); + } + auto res = DoInsert(db, txn_, false); + if (take_snapshot) { + read_options_.snapshot = nullptr; + } + return res; +} + +bool RandomTransactionInserter::OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options) { + optimistic_txn_ = + db->BeginTransaction(write_options_, txn_options, optimistic_txn_); + 
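+  // For the optimistic path, write conflicts only surface when DoInsert()
+  // calls Commit(), not at the individual reads and writes.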
+ return DoInsert(db, optimistic_txn_, true); +} + +bool RandomTransactionInserter::DBInsert(DB* db) { + return DoInsert(db, nullptr, false); +} + +Status RandomTransactionInserter::DBGet( + DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i, + uint64_t ikey, bool get_for_update, uint64_t* int_value, + std::string* full_key, bool* unexpected_error) { + Status s; + // Five digits (since the largest uint16_t is 65535) plus the NUL + // end char. + char prefix_buf[6]; + // Pad prefix appropriately so we can iterate over each set + assert(set_i + 1 <= 9999); + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); + // key format: [SET#][random#] + std::string skey = ToString(ikey); + Slice base_key(skey); + *full_key = std::string(prefix_buf) + base_key.ToString(); + Slice key(*full_key); + + std::string value; + if (txn != nullptr) { + if (get_for_update) { + s = txn->GetForUpdate(read_options, key, &value); + } else { + s = txn->Get(read_options, key, &value); + } + } else { + s = db->Get(read_options, key, &value); + } + + if (s.ok()) { + // Found key, parse its value + *int_value = std::stoull(value); + if (*int_value == 0 || *int_value == ULONG_MAX) { + *unexpected_error = true; + fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); + s = Status::Corruption(); + } + } else if (s.IsNotFound()) { + // Have not yet written to this key, so assume its value is 0 + *int_value = 0; + s = Status::OK(); + } + return s; +} + +bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, + bool is_optimistic) { + Status s; + WriteBatch batch; + + // pick a random number to use to increment a key in each set + uint64_t incr = (rand_->Next() % 100) + 1; + bool unexpected_error = false; + + std::vector<uint16_t> set_vec(num_sets_); + std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); + std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{}); + + // For each set, pick a key at random and increment it + for (uint16_t set_i : set_vec) { + uint64_t int_value = 0; + std::string full_key; + uint64_t rand_key = rand_->Next() % num_keys_; + const bool get_for_update = txn ? rand_->OneIn(2) : false; + s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update, + &int_value, &full_key, &unexpected_error); + Slice key(full_key); + if (!s.ok()) { + // Optimistic transactions should never return non-ok status here. + // Non-optimistic transactions may return write-coflict/timeout errors. + if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + fprintf(stderr, "Get returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + break; + } + + if (s.ok()) { + // Increment key + std::string sum = ToString(int_value + incr); + if (txn != nullptr) { + s = txn->Put(key, sum); + if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) { + // If the initial get was not for update, then the key is not locked + // before put and put could fail due to concurrent writes. + break; + } else if (!s.ok()) { + // Since we did a GetForUpdate, Put should not fail. 
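+          // (GetForUpdate already locked this key, so a conflict on the
+          // subsequent Put is not expected.)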
+ fprintf(stderr, "Put returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + } else { + batch.Put(key, sum); + } + bytes_inserted_ += key.size() + sum.size(); + } + if (txn != nullptr) { + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64 + "+%" PRIu64 "=%" PRIu64, + txn->GetName().c_str(), s.ToString().c_str(), + txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(), + int_value, incr, int_value + incr); + } + } + + if (s.ok()) { + if (txn != nullptr) { + bool with_prepare = !is_optimistic && !rand_->OneIn(10); + if (with_prepare) { + // Also try commit without prepare + s = txn->Prepare(); + assert(s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Prepare of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); + if (rand_->OneIn(20)) { + // This currently only tests the mechanics of writing commit time + // write batch so the exact values would not matter. + s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog"); + assert(s.ok()); + } + db->GetDBOptions().env->SleepForMicroseconds( + static_cast<int>(cmt_delay_ms_ * 1000)); + } + if (!rand_->OneIn(20)) { + s = txn->Commit(); + assert(!with_prepare || s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Commit of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); + } else { + // Also try 5% rollback + s = txn->Rollback(); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Rollback %" PRIu64 " %s %s", txn->GetId(), + txn->GetName().c_str(), s.ToString().c_str()); + assert(s.ok()); + } + assert(is_optimistic || s.ok()); + + if (!s.ok()) { + if (is_optimistic) { + // Optimistic transactions can have write-conflict errors on commit. + // Any other error is unexpected. + if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + unexpected_error = true; + } + } else { + // Non-optimistic transactions should only fail due to expiration + // or write failures. For testing purproses, we do not expect any + // write failures. + if (!s.IsExpired()) { + unexpected_error = true; + } + } + + if (unexpected_error) { + fprintf(stderr, "Commit returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + } else { + s = db->Write(write_options_, &batch); + if (!s.ok()) { + unexpected_error = true; + fprintf(stderr, "Write returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + } else { + if (txn != nullptr) { + assert(txn->Rollback().ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s", + s.ToString().c_str(), txn->GetName().c_str()); + } + } + + if (s.ok()) { + success_count_++; + } else { + failure_count_++; + } + + last_status_ = s; + + // return success if we didn't get any unexpected errors + return !unexpected_error; +} + +// Verify that the sum of the keys in each set are equal +Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, + uint64_t num_keys_per_set, + bool take_snapshot, Random64* rand, + uint64_t delay_ms) { + // delay_ms is the delay between taking a snapshot and doing the reads. It + // emulates reads from a long-running backup job. 
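+  // A non-zero delay is therefore only meaningful when take_snapshot pins the
+  // read view, which the assert below enforces.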
+ assert(delay_ms == 0 || take_snapshot); + uint64_t prev_total = 0; + uint32_t prev_i = 0; + bool prev_assigned = false; + + ReadOptions roptions; + if (take_snapshot) { + roptions.snapshot = db->GetSnapshot(); + db->GetDBOptions().env->SleepForMicroseconds( + static_cast<int>(delay_ms * 1000)); + } + + std::vector<uint16_t> set_vec(num_sets); + std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); + std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{}); + + // For each set of keys with the same prefix, sum all the values + for (uint16_t set_i : set_vec) { + // Five digits (since the largest uint16_t is 65535) plus the NUL + // end char. + char prefix_buf[6]; + assert(set_i + 1 <= 9999); + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); + uint64_t total = 0; + + // Use either point lookup or iterator. Point lookups are slower so we use + // it less often. + const bool use_point_lookup = + num_keys_per_set != 0 && rand && rand->OneIn(10); + if (use_point_lookup) { + ReadOptions read_options; + for (uint64_t k = 0; k < num_keys_per_set; k++) { + std::string dont_care; + uint64_t int_value = 0; + bool unexpected_error = false; + const bool FOR_UPDATE = false; + Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE, + &int_value, &dont_care, &unexpected_error); + assert(s.ok()); + assert(!unexpected_error); + total += int_value; + } + } else { // user iterators + Iterator* iter = db->NewIterator(roptions); + for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + // stop when we reach a different prefix + if (key.ToString().compare(0, 4, prefix_buf) != 0) { + break; + } + Slice value = iter->value(); + uint64_t int_value = std::stoull(value.ToString()); + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Iter returned unexpected value: %s\n", + value.ToString().c_str()); + return Status::Corruption(); + } + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul, + roptions.snapshot + ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_ + : 0ul, + static_cast<int>(key.size()), key.data(), int_value); + total += int_value; + } + delete iter; + } + + if (prev_assigned && total != prev_total) { + db->GetDBOptions().info_log->Flush(); + fprintf(stdout, + "RandomTransactionVerify found inconsistent totals using " + "pointlookup? %d " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 + " at snapshot %" PRIu64 "\n", + use_point_lookup, prev_i, prev_total, set_i, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); + fflush(stdout); + return Status::Corruption(); + } else { + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64 + " snap: %" PRIu64, + use_point_lookup, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); + } + prev_total = total; + prev_i = set_i; + prev_assigned = true; + } + if (take_snapshot) { + db->ReleaseSnapshot(roptions.snapshot); + } + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/test_util/transaction_test_util.h b/src/rocksdb/test_util/transaction_test_util.h new file mode 100644 index 000000000..086b0ea6f --- /dev/null +++ b/src/rocksdb/test_util/transaction_test_util.h @@ -0,0 +1,132 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "rocksdb/options.h" +#include "port/port.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction_db.h" + +namespace ROCKSDB_NAMESPACE { + +class DB; +class Random64; + +// Utility class for stress testing transactions. Can be used to write many +// transactions in parallel and then validate that the data written is logically +// consistent. This class assumes the input DB is initially empty. +// +// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will +// increment the value of a key in #num_sets sets of keys. Regardless of +// whether the transaction succeeds, the total sum of values of keys in each +// set is an invariant that should remain equal. +// +// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many +// times, Verify() can be called to validate that the invariant holds. +// +// To test writing Transaction in parallel, multiple threads can create a +// RandomTransactionInserter with similar arguments using the same DB. +class RandomTransactionInserter { + public: + // num_keys is the number of keys in each set. + // num_sets is the number of sets of keys. + // cmt_delay_ms is the delay between prepare (if there is any) and commit + // first_id is the id of the first transaction + explicit RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options = WriteOptions(), + const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000, + uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0, + const uint64_t first_id = 0); + + ~RandomTransactionInserter(); + + // Increment a key in each set using a Transaction on a TransactionDB. + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool TransactionDBInsert( + TransactionDB* db, + const TransactionOptions& txn_options = TransactionOptions()); + + // Increment a key in each set using a Transaction on an + // OptimisticTransactionDB + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options = + OptimisticTransactionOptions()); + // Increment a key in each set without using a transaction. If this function + // is called in parallel, then Verify() may fail. + // + // Returns true if the write succeeds. + // Error status may be obtained by calling GetLastStatus(). + bool DBInsert(DB* db); + + // Get the ikey'th key from set set_i + static Status DBGet(DB* db, Transaction* txn, ReadOptions& read_options, + uint16_t set_i, uint64_t ikey, bool get_for_update, + uint64_t* int_value, std::string* full_key, + bool* unexpected_error); + + // Returns OK if Invariant is true. 
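+  // (i.e. every key set sums to the same total; see the class comment above).
+  //
+  // Rough usage sketch (illustrative only; assumes "txn_db" is a
+  // TransactionDB* that the inserter threads have finished writing to):
+  //
+  //   Random64 rand(301);
+  //   RandomTransactionInserter inserter(&rand);
+  //   for (int i = 0; i < 1000; i++) {
+  //     inserter.TransactionDBInsert(txn_db);
+  //   }
+  //   Status s = RandomTransactionInserter::Verify(txn_db, 3 /* num_sets */);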
+ static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0, + bool take_snapshot = false, Random64* rand = nullptr, + uint64_t delay_ms = 0); + + // Returns the status of the previous Insert operation + Status GetLastStatus() { return last_status_; } + + // Returns the number of successfully written calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert + uint64_t GetSuccessCount() { return success_count_; } + + // Returns the number of calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not + // write any data. + uint64_t GetFailureCount() { return failure_count_; } + + // Returns the sum of user keys/values Put() to the DB. + size_t GetBytesInserted() { return bytes_inserted_; } + + private: + // Input options + Random64* rand_; + const WriteOptions write_options_; + ReadOptions read_options_; + const uint64_t num_keys_; + const uint16_t num_sets_; + + // Number of successful insert batches performed + uint64_t success_count_ = 0; + + // Number of failed insert batches attempted + uint64_t failure_count_ = 0; + + size_t bytes_inserted_ = 0; + + // Status returned by most recent insert operation + Status last_status_; + + // optimization: re-use allocated transaction objects. + Transaction* txn_ = nullptr; + Transaction* optimistic_txn_ = nullptr; + + uint64_t txn_id_; + // The delay between ::Prepare and ::Commit + const uint64_t cmt_delay_ms_; + + bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE |