summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/test_util
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/test_util/fault_injection_test_env.cc437
-rw-r--r--src/rocksdb/test_util/fault_injection_test_env.h225
-rw-r--r--src/rocksdb/test_util/mock_time_env.h45
-rw-r--r--src/rocksdb/test_util/sync_point.cc66
-rw-r--r--src/rocksdb/test_util/sync_point.h144
-rw-r--r--src/rocksdb/test_util/sync_point_impl.cc129
-rw-r--r--src/rocksdb/test_util/sync_point_impl.h74
-rw-r--r--src/rocksdb/test_util/testharness.cc56
-rw-r--r--src/rocksdb/test_util/testharness.h47
-rw-r--r--src/rocksdb/test_util/testutil.cc454
-rw-r--r--src/rocksdb/test_util/testutil.h802
-rw-r--r--src/rocksdb/test_util/transaction_test_util.cc387
-rw-r--r--src/rocksdb/test_util/transaction_test_util.h132
13 files changed, 2998 insertions, 0 deletions
diff --git a/src/rocksdb/test_util/fault_injection_test_env.cc b/src/rocksdb/test_util/fault_injection_test_env.cc
new file mode 100644
index 000000000..8bbd2692e
--- /dev/null
+++ b/src/rocksdb/test_util/fault_injection_test_env.cc
@@ -0,0 +1,437 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright 2014 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+// This test uses a custom Env to keep track of the state of a filesystem as of
+// the last "sync". It then checks for data loss errors by purposely dropping
+// file data (or entire files) not protected by a "sync".
+
+#include "test_util/fault_injection_test_env.h"
+#include <functional>
+#include <utility>
+
+namespace ROCKSDB_NAMESPACE {
+
+// Assume a filename, and not a directory name like "/foo/bar/"
+std::string GetDirName(const std::string filename) {
+ size_t found = filename.find_last_of("/\\");
+ if (found == std::string::npos) {
+ return "";
+ } else {
+ return filename.substr(0, found);
+ }
+}
+
+// A basic file truncation function suitable for this test.
+Status Truncate(Env* env, const std::string& filename, uint64_t length) {
+ std::unique_ptr<SequentialFile> orig_file;
+ const EnvOptions options;
+ Status s = env->NewSequentialFile(filename, &orig_file, options);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot open file %s for truncation: %s\n",
+ filename.c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ std::unique_ptr<char[]> scratch(new char[length]);
+ ROCKSDB_NAMESPACE::Slice result;
+ s = orig_file->Read(length, &result, scratch.get());
+#ifdef OS_WIN
+ orig_file.reset();
+#endif
+ if (s.ok()) {
+ std::string tmp_name = GetDirName(filename) + "/truncate.tmp";
+ std::unique_ptr<WritableFile> tmp_file;
+ s = env->NewWritableFile(tmp_name, &tmp_file, options);
+ if (s.ok()) {
+ s = tmp_file->Append(result);
+ if (s.ok()) {
+ s = env->RenameFile(tmp_name, filename);
+ } else {
+ fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(),
+ filename.c_str(), s.ToString().c_str());
+ env->DeleteFile(tmp_name);
+ }
+ }
+ }
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(),
+ s.ToString().c_str());
+ }
+
+ return s;
+}
+
+// Trim the tailing "/" in the end of `str`
+std::string TrimDirname(const std::string& str) {
+ size_t found = str.find_last_not_of("/");
+ if (found == std::string::npos) {
+ return str;
+ }
+ return str.substr(0, found + 1);
+}
+
+// Return pair <parent directory name, file name> of a full path.
+std::pair<std::string, std::string> GetDirAndName(const std::string& name) {
+ std::string dirname = GetDirName(name);
+ std::string fname = name.substr(dirname.size() + 1);
+ return std::make_pair(dirname, fname);
+}
+
+Status FileState::DropUnsyncedData(Env* env) const {
+ ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_;
+ return Truncate(env, filename_, sync_pos);
+}
+
+Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const {
+ ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_;
+ assert(pos_ >= sync_pos);
+ int range = static_cast<int>(pos_ - sync_pos);
+ uint64_t truncated_size =
+ static_cast<uint64_t>(sync_pos) + rand->Uniform(range);
+ return Truncate(env, filename_, truncated_size);
+}
+
+Status TestDirectory::Fsync() {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ env_->SyncDir(dirname_);
+ return dir_->Fsync();
+}
+
+TestWritableFile::TestWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>&& f,
+ FaultInjectionTestEnv* env)
+ : state_(fname),
+ target_(std::move(f)),
+ writable_file_opened_(true),
+ env_(env) {
+ assert(target_ != nullptr);
+ state_.pos_ = 0;
+}
+
+TestWritableFile::~TestWritableFile() {
+ if (writable_file_opened_) {
+ Close();
+ }
+}
+
+Status TestWritableFile::Append(const Slice& data) {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ Status s = target_->Append(data);
+ if (s.ok()) {
+ state_.pos_ += data.size();
+ env_->WritableFileAppended(state_);
+ }
+ return s;
+}
+
+Status TestWritableFile::Close() {
+ writable_file_opened_ = false;
+ Status s = target_->Close();
+ if (s.ok()) {
+ env_->WritableFileClosed(state_);
+ }
+ return s;
+}
+
+Status TestWritableFile::Flush() {
+ Status s = target_->Flush();
+ if (s.ok() && env_->IsFilesystemActive()) {
+ state_.pos_at_last_flush_ = state_.pos_;
+ }
+ return s;
+}
+
+Status TestWritableFile::Sync() {
+ if (!env_->IsFilesystemActive()) {
+ return Status::IOError("FaultInjectionTestEnv: not active");
+ }
+ // No need to actual sync.
+ state_.pos_at_last_sync_ = state_.pos_;
+ env_->WritableFileSynced(state_);
+ return Status::OK();
+}
+
+TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/,
+ std::unique_ptr<RandomRWFile>&& f,
+ FaultInjectionTestEnv* env)
+ : target_(std::move(f)), file_opened_(true), env_(env) {
+ assert(target_ != nullptr);
+}
+
+TestRandomRWFile::~TestRandomRWFile() {
+ if (file_opened_) {
+ Close();
+ }
+}
+
+Status TestRandomRWFile::Write(uint64_t offset, const Slice& data) {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ return target_->Write(offset, data);
+}
+
+Status TestRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ return target_->Read(offset, n, result, scratch);
+}
+
+Status TestRandomRWFile::Close() {
+ file_opened_ = false;
+ return target_->Close();
+}
+
+Status TestRandomRWFile::Flush() {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ return target_->Flush();
+}
+
+Status TestRandomRWFile::Sync() {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ return target_->Sync();
+}
+
+Status FaultInjectionTestEnv::NewDirectory(const std::string& name,
+ std::unique_ptr<Directory>* result) {
+ std::unique_ptr<Directory> r;
+ Status s = target()->NewDirectory(name, &r);
+ assert(s.ok());
+ if (!s.ok()) {
+ return s;
+ }
+ result->reset(new TestDirectory(this, TrimDirname(name), r.release()));
+ return Status::OK();
+}
+
+Status FaultInjectionTestEnv::NewWritableFile(
+ const std::string& fname, std::unique_ptr<WritableFile>* result,
+ const EnvOptions& soptions) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ // Not allow overwriting files
+ Status s = target()->FileExists(fname);
+ if (s.ok()) {
+ return Status::Corruption("File already exists.");
+ } else if (!s.IsNotFound()) {
+ assert(s.IsIOError());
+ return s;
+ }
+ s = target()->NewWritableFile(fname, result, soptions);
+ if (s.ok()) {
+ result->reset(new TestWritableFile(fname, std::move(*result), this));
+ // WritableFileWriter* file is opened
+ // again then it will be truncated - so forget our saved state.
+ UntrackFile(fname);
+ MutexLock l(&mutex_);
+ open_files_.insert(fname);
+ auto dir_and_name = GetDirAndName(fname);
+ auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
+ list.insert(dir_and_name.second);
+ }
+ return s;
+}
+
+Status FaultInjectionTestEnv::ReopenWritableFile(
+ const std::string& fname, std::unique_ptr<WritableFile>* result,
+ const EnvOptions& soptions) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ Status s = target()->ReopenWritableFile(fname, result, soptions);
+ if (s.ok()) {
+ result->reset(new TestWritableFile(fname, std::move(*result), this));
+ // WritableFileWriter* file is opened
+ // again then it will be truncated - so forget our saved state.
+ UntrackFile(fname);
+ MutexLock l(&mutex_);
+ open_files_.insert(fname);
+ auto dir_and_name = GetDirAndName(fname);
+ auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
+ list.insert(dir_and_name.second);
+ }
+ return s;
+}
+
+Status FaultInjectionTestEnv::NewRandomRWFile(
+ const std::string& fname, std::unique_ptr<RandomRWFile>* result,
+ const EnvOptions& soptions) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ Status s = target()->NewRandomRWFile(fname, result, soptions);
+ if (s.ok()) {
+ result->reset(new TestRandomRWFile(fname, std::move(*result), this));
+ // WritableFileWriter* file is opened
+ // again then it will be truncated - so forget our saved state.
+ UntrackFile(fname);
+ MutexLock l(&mutex_);
+ open_files_.insert(fname);
+ auto dir_and_name = GetDirAndName(fname);
+ auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
+ list.insert(dir_and_name.second);
+ }
+ return s;
+}
+
+Status FaultInjectionTestEnv::NewRandomAccessFile(
+ const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
+ const EnvOptions& soptions) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ return target()->NewRandomAccessFile(fname, result, soptions);
+}
+
+Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ Status s = EnvWrapper::DeleteFile(f);
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
+ s.ToString().c_str());
+ }
+ if (s.ok()) {
+ UntrackFile(f);
+ }
+ return s;
+}
+
+Status FaultInjectionTestEnv::RenameFile(const std::string& s,
+ const std::string& t) {
+ if (!IsFilesystemActive()) {
+ return GetError();
+ }
+ Status ret = EnvWrapper::RenameFile(s, t);
+
+ if (ret.ok()) {
+ MutexLock l(&mutex_);
+ if (db_file_state_.find(s) != db_file_state_.end()) {
+ db_file_state_[t] = db_file_state_[s];
+ db_file_state_.erase(s);
+ }
+
+ auto sdn = GetDirAndName(s);
+ auto tdn = GetDirAndName(t);
+ if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
+ auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
+ assert(tlist.find(tdn.second) == tlist.end());
+ tlist.insert(tdn.second);
+ }
+ }
+
+ return ret;
+}
+
+void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) {
+ MutexLock l(&mutex_);
+ if (open_files_.find(state.filename_) != open_files_.end()) {
+ db_file_state_[state.filename_] = state;
+ open_files_.erase(state.filename_);
+ }
+}
+
+void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) {
+ MutexLock l(&mutex_);
+ if (open_files_.find(state.filename_) != open_files_.end()) {
+ if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
+ db_file_state_.insert(std::make_pair(state.filename_, state));
+ } else {
+ db_file_state_[state.filename_] = state;
+ }
+ }
+}
+
+void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) {
+ MutexLock l(&mutex_);
+ if (open_files_.find(state.filename_) != open_files_.end()) {
+ if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
+ db_file_state_.insert(std::make_pair(state.filename_, state));
+ } else {
+ db_file_state_[state.filename_] = state;
+ }
+ }
+}
+
+// For every file that is not fully synced, make a call to `func` with
+// FileState of the file as the parameter.
+Status FaultInjectionTestEnv::DropFileData(
+ std::function<Status(Env*, FileState)> func) {
+ Status s;
+ MutexLock l(&mutex_);
+ for (std::map<std::string, FileState>::const_iterator it =
+ db_file_state_.begin();
+ s.ok() && it != db_file_state_.end(); ++it) {
+ const FileState& state = it->second;
+ if (!state.IsFullySynced()) {
+ s = func(target(), state);
+ }
+ }
+ return s;
+}
+
+Status FaultInjectionTestEnv::DropUnsyncedFileData() {
+ return DropFileData([&](Env* env, const FileState& state) {
+ return state.DropUnsyncedData(env);
+ });
+}
+
+Status FaultInjectionTestEnv::DropRandomUnsyncedFileData(Random* rnd) {
+ return DropFileData([&](Env* env, const FileState& state) {
+ return state.DropRandomUnsyncedData(env, rnd);
+ });
+}
+
+Status FaultInjectionTestEnv::DeleteFilesCreatedAfterLastDirSync() {
+ // Because DeleteFile access this container make a copy to avoid deadlock
+ std::map<std::string, std::set<std::string>> map_copy;
+ {
+ MutexLock l(&mutex_);
+ map_copy.insert(dir_to_new_files_since_last_sync_.begin(),
+ dir_to_new_files_since_last_sync_.end());
+ }
+
+ for (auto& pair : map_copy) {
+ for (std::string name : pair.second) {
+ Status s = DeleteFile(pair.first + "/" + name);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ }
+ return Status::OK();
+}
+void FaultInjectionTestEnv::ResetState() {
+ MutexLock l(&mutex_);
+ db_file_state_.clear();
+ dir_to_new_files_since_last_sync_.clear();
+ SetFilesystemActiveNoLock(true);
+}
+
+void FaultInjectionTestEnv::UntrackFile(const std::string& f) {
+ MutexLock l(&mutex_);
+ auto dir_and_name = GetDirAndName(f);
+ dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
+ dir_and_name.second);
+ db_file_state_.erase(f);
+ open_files_.erase(f);
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/fault_injection_test_env.h b/src/rocksdb/test_util/fault_injection_test_env.h
new file mode 100644
index 000000000..9cc33a8d3
--- /dev/null
+++ b/src/rocksdb/test_util/fault_injection_test_env.h
@@ -0,0 +1,225 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright 2014 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+// This test uses a custom Env to keep track of the state of a filesystem as of
+// the last "sync". It then checks for data loss errors by purposely dropping
+// file data (or entire files) not protected by a "sync".
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <string>
+
+#include "db/version_set.h"
+#include "env/mock_env.h"
+#include "file/filename.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class TestWritableFile;
+class FaultInjectionTestEnv;
+
+struct FileState {
+ std::string filename_;
+ ssize_t pos_;
+ ssize_t pos_at_last_sync_;
+ ssize_t pos_at_last_flush_;
+
+ explicit FileState(const std::string& filename)
+ : filename_(filename),
+ pos_(-1),
+ pos_at_last_sync_(-1),
+ pos_at_last_flush_(-1) {}
+
+ FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {}
+
+ bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
+
+ Status DropUnsyncedData(Env* env) const;
+
+ Status DropRandomUnsyncedData(Env* env, Random* rand) const;
+};
+
+// A wrapper around WritableFileWriter* file
+// is written to or sync'ed.
+class TestWritableFile : public WritableFile {
+ public:
+ explicit TestWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>&& f,
+ FaultInjectionTestEnv* env);
+ virtual ~TestWritableFile();
+ virtual Status Append(const Slice& data) override;
+ virtual Status Truncate(uint64_t size) override {
+ return target_->Truncate(size);
+ }
+ virtual Status Close() override;
+ virtual Status Flush() override;
+ virtual Status Sync() override;
+ virtual bool IsSyncThreadSafe() const override { return true; }
+ virtual Status PositionedAppend(const Slice& data,
+ uint64_t offset) override {
+ return target_->PositionedAppend(data, offset);
+ }
+ virtual bool use_direct_io() const override {
+ return target_->use_direct_io();
+ };
+
+ private:
+ FileState state_;
+ std::unique_ptr<WritableFile> target_;
+ bool writable_file_opened_;
+ FaultInjectionTestEnv* env_;
+};
+
+// A wrapper around WritableFileWriter* file
+// is written to or sync'ed.
+class TestRandomRWFile : public RandomRWFile {
+ public:
+ explicit TestRandomRWFile(const std::string& fname,
+ std::unique_ptr<RandomRWFile>&& f,
+ FaultInjectionTestEnv* env);
+ virtual ~TestRandomRWFile();
+ Status Write(uint64_t offset, const Slice& data) override;
+ Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override;
+ Status Close() override;
+ Status Flush() override;
+ Status Sync() override;
+ size_t GetRequiredBufferAlignment() const override {
+ return target_->GetRequiredBufferAlignment();
+ }
+ bool use_direct_io() const override { return target_->use_direct_io(); };
+
+ private:
+ std::unique_ptr<RandomRWFile> target_;
+ bool file_opened_;
+ FaultInjectionTestEnv* env_;
+};
+
+class TestDirectory : public Directory {
+ public:
+ explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname,
+ Directory* dir)
+ : env_(env), dirname_(dirname), dir_(dir) {}
+ ~TestDirectory() {}
+
+ virtual Status Fsync() override;
+
+ private:
+ FaultInjectionTestEnv* env_;
+ std::string dirname_;
+ std::unique_ptr<Directory> dir_;
+};
+
+class FaultInjectionTestEnv : public EnvWrapper {
+ public:
+ explicit FaultInjectionTestEnv(Env* base)
+ : EnvWrapper(base), filesystem_active_(true) {}
+ virtual ~FaultInjectionTestEnv() {}
+
+ Status NewDirectory(const std::string& name,
+ std::unique_ptr<Directory>* result) override;
+
+ Status NewWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& soptions) override;
+
+ Status ReopenWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& soptions) override;
+
+ Status NewRandomRWFile(const std::string& fname,
+ std::unique_ptr<RandomRWFile>* result,
+ const EnvOptions& soptions) override;
+
+ Status NewRandomAccessFile(const std::string& fname,
+ std::unique_ptr<RandomAccessFile>* result,
+ const EnvOptions& soptions) override;
+
+ virtual Status DeleteFile(const std::string& f) override;
+
+ virtual Status RenameFile(const std::string& s,
+ const std::string& t) override;
+
+// Undef to eliminate clash on Windows
+#undef GetFreeSpace
+ virtual Status GetFreeSpace(const std::string& path,
+ uint64_t* disk_free) override {
+ if (!IsFilesystemActive() && error_ == Status::NoSpace()) {
+ *disk_free = 0;
+ return Status::OK();
+ } else {
+ return target()->GetFreeSpace(path, disk_free);
+ }
+ }
+
+ void WritableFileClosed(const FileState& state);
+
+ void WritableFileSynced(const FileState& state);
+
+ void WritableFileAppended(const FileState& state);
+
+ // For every file that is not fully synced, make a call to `func` with
+ // FileState of the file as the parameter.
+ Status DropFileData(std::function<Status(Env*, FileState)> func);
+
+ Status DropUnsyncedFileData();
+
+ Status DropRandomUnsyncedFileData(Random* rnd);
+
+ Status DeleteFilesCreatedAfterLastDirSync();
+
+ void ResetState();
+
+ void UntrackFile(const std::string& f);
+
+ void SyncDir(const std::string& dirname) {
+ MutexLock l(&mutex_);
+ dir_to_new_files_since_last_sync_.erase(dirname);
+ }
+
+ // Setting the filesystem to inactive is the test equivalent to simulating a
+ // system reset. Setting to inactive will freeze our saved filesystem state so
+ // that it will stop being recorded. It can then be reset back to the state at
+ // the time of the reset.
+ bool IsFilesystemActive() {
+ MutexLock l(&mutex_);
+ return filesystem_active_;
+ }
+ void SetFilesystemActiveNoLock(bool active,
+ Status error = Status::Corruption("Not active")) {
+ filesystem_active_ = active;
+ if (!active) {
+ error_ = error;
+ }
+ }
+ void SetFilesystemActive(bool active,
+ Status error = Status::Corruption("Not active")) {
+ MutexLock l(&mutex_);
+ SetFilesystemActiveNoLock(active, error);
+ }
+ void AssertNoOpenFile() { assert(open_files_.empty()); }
+ Status GetError() { return error_; }
+
+ private:
+ port::Mutex mutex_;
+ std::map<std::string, FileState> db_file_state_;
+ std::set<std::string> open_files_;
+ std::unordered_map<std::string, std::set<std::string>>
+ dir_to_new_files_since_last_sync_;
+ bool filesystem_active_; // Record flushes, syncs, writes
+ Status error_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/mock_time_env.h b/src/rocksdb/test_util/mock_time_env.h
new file mode 100644
index 000000000..32b40f79c
--- /dev/null
+++ b/src/rocksdb/test_util/mock_time_env.h
@@ -0,0 +1,45 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MockTimeEnv : public EnvWrapper {
+ public:
+ explicit MockTimeEnv(Env* base) : EnvWrapper(base) {}
+
+ virtual Status GetCurrentTime(int64_t* time) override {
+ assert(time != nullptr);
+ assert(current_time_ <=
+ static_cast<uint64_t>(std::numeric_limits<int64_t>::max()));
+ *time = static_cast<int64_t>(current_time_);
+ return Status::OK();
+ }
+
+ virtual uint64_t NowMicros() override {
+ assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000);
+ return current_time_ * 1000000;
+ }
+
+ virtual uint64_t NowNanos() override {
+ assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000000);
+ return current_time_ * 1000000000;
+ }
+
+ uint64_t RealNowMicros() { return target()->NowMicros(); }
+
+ void set_current_time(uint64_t time) {
+ assert(time >= current_time_);
+ current_time_ = time;
+ }
+
+ private:
+ std::atomic<uint64_t> current_time_{0};
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/sync_point.cc b/src/rocksdb/test_util/sync_point.cc
new file mode 100644
index 000000000..d969ae3c8
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point.cc
@@ -0,0 +1,66 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/sync_point.h"
+#include "test_util/sync_point_impl.h"
+
+int rocksdb_kill_odds = 0;
+std::vector<std::string> rocksdb_kill_prefix_blacklist;
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+
+SyncPoint* SyncPoint::GetInstance() {
+ static SyncPoint sync_point;
+ return &sync_point;
+}
+
+SyncPoint::SyncPoint() : impl_(new Data) {}
+
+SyncPoint:: ~SyncPoint() {
+ delete impl_;
+}
+
+void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
+ impl_->LoadDependency(dependencies);
+}
+
+void SyncPoint::LoadDependencyAndMarkers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ impl_->LoadDependencyAndMarkers(dependencies, markers);
+}
+
+void SyncPoint::SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback) {
+ impl_->SetCallBack(point, callback);
+}
+
+void SyncPoint::ClearCallBack(const std::string& point) {
+ impl_->ClearCallBack(point);
+}
+
+void SyncPoint::ClearAllCallBacks() {
+ impl_->ClearAllCallBacks();
+}
+
+void SyncPoint::EnableProcessing() {
+ impl_->EnableProcessing();
+}
+
+void SyncPoint::DisableProcessing() {
+ impl_->DisableProcessing();
+}
+
+void SyncPoint::ClearTrace() {
+ impl_->ClearTrace();
+}
+
+void SyncPoint::Process(const std::string& point, void* cb_arg) {
+ impl_->Process(point, cb_arg);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // NDEBUG
diff --git a/src/rocksdb/test_util/sync_point.h b/src/rocksdb/test_util/sync_point.h
new file mode 100644
index 000000000..46bfd50d7
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point.h
@@ -0,0 +1,144 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <assert.h>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+// This is only set from db_stress.cc and for testing only.
+// If non-zero, kill at various points in source code with probability 1/this
+extern int rocksdb_kill_odds;
+// If kill point has a prefix on this list, will skip killing.
+extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
+
+#ifdef NDEBUG
+// empty in release build
+#define TEST_KILL_RANDOM(kill_point, rocksdb_kill_odds)
+#else
+
+namespace ROCKSDB_NAMESPACE {
+// Kill the process with probability 1/odds for testing.
+extern void TestKillRandom(std::string kill_point, int odds,
+ const std::string& srcfile, int srcline);
+
+// To avoid crashing always at some frequently executed codepaths (during
+// kill random test), use this factor to reduce odds
+#define REDUCE_ODDS 2
+#define REDUCE_ODDS2 4
+
+#define TEST_KILL_RANDOM(kill_point, rocksdb_kill_odds) \
+ { \
+ if (rocksdb_kill_odds > 0) { \
+ TestKillRandom(kill_point, rocksdb_kill_odds, __FILE__, __LINE__); \
+ } \
+ }
+} // namespace ROCKSDB_NAMESPACE
+#endif
+
+#ifdef NDEBUG
+#define TEST_SYNC_POINT(x)
+#define TEST_IDX_SYNC_POINT(x, index)
+#define TEST_SYNC_POINT_CALLBACK(x, y)
+#define INIT_SYNC_POINT_SINGLETONS()
+#else
+
+namespace ROCKSDB_NAMESPACE {
+
+// This class provides facility to reproduce race conditions deterministically
+// in unit tests.
+// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
+// Each sync point represents a position in the execution stream of a thread.
+// In the unit test, 'Happens After' relationship among sync points could be
+// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of
+// threads execution.
+// Refer to (DBTest,TransactionLogIteratorRace), for an example use case.
+
+class SyncPoint {
+ public:
+ static SyncPoint* GetInstance();
+
+ SyncPoint(const SyncPoint&) = delete;
+ SyncPoint& operator=(const SyncPoint&) = delete;
+ ~SyncPoint();
+
+ struct SyncPointPair {
+ std::string predecessor;
+ std::string successor;
+ };
+
+ // call once at the beginning of a test to setup the dependency between
+ // sync points
+ void LoadDependency(const std::vector<SyncPointPair>& dependencies);
+
+ // call once at the beginning of a test to setup the dependency between
+ // sync points and setup markers indicating the successor is only enabled
+ // when it is processed on the same thread as the predecessor.
+ // When adding a marker, it implicitly adds a dependency for the marker pair.
+ void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers);
+
+ // The argument to the callback is passed through from
+ // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
+ // TEST_IDX_SYNC_POINT was used.
+ void SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback);
+
+ // Clear callback function by point
+ void ClearCallBack(const std::string& point);
+
+ // Clear all call back functions.
+ void ClearAllCallBacks();
+
+ // enable sync point processing (disabled on startup)
+ void EnableProcessing();
+
+ // disable sync point processing
+ void DisableProcessing();
+
+ // remove the execution trace of all sync points
+ void ClearTrace();
+
+ // triggered by TEST_SYNC_POINT, blocking execution until all predecessors
+ // are executed.
+ // And/or call registered callback function, with argument `cb_arg`
+ void Process(const std::string& point, void* cb_arg = nullptr);
+
+ // TODO: it might be useful to provide a function that blocks until all
+ // sync points are cleared.
+
+ // We want this to be public so we can
+ // subclass the implementation
+ struct Data;
+
+ private:
+ // Singleton
+ SyncPoint();
+ Data* impl_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+// Use TEST_SYNC_POINT to specify sync points inside code base.
+// Sync points can have happens-after depedency on other sync points,
+// configured at runtime via SyncPoint::LoadDependency. This could be
+// utilized to re-produce race conditions between threads.
+// See TransactionLogIteratorRace in db_test.cc for an example use case.
+// TEST_SYNC_POINT is no op in release build.
+#define TEST_SYNC_POINT(x) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x)
+#define TEST_IDX_SYNC_POINT(x, index) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x + \
+ std::to_string(index))
+#define TEST_SYNC_POINT_CALLBACK(x, y) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x, y)
+#define INIT_SYNC_POINT_SINGLETONS() \
+ (void)ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
+#endif // NDEBUG
diff --git a/src/rocksdb/test_util/sync_point_impl.cc b/src/rocksdb/test_util/sync_point_impl.cc
new file mode 100644
index 000000000..7b939c5f4
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point_impl.cc
@@ -0,0 +1,129 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/sync_point_impl.h"
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+
+void TestKillRandom(std::string kill_point, int odds,
+ const std::string& srcfile, int srcline) {
+ for (auto& p : rocksdb_kill_prefix_blacklist) {
+ if (kill_point.substr(0, p.length()) == p) {
+ return;
+ }
+ }
+
+ assert(odds > 0);
+ if (odds % 7 == 0) {
+ // class Random uses multiplier 16807, which is 7^5. If odds are
+ // multiplier of 7, there might be limited values generated.
+ odds++;
+ }
+ auto* r = Random::GetTLSInstance();
+ bool crash = r->OneIn(odds);
+ if (crash) {
+ port::Crash(srcfile, srcline);
+ }
+}
+
+
+void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ }
+ cv_.notify_all();
+}
+
+void SyncPoint::Data::LoadDependencyAndMarkers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ markers_.clear();
+ marked_thread_id_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ }
+ for (const auto& marker : markers) {
+ successors_[marker.predecessor].push_back(marker.successor);
+ predecessors_[marker.successor].push_back(marker.predecessor);
+ markers_[marker.predecessor].push_back(marker.successor);
+ }
+ cv_.notify_all();
+}
+
+bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
+ for (const auto& pred : predecessors_[point]) {
+ if (cleared_points_.count(pred) == 0) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void SyncPoint::Data::ClearCallBack(const std::string& point) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (num_callbacks_running_ > 0) {
+ cv_.wait(lock);
+ }
+ callbacks_.erase(point);
+}
+
+void SyncPoint::Data::ClearAllCallBacks() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (num_callbacks_running_ > 0) {
+ cv_.wait(lock);
+ }
+ callbacks_.clear();
+}
+
+void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
+ if (!enabled_) {
+ return;
+ }
+
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto thread_id = std::this_thread::get_id();
+
+ auto marker_iter = markers_.find(point);
+ if (marker_iter != markers_.end()) {
+ for (auto& marked_point : marker_iter->second) {
+ marked_thread_id_.emplace(marked_point, thread_id);
+ }
+ }
+
+ if (DisabledByMarker(point, thread_id)) {
+ return;
+ }
+
+ while (!PredecessorsAllCleared(point)) {
+ cv_.wait(lock);
+ if (DisabledByMarker(point, thread_id)) {
+ return;
+ }
+ }
+
+ auto callback_pair = callbacks_.find(point);
+ if (callback_pair != callbacks_.end()) {
+ num_callbacks_running_++;
+ mutex_.unlock();
+ callback_pair->second(cb_arg);
+ mutex_.lock();
+ num_callbacks_running_--;
+ }
+ cleared_points_.insert(point);
+ cv_.notify_all();
+}
+} // namespace ROCKSDB_NAMESPACE
+#endif
diff --git a/src/rocksdb/test_util/sync_point_impl.h b/src/rocksdb/test_util/sync_point_impl.h
new file mode 100644
index 000000000..b246c0198
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point_impl.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/sync_point.h"
+
+#include <assert.h>
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "port/port.h"
+#include "util/random.h"
+
+#pragma once
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+struct SyncPoint::Data {
+ Data() : enabled_(false) {}
+ // Enable proper deletion by subclasses
+ virtual ~Data() {}
+ // successor/predecessor map loaded from LoadDependency
+ std::unordered_map<std::string, std::vector<std::string>> successors_;
+ std::unordered_map<std::string, std::vector<std::string>> predecessors_;
+ std::unordered_map<std::string, std::function<void(void*)> > callbacks_;
+ std::unordered_map<std::string, std::vector<std::string> > markers_;
+ std::unordered_map<std::string, std::thread::id> marked_thread_id_;
+
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ // sync points that have been passed through
+ std::unordered_set<std::string> cleared_points_;
+ std::atomic<bool> enabled_;
+ int num_callbacks_running_ = 0;
+
+ void LoadDependency(const std::vector<SyncPointPair>& dependencies);
+ void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers);
+ bool PredecessorsAllCleared(const std::string& point);
+ void SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ callbacks_[point] = callback;
+}
+
+ void ClearCallBack(const std::string& point);
+ void ClearAllCallBacks();
+ void EnableProcessing() {
+ enabled_ = true;
+ }
+ void DisableProcessing() {
+ enabled_ = false;
+ }
+ void ClearTrace() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ cleared_points_.clear();
+ }
+ bool DisabledByMarker(const std::string& point,
+ std::thread::id thread_id) {
+ auto marked_point_iter = marked_thread_id_.find(point);
+ return marked_point_iter != marked_thread_id_.end() &&
+ thread_id != marked_point_iter->second;
+ }
+ void Process(const std::string& point, void* cb_arg);
+};
+} // namespace ROCKSDB_NAMESPACE
+#endif // NDEBUG
diff --git a/src/rocksdb/test_util/testharness.cc b/src/rocksdb/test_util/testharness.cc
new file mode 100644
index 000000000..d38d43080
--- /dev/null
+++ b/src/rocksdb/test_util/testharness.cc
@@ -0,0 +1,56 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "test_util/testharness.h"
+#include <string>
+#include <thread>
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) {
+ if (s.ok()) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure() << s_expr << std::endl
+ << s.ToString();
+ }
+}
+
+std::string TmpDir(Env* env) {
+ std::string dir;
+ Status s = env->GetTestDirectory(&dir);
+ EXPECT_TRUE(s.ok()) << s.ToString();
+ return dir;
+}
+
+std::string PerThreadDBPath(std::string dir, std::string name) {
+ size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+ return dir + "/" + name + "_" + std::to_string(tid);
+}
+
+std::string PerThreadDBPath(std::string name) {
+ return PerThreadDBPath(test::TmpDir(), name);
+}
+
+std::string PerThreadDBPath(Env* env, std::string name) {
+ return PerThreadDBPath(test::TmpDir(env), name);
+}
+
+int RandomSeed() {
+ const char* env = getenv("TEST_RANDOM_SEED");
+ int result = (env != nullptr ? atoi(env) : 301);
+ if (result <= 0) {
+ result = 301;
+ }
+ return result;
+}
+
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testharness.h b/src/rocksdb/test_util/testharness.h
new file mode 100644
index 000000000..11f40ff83
--- /dev/null
+++ b/src/rocksdb/test_util/testharness.h
@@ -0,0 +1,47 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#ifdef OS_AIX
+#include "gtest/gtest.h"
+#else
+#include <gtest/gtest.h>
+#endif
+
+#include <string>
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+// Return the directory to use for temporary storage.
+std::string TmpDir(Env* env = Env::Default());
+
+// A path unique within the thread
+std::string PerThreadDBPath(std::string name);
+std::string PerThreadDBPath(Env* env, std::string name);
+std::string PerThreadDBPath(std::string dir, std::string name);
+
+// Return a randomization seed for this run. Typically returns the
+// same number on repeated invocations of this binary, but automated
+// runs may be able to vary the seed.
+int RandomSeed();
+
+::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s);
+
+#define ASSERT_OK(s) \
+ ASSERT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s)
+#define ASSERT_NOK(s) ASSERT_FALSE((s).ok())
+#define EXPECT_OK(s) \
+ EXPECT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s)
+#define EXPECT_NOK(s) EXPECT_FALSE((s).ok())
+
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testutil.cc b/src/rocksdb/test_util/testutil.cc
new file mode 100644
index 000000000..2969a140a
--- /dev/null
+++ b/src/rocksdb/test_util/testutil.cc
@@ -0,0 +1,454 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "test_util/testutil.h"
+
+#include <array>
+#include <cctype>
+#include <fstream>
+#include <sstream>
+
+#include "db/memtable_list.h"
+#include "env/composite_env_wrapper.h"
+#include "file/random_access_file_reader.h"
+#include "file/sequence_file_reader.h"
+#include "file/writable_file_writer.h"
+#include "port/port.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+const uint32_t kDefaultFormatVersion = BlockBasedTableOptions().format_version;
+const uint32_t kLatestFormatVersion = 5u;
+
+Slice RandomString(Random* rnd, int len, std::string* dst) {
+ dst->resize(len);
+ for (int i = 0; i < len; i++) {
+ (*dst)[i] = static_cast<char>(' ' + rnd->Uniform(95)); // ' ' .. '~'
+ }
+ return Slice(*dst);
+}
+
+extern std::string RandomHumanReadableString(Random* rnd, int len) {
+ std::string ret;
+ ret.resize(len);
+ for (int i = 0; i < len; ++i) {
+ ret[i] = static_cast<char>('a' + rnd->Uniform(26));
+ }
+ return ret;
+}
+
+std::string RandomKey(Random* rnd, int len, RandomKeyType type) {
+ // Make sure to generate a wide variety of characters so we
+ // test the boundary conditions for short-key optimizations.
+ static const char kTestChars[] = {'\0', '\1', 'a', 'b', 'c',
+ 'd', 'e', '\xfd', '\xfe', '\xff'};
+ std::string result;
+ for (int i = 0; i < len; i++) {
+ std::size_t indx = 0;
+ switch (type) {
+ case RandomKeyType::RANDOM:
+ indx = rnd->Uniform(sizeof(kTestChars));
+ break;
+ case RandomKeyType::LARGEST:
+ indx = sizeof(kTestChars) - 1;
+ break;
+ case RandomKeyType::MIDDLE:
+ indx = sizeof(kTestChars) / 2;
+ break;
+ case RandomKeyType::SMALLEST:
+ indx = 0;
+ break;
+ }
+ result += kTestChars[indx];
+ }
+ return result;
+}
+
+extern Slice CompressibleString(Random* rnd, double compressed_fraction,
+ int len, std::string* dst) {
+ int raw = static_cast<int>(len * compressed_fraction);
+ if (raw < 1) raw = 1;
+ std::string raw_data;
+ RandomString(rnd, raw, &raw_data);
+
+ // Duplicate the random data until we have filled "len" bytes
+ dst->clear();
+ while (dst->size() < (unsigned int)len) {
+ dst->append(raw_data);
+ }
+ dst->resize(len);
+ return Slice(*dst);
+}
+
+namespace {
+class Uint64ComparatorImpl : public Comparator {
+ public:
+ Uint64ComparatorImpl() {}
+
+ const char* Name() const override { return "rocksdb.Uint64Comparator"; }
+
+ int Compare(const Slice& a, const Slice& b) const override {
+ assert(a.size() == sizeof(uint64_t) && b.size() == sizeof(uint64_t));
+ const uint64_t* left = reinterpret_cast<const uint64_t*>(a.data());
+ const uint64_t* right = reinterpret_cast<const uint64_t*>(b.data());
+ uint64_t leftValue;
+ uint64_t rightValue;
+ GetUnaligned(left, &leftValue);
+ GetUnaligned(right, &rightValue);
+ if (leftValue == rightValue) {
+ return 0;
+ } else if (leftValue < rightValue) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+
+ void FindShortestSeparator(std::string* /*start*/,
+ const Slice& /*limit*/) const override {
+ return;
+ }
+
+ void FindShortSuccessor(std::string* /*key*/) const override { return; }
+};
+} // namespace
+
+const Comparator* Uint64Comparator() {
+ static Uint64ComparatorImpl uint64comp;
+ return &uint64comp;
+}
+
+WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
+ const std::string& fname) {
+ std::unique_ptr<WritableFile> file(wf);
+ return new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
+ fname, EnvOptions());
+}
+
+RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) {
+ std::unique_ptr<RandomAccessFile> file(raf);
+ return new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
+ "[test RandomAccessFileReader]");
+}
+
+SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
+ const std::string& fname) {
+ std::unique_ptr<SequentialFile> file(se);
+ return new SequentialFileReader(NewLegacySequentialFileWrapper(file), fname);
+}
+
+void CorruptKeyType(InternalKey* ikey) {
+ std::string keystr = ikey->Encode().ToString();
+ keystr[keystr.size() - 8] = kTypeLogData;
+ ikey->DecodeFrom(Slice(keystr.data(), keystr.size()));
+}
+
+std::string KeyStr(const std::string& user_key, const SequenceNumber& seq,
+ const ValueType& t, bool corrupt) {
+ InternalKey k(user_key, seq, t);
+ if (corrupt) {
+ CorruptKeyType(&k);
+ }
+ return k.Encode().ToString();
+}
+
+std::string RandomName(Random* rnd, const size_t len) {
+ std::stringstream ss;
+ for (size_t i = 0; i < len; ++i) {
+ ss << static_cast<char>(rnd->Uniform(26) + 'a');
+ }
+ return ss.str();
+}
+
+CompressionType RandomCompressionType(Random* rnd) {
+ auto ret = static_cast<CompressionType>(rnd->Uniform(6));
+ while (!CompressionTypeSupported(ret)) {
+ ret = static_cast<CompressionType>((static_cast<int>(ret) + 1) % 6);
+ }
+ return ret;
+}
+
+void RandomCompressionTypeVector(const size_t count,
+ std::vector<CompressionType>* types,
+ Random* rnd) {
+ types->clear();
+ for (size_t i = 0; i < count; ++i) {
+ types->emplace_back(RandomCompressionType(rnd));
+ }
+}
+
+const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) {
+ int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4);
+ switch (random_num) {
+ case 0:
+ return NewFixedPrefixTransform(rnd->Uniform(20) + 1);
+ case 1:
+ return NewCappedPrefixTransform(rnd->Uniform(20) + 1);
+ case 2:
+ return NewNoopTransform();
+ default:
+ return nullptr;
+ }
+}
+
+BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) {
+ BlockBasedTableOptions opt;
+ opt.cache_index_and_filter_blocks = rnd->Uniform(2);
+ opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2);
+ opt.pin_top_level_index_and_filter = rnd->Uniform(2);
+ using IndexType = BlockBasedTableOptions::IndexType;
+ const std::array<IndexType, 4> index_types = {
+ {IndexType::kBinarySearch, IndexType::kHashSearch,
+ IndexType::kTwoLevelIndexSearch, IndexType::kBinarySearchWithFirstKey}};
+ opt.index_type =
+ index_types[rnd->Uniform(static_cast<int>(index_types.size()))];
+ opt.hash_index_allow_collision = rnd->Uniform(2);
+ opt.checksum = static_cast<ChecksumType>(rnd->Uniform(3));
+ opt.block_size = rnd->Uniform(10000000);
+ opt.block_size_deviation = rnd->Uniform(100);
+ opt.block_restart_interval = rnd->Uniform(100);
+ opt.index_block_restart_interval = rnd->Uniform(100);
+ opt.whole_key_filtering = rnd->Uniform(2);
+
+ return opt;
+}
+
+TableFactory* RandomTableFactory(Random* rnd, int pre_defined) {
+#ifndef ROCKSDB_LITE
+ int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4);
+ switch (random_num) {
+ case 0:
+ return NewPlainTableFactory();
+ case 1:
+ return NewCuckooTableFactory();
+ default:
+ return NewBlockBasedTableFactory();
+ }
+#else
+ (void)rnd;
+ (void)pre_defined;
+ return NewBlockBasedTableFactory();
+#endif // !ROCKSDB_LITE
+}
+
+MergeOperator* RandomMergeOperator(Random* rnd) {
+ return new ChanglingMergeOperator(RandomName(rnd, 10));
+}
+
+CompactionFilter* RandomCompactionFilter(Random* rnd) {
+ return new ChanglingCompactionFilter(RandomName(rnd, 10));
+}
+
+CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) {
+ return new ChanglingCompactionFilterFactory(RandomName(rnd, 10));
+}
+
+void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) {
+ // boolean options
+ db_opt->advise_random_on_open = rnd->Uniform(2);
+ db_opt->allow_mmap_reads = rnd->Uniform(2);
+ db_opt->allow_mmap_writes = rnd->Uniform(2);
+ db_opt->use_direct_reads = rnd->Uniform(2);
+ db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2);
+ db_opt->create_if_missing = rnd->Uniform(2);
+ db_opt->create_missing_column_families = rnd->Uniform(2);
+ db_opt->enable_thread_tracking = rnd->Uniform(2);
+ db_opt->error_if_exists = rnd->Uniform(2);
+ db_opt->is_fd_close_on_exec = rnd->Uniform(2);
+ db_opt->paranoid_checks = rnd->Uniform(2);
+ db_opt->skip_log_error_on_recovery = rnd->Uniform(2);
+ db_opt->skip_stats_update_on_db_open = rnd->Uniform(2);
+ db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2);
+ db_opt->use_adaptive_mutex = rnd->Uniform(2);
+ db_opt->use_fsync = rnd->Uniform(2);
+ db_opt->recycle_log_file_num = rnd->Uniform(2);
+ db_opt->avoid_flush_during_recovery = rnd->Uniform(2);
+ db_opt->avoid_flush_during_shutdown = rnd->Uniform(2);
+
+ // int options
+ db_opt->max_background_compactions = rnd->Uniform(100);
+ db_opt->max_background_flushes = rnd->Uniform(100);
+ db_opt->max_file_opening_threads = rnd->Uniform(100);
+ db_opt->max_open_files = rnd->Uniform(100);
+ db_opt->table_cache_numshardbits = rnd->Uniform(100);
+
+ // size_t options
+ db_opt->db_write_buffer_size = rnd->Uniform(10000);
+ db_opt->keep_log_file_num = rnd->Uniform(10000);
+ db_opt->log_file_time_to_roll = rnd->Uniform(10000);
+ db_opt->manifest_preallocation_size = rnd->Uniform(10000);
+ db_opt->max_log_file_size = rnd->Uniform(10000);
+
+ // std::string options
+ db_opt->db_log_dir = "path/to/db_log_dir";
+ db_opt->wal_dir = "path/to/wal_dir";
+
+ // uint32_t options
+ db_opt->max_subcompactions = rnd->Uniform(100000);
+
+ // uint64_t options
+ static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
+ db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000);
+ db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000);
+ db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000);
+ db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000);
+ db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000);
+ db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000);
+ db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000);
+ db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000);
+
+ // unsigned int options
+ db_opt->stats_dump_period_sec = rnd->Uniform(100000);
+}
+
+void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
+ Random* rnd) {
+ cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4));
+
+ // boolean options
+ cf_opt->report_bg_io_stats = rnd->Uniform(2);
+ cf_opt->disable_auto_compactions = rnd->Uniform(2);
+ cf_opt->inplace_update_support = rnd->Uniform(2);
+ cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2);
+ cf_opt->optimize_filters_for_hits = rnd->Uniform(2);
+ cf_opt->paranoid_file_checks = rnd->Uniform(2);
+ cf_opt->purge_redundant_kvs_while_flush = rnd->Uniform(2);
+ cf_opt->force_consistency_checks = rnd->Uniform(2);
+ cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2);
+ cf_opt->memtable_whole_key_filtering = rnd->Uniform(2);
+
+ // double options
+ cf_opt->hard_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13;
+ cf_opt->soft_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13;
+ cf_opt->memtable_prefix_bloom_size_ratio =
+ static_cast<double>(rnd->Uniform(10000)) / 20000.0;
+
+ // int options
+ cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100);
+ cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100);
+ cf_opt->level0_stop_writes_trigger = rnd->Uniform(100);
+ cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100);
+ cf_opt->max_mem_compaction_level = rnd->Uniform(100);
+ cf_opt->max_write_buffer_number = rnd->Uniform(100);
+ cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100);
+ cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000);
+ cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100);
+ cf_opt->num_levels = rnd->Uniform(100);
+ cf_opt->target_file_size_multiplier = rnd->Uniform(100);
+
+ // vector int options
+ cf_opt->max_bytes_for_level_multiplier_additional.resize(cf_opt->num_levels);
+ for (int i = 0; i < cf_opt->num_levels; i++) {
+ cf_opt->max_bytes_for_level_multiplier_additional[i] = rnd->Uniform(100);
+ }
+
+ // size_t options
+ cf_opt->arena_block_size = rnd->Uniform(10000);
+ cf_opt->inplace_update_num_locks = rnd->Uniform(10000);
+ cf_opt->max_successive_merges = rnd->Uniform(10000);
+ cf_opt->memtable_huge_page_size = rnd->Uniform(10000);
+ cf_opt->write_buffer_size = rnd->Uniform(10000);
+
+ // uint32_t options
+ cf_opt->bloom_locality = rnd->Uniform(10000);
+ cf_opt->max_bytes_for_level_base = rnd->Uniform(10000);
+
+ // uint64_t options
+ static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
+ cf_opt->ttl =
+ db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0;
+ cf_opt->periodic_compaction_seconds =
+ db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0;
+ cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000);
+ cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000);
+ cf_opt->max_compaction_bytes =
+ cf_opt->target_file_size_base * rnd->Uniform(100);
+ cf_opt->compaction_options_fifo.max_table_files_size =
+ uint_max + rnd->Uniform(10000);
+
+ // unsigned int options
+ cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000);
+
+ // pointer typed options
+ cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd));
+ cf_opt->table_factory.reset(RandomTableFactory(rnd));
+ cf_opt->merge_operator.reset(RandomMergeOperator(rnd));
+ if (cf_opt->compaction_filter) {
+ delete cf_opt->compaction_filter;
+ }
+ cf_opt->compaction_filter = RandomCompactionFilter(rnd);
+ cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd));
+
+ // custom typed options
+ cf_opt->compression = RandomCompressionType(rnd);
+ RandomCompressionTypeVector(cf_opt->num_levels,
+ &cf_opt->compression_per_level, rnd);
+}
+
+Status DestroyDir(Env* env, const std::string& dir) {
+ Status s;
+ if (env->FileExists(dir).IsNotFound()) {
+ return s;
+ }
+ std::vector<std::string> files_in_dir;
+ s = env->GetChildren(dir, &files_in_dir);
+ if (s.ok()) {
+ for (auto& file_in_dir : files_in_dir) {
+ if (file_in_dir == "." || file_in_dir == "..") {
+ continue;
+ }
+ s = env->DeleteFile(dir + "/" + file_in_dir);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+
+ if (s.ok()) {
+ s = env->DeleteDir(dir);
+ }
+ return s;
+}
+
+bool IsDirectIOSupported(Env* env, const std::string& dir) {
+ EnvOptions env_options;
+ env_options.use_mmap_writes = false;
+ env_options.use_direct_writes = true;
+ std::string tmp = TempFileName(dir, 999);
+ Status s;
+ {
+ std::unique_ptr<WritableFile> file;
+ s = env->NewWritableFile(tmp, &file, env_options);
+ }
+ if (s.ok()) {
+ s = env->DeleteFile(tmp);
+ }
+ return s.ok();
+}
+
+size_t GetLinesCount(const std::string& fname, const std::string& pattern) {
+ std::stringstream ssbuf;
+ std::string line;
+ size_t count = 0;
+
+ std::ifstream inFile(fname.c_str());
+ ssbuf << inFile.rdbuf();
+
+ while (getline(ssbuf, line)) {
+ if (line.find(pattern) != std::string::npos) {
+ count++;
+ }
+ }
+
+ return count;
+}
+
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testutil.h b/src/rocksdb/test_util/testutil.h
new file mode 100644
index 000000000..3f6b47929
--- /dev/null
+++ b/src/rocksdb/test_util/testutil.h
@@ -0,0 +1,802 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "file/writable_file_writer.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/table.h"
+#include "table/block_based/block_based_table_factory.h"
+#include "table/internal_iterator.h"
+#include "table/plain/plain_table_factory.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+
+namespace ROCKSDB_NAMESPACE {
+class SequentialFile;
+class SequentialFileReader;
+
+namespace test {
+
+extern const uint32_t kDefaultFormatVersion;
+extern const uint32_t kLatestFormatVersion;
+
+// Store in *dst a random string of length "len" and return a Slice that
+// references the generated data.
+extern Slice RandomString(Random* rnd, int len, std::string* dst);
+
+extern std::string RandomHumanReadableString(Random* rnd, int len);
+
+// Return a random key with the specified length that may contain interesting
+// characters (e.g. \x00, \xff, etc.).
+enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE };
+extern std::string RandomKey(Random* rnd, int len,
+ RandomKeyType type = RandomKeyType::RANDOM);
+
+// Store in *dst a string of length "len" that will compress to
+// "N*compressed_fraction" bytes and return a Slice that references
+// the generated data.
+extern Slice CompressibleString(Random* rnd, double compressed_fraction,
+ int len, std::string* dst);
+
+// A wrapper that allows injection of errors.
+class ErrorEnv : public EnvWrapper {
+ public:
+ bool writable_file_error_;
+ int num_writable_file_errors_;
+
+ ErrorEnv() : EnvWrapper(Env::Default()),
+ writable_file_error_(false),
+ num_writable_file_errors_(0) { }
+
+ virtual Status NewWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& soptions) override {
+ result->reset();
+ if (writable_file_error_) {
+ ++num_writable_file_errors_;
+ return Status::IOError(fname, "fake error");
+ }
+ return target()->NewWritableFile(fname, result, soptions);
+ }
+};
+
+#ifndef NDEBUG
+// An internal comparator that just forward comparing results from the
+// user comparator in it. Can be used to test entities that have no dependency
+// on internal key structure but consumes InternalKeyComparator, like
+// BlockBasedTable.
+class PlainInternalKeyComparator : public InternalKeyComparator {
+ public:
+ explicit PlainInternalKeyComparator(const Comparator* c)
+ : InternalKeyComparator(c) {}
+
+ virtual ~PlainInternalKeyComparator() {}
+
+ virtual int Compare(const Slice& a, const Slice& b) const override {
+ return user_comparator()->Compare(a, b);
+ }
+};
+#endif
+
+// A test comparator which compare two strings in this way:
+// (1) first compare prefix of 8 bytes in alphabet order,
+// (2) if two strings share the same prefix, sort the other part of the string
+// in the reverse alphabet order.
+// This helps simulate the case of compounded key of [entity][timestamp] and
+// latest timestamp first.
+class SimpleSuffixReverseComparator : public Comparator {
+ public:
+ SimpleSuffixReverseComparator() {}
+
+ virtual const char* Name() const override {
+ return "SimpleSuffixReverseComparator";
+ }
+
+ virtual int Compare(const Slice& a, const Slice& b) const override {
+ Slice prefix_a = Slice(a.data(), 8);
+ Slice prefix_b = Slice(b.data(), 8);
+ int prefix_comp = prefix_a.compare(prefix_b);
+ if (prefix_comp != 0) {
+ return prefix_comp;
+ } else {
+ Slice suffix_a = Slice(a.data() + 8, a.size() - 8);
+ Slice suffix_b = Slice(b.data() + 8, b.size() - 8);
+ return -(suffix_a.compare(suffix_b));
+ }
+ }
+ virtual void FindShortestSeparator(std::string* /*start*/,
+ const Slice& /*limit*/) const override {}
+
+ virtual void FindShortSuccessor(std::string* /*key*/) const override {}
+};
+
+// Returns a user key comparator that can be used for comparing two uint64_t
+// slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
+// at once. Assumes same endian-ness is used though the database's lifetime.
+// Symantics of comparison would differ from Bytewise comparator in little
+// endian machines.
+extern const Comparator* Uint64Comparator();
+
+// Iterator over a vector of keys/values
+class VectorIterator : public InternalIterator {
+ public:
+ explicit VectorIterator(const std::vector<std::string>& keys)
+ : keys_(keys), current_(keys.size()) {
+ std::sort(keys_.begin(), keys_.end());
+ values_.resize(keys.size());
+ }
+
+ VectorIterator(const std::vector<std::string>& keys,
+ const std::vector<std::string>& values)
+ : keys_(keys), values_(values), current_(keys.size()) {
+ assert(keys_.size() == values_.size());
+ }
+
+ virtual bool Valid() const override { return current_ < keys_.size(); }
+
+ virtual void SeekToFirst() override { current_ = 0; }
+ virtual void SeekToLast() override { current_ = keys_.size() - 1; }
+
+ virtual void Seek(const Slice& target) override {
+ current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
+ keys_.begin();
+ }
+
+ virtual void SeekForPrev(const Slice& target) override {
+ current_ = std::upper_bound(keys_.begin(), keys_.end(), target.ToString()) -
+ keys_.begin();
+ if (!Valid()) {
+ SeekToLast();
+ } else {
+ Prev();
+ }
+ }
+
+ virtual void Next() override { current_++; }
+ virtual void Prev() override { current_--; }
+
+ virtual Slice key() const override { return Slice(keys_[current_]); }
+ virtual Slice value() const override { return Slice(values_[current_]); }
+
+ virtual Status status() const override { return Status::OK(); }
+
+ virtual bool IsKeyPinned() const override { return true; }
+ virtual bool IsValuePinned() const override { return true; }
+
+ private:
+ std::vector<std::string> keys_;
+ std::vector<std::string> values_;
+ size_t current_;
+};
+extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
+ const std::string& fname);
+
+extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf);
+
+extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
+ const std::string& fname);
+
+class StringSink: public WritableFile {
+ public:
+ std::string contents_;
+
+ explicit StringSink(Slice* reader_contents = nullptr) :
+ WritableFile(),
+ contents_(""),
+ reader_contents_(reader_contents),
+ last_flush_(0) {
+ if (reader_contents_ != nullptr) {
+ *reader_contents_ = Slice(contents_.data(), 0);
+ }
+ }
+
+ const std::string& contents() const { return contents_; }
+
+ virtual Status Truncate(uint64_t size) override {
+ contents_.resize(static_cast<size_t>(size));
+ return Status::OK();
+ }
+ virtual Status Close() override { return Status::OK(); }
+ virtual Status Flush() override {
+ if (reader_contents_ != nullptr) {
+ assert(reader_contents_->size() <= last_flush_);
+ size_t offset = last_flush_ - reader_contents_->size();
+ *reader_contents_ = Slice(
+ contents_.data() + offset,
+ contents_.size() - offset);
+ last_flush_ = contents_.size();
+ }
+
+ return Status::OK();
+ }
+ virtual Status Sync() override { return Status::OK(); }
+ virtual Status Append(const Slice& slice) override {
+ contents_.append(slice.data(), slice.size());
+ return Status::OK();
+ }
+ void Drop(size_t bytes) {
+ if (reader_contents_ != nullptr) {
+ contents_.resize(contents_.size() - bytes);
+ *reader_contents_ = Slice(
+ reader_contents_->data(), reader_contents_->size() - bytes);
+ last_flush_ = contents_.size();
+ }
+ }
+
+ private:
+ Slice* reader_contents_;
+ size_t last_flush_;
+};
+
+// A wrapper around a StringSink to give it a RandomRWFile interface
+class RandomRWStringSink : public RandomRWFile {
+ public:
+ explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
+
+ Status Write(uint64_t offset, const Slice& data) override {
+ if (offset + data.size() > ss_->contents_.size()) {
+ ss_->contents_.resize(static_cast<size_t>(offset) + data.size(), '\0');
+ }
+
+ char* pos = const_cast<char*>(ss_->contents_.data() + offset);
+ memcpy(pos, data.data(), data.size());
+ return Status::OK();
+ }
+
+ Status Read(uint64_t offset, size_t n, Slice* result,
+ char* /*scratch*/) const override {
+ *result = Slice(nullptr, 0);
+ if (offset < ss_->contents_.size()) {
+ size_t str_res_sz =
+ std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
+ *result = Slice(ss_->contents_.data() + offset, str_res_sz);
+ }
+ return Status::OK();
+ }
+
+ Status Flush() override { return Status::OK(); }
+
+ Status Sync() override { return Status::OK(); }
+
+ Status Close() override { return Status::OK(); }
+
+ const std::string& contents() const { return ss_->contents(); }
+
+ private:
+ StringSink* ss_;
+};
+
+// Like StringSink, this writes into a string. Unlink StringSink, it
+// has some initial content and overwrites it, just like a recycled
+// log file.
+class OverwritingStringSink : public WritableFile {
+ public:
+ explicit OverwritingStringSink(Slice* reader_contents)
+ : WritableFile(),
+ contents_(""),
+ reader_contents_(reader_contents),
+ last_flush_(0) {}
+
+ const std::string& contents() const { return contents_; }
+
+ virtual Status Truncate(uint64_t size) override {
+ contents_.resize(static_cast<size_t>(size));
+ return Status::OK();
+ }
+ virtual Status Close() override { return Status::OK(); }
+ virtual Status Flush() override {
+ if (last_flush_ < contents_.size()) {
+ assert(reader_contents_->size() >= contents_.size());
+ memcpy((char*)reader_contents_->data() + last_flush_,
+ contents_.data() + last_flush_, contents_.size() - last_flush_);
+ last_flush_ = contents_.size();
+ }
+ return Status::OK();
+ }
+ virtual Status Sync() override { return Status::OK(); }
+ virtual Status Append(const Slice& slice) override {
+ contents_.append(slice.data(), slice.size());
+ return Status::OK();
+ }
+ void Drop(size_t bytes) {
+ contents_.resize(contents_.size() - bytes);
+ if (last_flush_ > contents_.size()) last_flush_ = contents_.size();
+ }
+
+ private:
+ std::string contents_;
+ Slice* reader_contents_;
+ size_t last_flush_;
+};
+
+class StringSource: public RandomAccessFile {
+ public:
+ explicit StringSource(const Slice& contents, uint64_t uniq_id = 0,
+ bool mmap = false)
+ : contents_(contents.data(), contents.size()),
+ uniq_id_(uniq_id),
+ mmap_(mmap),
+ total_reads_(0) {}
+
+ virtual ~StringSource() { }
+
+ uint64_t Size() const { return contents_.size(); }
+
+ virtual Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override {
+ total_reads_++;
+ if (offset > contents_.size()) {
+ return Status::InvalidArgument("invalid Read offset");
+ }
+ if (offset + n > contents_.size()) {
+ n = contents_.size() - static_cast<size_t>(offset);
+ }
+ if (!mmap_) {
+ memcpy(scratch, &contents_[static_cast<size_t>(offset)], n);
+ *result = Slice(scratch, n);
+ } else {
+ *result = Slice(&contents_[static_cast<size_t>(offset)], n);
+ }
+ return Status::OK();
+ }
+
+ virtual size_t GetUniqueId(char* id, size_t max_size) const override {
+ if (max_size < 20) {
+ return 0;
+ }
+
+ char* rid = id;
+ rid = EncodeVarint64(rid, uniq_id_);
+ rid = EncodeVarint64(rid, 0);
+ return static_cast<size_t>(rid-id);
+ }
+
+ int total_reads() const { return total_reads_; }
+
+ void set_total_reads(int tr) { total_reads_ = tr; }
+
+ private:
+ std::string contents_;
+ uint64_t uniq_id_;
+ bool mmap_;
+ mutable int total_reads_;
+};
+
+inline StringSink* GetStringSinkFromLegacyWriter(
+ const WritableFileWriter* writer) {
+ LegacyWritableFileWrapper* file =
+ static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
+ return static_cast<StringSink*>(file->target());
+}
+
+class NullLogger : public Logger {
+ public:
+ using Logger::Logv;
+ virtual void Logv(const char* /*format*/, va_list /*ap*/) override {}
+ virtual size_t GetLogFileSize() const override { return 0; }
+};
+
+// Corrupts key by changing the type
+extern void CorruptKeyType(InternalKey* ikey);
+
+extern std::string KeyStr(const std::string& user_key,
+ const SequenceNumber& seq, const ValueType& t,
+ bool corrupt = false);
+
+class SleepingBackgroundTask {
+ public:
+ SleepingBackgroundTask()
+ : bg_cv_(&mutex_),
+ should_sleep_(true),
+ done_with_sleep_(false),
+ sleeping_(false) {}
+
+ bool IsSleeping() {
+ MutexLock l(&mutex_);
+ return sleeping_;
+ }
+ void DoSleep() {
+ MutexLock l(&mutex_);
+ sleeping_ = true;
+ bg_cv_.SignalAll();
+ while (should_sleep_) {
+ bg_cv_.Wait();
+ }
+ sleeping_ = false;
+ done_with_sleep_ = true;
+ bg_cv_.SignalAll();
+ }
+ void WaitUntilSleeping() {
+ MutexLock l(&mutex_);
+ while (!sleeping_ || !should_sleep_) {
+ bg_cv_.Wait();
+ }
+ }
+ // Waits for the status to change to sleeping,
+ // otherwise times out.
+ // wait_time is in microseconds.
+ // Returns true when times out, false otherwise.
+ bool TimedWaitUntilSleeping(uint64_t wait_time) {
+ auto abs_time = Env::Default()->NowMicros() + wait_time;
+ MutexLock l(&mutex_);
+ while (!sleeping_ || !should_sleep_) {
+ if (bg_cv_.TimedWait(abs_time)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ void WakeUp() {
+ MutexLock l(&mutex_);
+ should_sleep_ = false;
+ bg_cv_.SignalAll();
+ }
+ void WaitUntilDone() {
+ MutexLock l(&mutex_);
+ while (!done_with_sleep_) {
+ bg_cv_.Wait();
+ }
+ }
+ // Similar to TimedWaitUntilSleeping.
+ // Waits until the task is done.
+ bool TimedWaitUntilDone(uint64_t wait_time) {
+ auto abs_time = Env::Default()->NowMicros() + wait_time;
+ MutexLock l(&mutex_);
+ while (!done_with_sleep_) {
+ if (bg_cv_.TimedWait(abs_time)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ bool WokenUp() {
+ MutexLock l(&mutex_);
+ return should_sleep_ == false;
+ }
+
+ void Reset() {
+ MutexLock l(&mutex_);
+ should_sleep_ = true;
+ done_with_sleep_ = false;
+ }
+
+ static void DoSleepTask(void* arg) {
+ reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
+ }
+
+ private:
+ port::Mutex mutex_;
+ port::CondVar bg_cv_; // Signalled when background work finishes
+ bool should_sleep_;
+ bool done_with_sleep_;
+ bool sleeping_;
+};
+
+// Filters merge operands and values that are equal to `num`.
+class FilterNumber : public CompactionFilter {
+ public:
+ explicit FilterNumber(uint64_t num) : num_(num) {}
+
+ std::string last_merge_operand_key() { return last_merge_operand_key_; }
+
+ bool Filter(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/,
+ const ROCKSDB_NAMESPACE::Slice& value, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ if (value.size() == sizeof(uint64_t)) {
+ return num_ == DecodeFixed64(value.data());
+ }
+ return true;
+ }
+
+ bool FilterMergeOperand(
+ int /*level*/, const ROCKSDB_NAMESPACE::Slice& key,
+ const ROCKSDB_NAMESPACE::Slice& value) const override {
+ last_merge_operand_key_ = key.ToString();
+ if (value.size() == sizeof(uint64_t)) {
+ return num_ == DecodeFixed64(value.data());
+ }
+ return true;
+ }
+
+ const char* Name() const override { return "FilterBadMergeOperand"; }
+
+ private:
+ mutable std::string last_merge_operand_key_;
+ uint64_t num_;
+};
+
+inline std::string EncodeInt(uint64_t x) {
+ std::string result;
+ PutFixed64(&result, x);
+ return result;
+}
+
+ class SeqStringSource : public SequentialFile {
+ public:
+ SeqStringSource(const std::string& data, std::atomic<int>* read_count)
+ : data_(data), offset_(0), read_count_(read_count) {}
+ ~SeqStringSource() override {}
+ Status Read(size_t n, Slice* result, char* scratch) override {
+ std::string output;
+ if (offset_ < data_.size()) {
+ n = std::min(data_.size() - offset_, n);
+ memcpy(scratch, data_.data() + offset_, n);
+ offset_ += n;
+ *result = Slice(scratch, n);
+ } else {
+ return Status::InvalidArgument(
+ "Attemp to read when it already reached eof.");
+ }
+ (*read_count_)++;
+ return Status::OK();
+ }
+ Status Skip(uint64_t n) override {
+ if (offset_ >= data_.size()) {
+ return Status::InvalidArgument(
+ "Attemp to read when it already reached eof.");
+ }
+ // TODO(yhchiang): Currently doesn't handle the overflow case.
+ offset_ += static_cast<size_t>(n);
+ return Status::OK();
+ }
+
+ private:
+ std::string data_;
+ size_t offset_;
+ std::atomic<int>* read_count_;
+ };
+
+ class StringEnv : public EnvWrapper {
+ public:
+ class StringSink : public WritableFile {
+ public:
+ explicit StringSink(std::string* contents)
+ : WritableFile(), contents_(contents) {}
+ virtual Status Truncate(uint64_t size) override {
+ contents_->resize(static_cast<size_t>(size));
+ return Status::OK();
+ }
+ virtual Status Close() override { return Status::OK(); }
+ virtual Status Flush() override { return Status::OK(); }
+ virtual Status Sync() override { return Status::OK(); }
+ virtual Status Append(const Slice& slice) override {
+ contents_->append(slice.data(), slice.size());
+ return Status::OK();
+ }
+
+ private:
+ std::string* contents_;
+ };
+
+ explicit StringEnv(Env* t) : EnvWrapper(t) {}
+ ~StringEnv() override {}
+
+ const std::string& GetContent(const std::string& f) { return files_[f]; }
+
+ const Status WriteToNewFile(const std::string& file_name,
+ const std::string& content) {
+ std::unique_ptr<WritableFile> r;
+ auto s = NewWritableFile(file_name, &r, EnvOptions());
+ if (!s.ok()) {
+ return s;
+ }
+ r->Append(content);
+ r->Flush();
+ r->Close();
+ assert(files_[file_name] == content);
+ return Status::OK();
+ }
+
+ // The following text is boilerplate that forwards all methods to target()
+ Status NewSequentialFile(const std::string& f,
+ std::unique_ptr<SequentialFile>* r,
+ const EnvOptions& /*options*/) override {
+ auto iter = files_.find(f);
+ if (iter == files_.end()) {
+ return Status::NotFound("The specified file does not exist", f);
+ }
+ r->reset(new SeqStringSource(iter->second, &num_seq_file_read_));
+ return Status::OK();
+ }
+ Status NewRandomAccessFile(const std::string& /*f*/,
+ std::unique_ptr<RandomAccessFile>* /*r*/,
+ const EnvOptions& /*options*/) override {
+ return Status::NotSupported();
+ }
+ Status NewWritableFile(const std::string& f,
+ std::unique_ptr<WritableFile>* r,
+ const EnvOptions& /*options*/) override {
+ auto iter = files_.find(f);
+ if (iter != files_.end()) {
+ return Status::IOError("The specified file already exists", f);
+ }
+ r->reset(new StringSink(&files_[f]));
+ return Status::OK();
+ }
+ virtual Status NewDirectory(
+ const std::string& /*name*/,
+ std::unique_ptr<Directory>* /*result*/) override {
+ return Status::NotSupported();
+ }
+ Status FileExists(const std::string& f) override {
+ if (files_.find(f) == files_.end()) {
+ return Status::NotFound();
+ }
+ return Status::OK();
+ }
+ Status GetChildren(const std::string& /*dir*/,
+ std::vector<std::string>* /*r*/) override {
+ return Status::NotSupported();
+ }
+ Status DeleteFile(const std::string& f) override {
+ files_.erase(f);
+ return Status::OK();
+ }
+ Status CreateDir(const std::string& /*d*/) override {
+ return Status::NotSupported();
+ }
+ Status CreateDirIfMissing(const std::string& /*d*/) override {
+ return Status::NotSupported();
+ }
+ Status DeleteDir(const std::string& /*d*/) override {
+ return Status::NotSupported();
+ }
+ Status GetFileSize(const std::string& f, uint64_t* s) override {
+ auto iter = files_.find(f);
+ if (iter == files_.end()) {
+ return Status::NotFound("The specified file does not exist:", f);
+ }
+ *s = iter->second.size();
+ return Status::OK();
+ }
+
+ Status GetFileModificationTime(const std::string& /*fname*/,
+ uint64_t* /*file_mtime*/) override {
+ return Status::NotSupported();
+ }
+
+ Status RenameFile(const std::string& /*s*/,
+ const std::string& /*t*/) override {
+ return Status::NotSupported();
+ }
+
+ Status LinkFile(const std::string& /*s*/,
+ const std::string& /*t*/) override {
+ return Status::NotSupported();
+ }
+
+ Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override {
+ return Status::NotSupported();
+ }
+
+ Status UnlockFile(FileLock* /*l*/) override {
+ return Status::NotSupported();
+ }
+
+ std::atomic<int> num_seq_file_read_;
+
+ protected:
+ std::unordered_map<std::string, std::string> files_;
+ };
+
+// Randomly initialize the given DBOptions
+void RandomInitDBOptions(DBOptions* db_opt, Random* rnd);
+
+// Randomly initialize the given ColumnFamilyOptions
+// Note that the caller is responsible for releasing non-null
+// cf_opt->compaction_filter.
+void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions&, Random* rnd);
+
+// A dummy merge operator which can change its name
+class ChanglingMergeOperator : public MergeOperator {
+ public:
+ explicit ChanglingMergeOperator(const std::string& name)
+ : name_(name + "MergeOperator") {}
+ ~ChanglingMergeOperator() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+ MergeOperationOutput* /*merge_out*/) const override {
+ return false;
+ }
+ virtual bool PartialMergeMulti(const Slice& /*key*/,
+ const std::deque<Slice>& /*operand_list*/,
+ std::string* /*new_value*/,
+ Logger* /*logger*/) const override {
+ return false;
+ }
+ virtual const char* Name() const override { return name_.c_str(); }
+
+ protected:
+ std::string name_;
+};
+
+// Returns a dummy merge operator with random name.
+MergeOperator* RandomMergeOperator(Random* rnd);
+
+// A dummy compaction filter which can change its name
+class ChanglingCompactionFilter : public CompactionFilter {
+ public:
+ explicit ChanglingCompactionFilter(const std::string& name)
+ : name_(name + "CompactionFilter") {}
+ ~ChanglingCompactionFilter() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ bool Filter(int /*level*/, const Slice& /*key*/,
+ const Slice& /*existing_value*/, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return false;
+ }
+
+ const char* Name() const override { return name_.c_str(); }
+
+ private:
+ std::string name_;
+};
+
+// Returns a dummy compaction filter with a random name.
+CompactionFilter* RandomCompactionFilter(Random* rnd);
+
+// A dummy compaction filter factory which can change its name
+class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit ChanglingCompactionFilterFactory(const std::string& name)
+ : name_(name + "CompactionFilterFactory") {}
+ ~ChanglingCompactionFilterFactory() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>();
+ }
+
+ // Returns a name that identifies this compaction filter factory.
+ const char* Name() const override { return name_.c_str(); }
+
+ protected:
+ std::string name_;
+};
+
+CompressionType RandomCompressionType(Random* rnd);
+
+void RandomCompressionTypeVector(const size_t count,
+ std::vector<CompressionType>* types,
+ Random* rnd);
+
+CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd);
+
+const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1);
+
+TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1);
+
+std::string RandomName(Random* rnd, const size_t len);
+
+Status DestroyDir(Env* env, const std::string& dir);
+
+bool IsDirectIOSupported(Env* env, const std::string& dir);
+
+// Return the number of lines where a given pattern was found in a file.
+size_t GetLinesCount(const std::string& fname, const std::string& pattern);
+
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/transaction_test_util.cc b/src/rocksdb/test_util/transaction_test_util.cc
new file mode 100644
index 000000000..736707595
--- /dev/null
+++ b/src/rocksdb/test_util/transaction_test_util.cc
@@ -0,0 +1,387 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "test_util/transaction_test_util.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <numeric>
+#include <random>
+#include <string>
+#include <thread>
+
+#include "rocksdb/db.h"
+#include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+
+#include "db/dbformat.h"
+#include "db/snapshot_impl.h"
+#include "logging/logging.h"
+#include "util/random.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+RandomTransactionInserter::RandomTransactionInserter(
+ Random64* rand, const WriteOptions& write_options,
+ const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets,
+ const uint64_t cmt_delay_ms, const uint64_t first_id)
+ : rand_(rand),
+ write_options_(write_options),
+ read_options_(read_options),
+ num_keys_(num_keys),
+ num_sets_(num_sets),
+ txn_id_(first_id),
+ cmt_delay_ms_(cmt_delay_ms) {}
+
+RandomTransactionInserter::~RandomTransactionInserter() {
+ if (txn_ != nullptr) {
+ delete txn_;
+ }
+ if (optimistic_txn_ != nullptr) {
+ delete optimistic_txn_;
+ }
+}
+
+bool RandomTransactionInserter::TransactionDBInsert(
+ TransactionDB* db, const TransactionOptions& txn_options) {
+ txn_ = db->BeginTransaction(write_options_, txn_options, txn_);
+
+ std::hash<std::thread::id> hasher;
+ char name[64];
+ snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64,
+ hasher(std::this_thread::get_id()), txn_id_++);
+ assert(strlen(name) < 64 - 1);
+ assert(txn_->SetName(name).ok());
+
+ // Take a snapshot if set_snapshot was not set or with 50% change otherwise
+ bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2);
+ if (take_snapshot) {
+ txn_->SetSnapshot();
+ read_options_.snapshot = txn_->GetSnapshot();
+ }
+ auto res = DoInsert(db, txn_, false);
+ if (take_snapshot) {
+ read_options_.snapshot = nullptr;
+ }
+ return res;
+}
+
+bool RandomTransactionInserter::OptimisticTransactionDBInsert(
+ OptimisticTransactionDB* db,
+ const OptimisticTransactionOptions& txn_options) {
+ optimistic_txn_ =
+ db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
+
+ return DoInsert(db, optimistic_txn_, true);
+}
+
+bool RandomTransactionInserter::DBInsert(DB* db) {
+ return DoInsert(db, nullptr, false);
+}
+
+Status RandomTransactionInserter::DBGet(
+ DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i,
+ uint64_t ikey, bool get_for_update, uint64_t* int_value,
+ std::string* full_key, bool* unexpected_error) {
+ Status s;
+ // Five digits (since the largest uint16_t is 65535) plus the NUL
+ // end char.
+ char prefix_buf[6];
+ // Pad prefix appropriately so we can iterate over each set
+ assert(set_i + 1 <= 9999);
+ snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
+ // key format: [SET#][random#]
+ std::string skey = ToString(ikey);
+ Slice base_key(skey);
+ *full_key = std::string(prefix_buf) + base_key.ToString();
+ Slice key(*full_key);
+
+ std::string value;
+ if (txn != nullptr) {
+ if (get_for_update) {
+ s = txn->GetForUpdate(read_options, key, &value);
+ } else {
+ s = txn->Get(read_options, key, &value);
+ }
+ } else {
+ s = db->Get(read_options, key, &value);
+ }
+
+ if (s.ok()) {
+ // Found key, parse its value
+ *int_value = std::stoull(value);
+ if (*int_value == 0 || *int_value == ULONG_MAX) {
+ *unexpected_error = true;
+ fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str());
+ s = Status::Corruption();
+ }
+ } else if (s.IsNotFound()) {
+ // Have not yet written to this key, so assume its value is 0
+ *int_value = 0;
+ s = Status::OK();
+ }
+ return s;
+}
+
+bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
+ bool is_optimistic) {
+ Status s;
+ WriteBatch batch;
+
+ // pick a random number to use to increment a key in each set
+ uint64_t incr = (rand_->Next() % 100) + 1;
+ bool unexpected_error = false;
+
+ std::vector<uint16_t> set_vec(num_sets_);
+ std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
+ std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
+
+ // For each set, pick a key at random and increment it
+ for (uint16_t set_i : set_vec) {
+ uint64_t int_value = 0;
+ std::string full_key;
+ uint64_t rand_key = rand_->Next() % num_keys_;
+ const bool get_for_update = txn ? rand_->OneIn(2) : false;
+ s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update,
+ &int_value, &full_key, &unexpected_error);
+ Slice key(full_key);
+ if (!s.ok()) {
+ // Optimistic transactions should never return non-ok status here.
+ // Non-optimistic transactions may return write-coflict/timeout errors.
+ if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
+ fprintf(stderr, "Get returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ unexpected_error = true;
+ }
+ break;
+ }
+
+ if (s.ok()) {
+ // Increment key
+ std::string sum = ToString(int_value + incr);
+ if (txn != nullptr) {
+ s = txn->Put(key, sum);
+ if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
+ // If the initial get was not for update, then the key is not locked
+ // before put and put could fail due to concurrent writes.
+ break;
+ } else if (!s.ok()) {
+ // Since we did a GetForUpdate, Put should not fail.
+ fprintf(stderr, "Put returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ unexpected_error = true;
+ }
+ } else {
+ batch.Put(key, sum);
+ }
+ bytes_inserted_ += key.size() + sum.size();
+ }
+ if (txn != nullptr) {
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64
+ "+%" PRIu64 "=%" PRIu64,
+ txn->GetName().c_str(), s.ToString().c_str(),
+ txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(),
+ int_value, incr, int_value + incr);
+ }
+ }
+
+ if (s.ok()) {
+ if (txn != nullptr) {
+ bool with_prepare = !is_optimistic && !rand_->OneIn(10);
+ if (with_prepare) {
+ // Also try commit without prepare
+ s = txn->Prepare();
+ assert(s.ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
+ s.ToString().c_str(), txn->GetName().c_str());
+ if (rand_->OneIn(20)) {
+ // This currently only tests the mechanics of writing commit time
+ // write batch so the exact values would not matter.
+ s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog");
+ assert(s.ok());
+ }
+ db->GetDBOptions().env->SleepForMicroseconds(
+ static_cast<int>(cmt_delay_ms_ * 1000));
+ }
+ if (!rand_->OneIn(20)) {
+ s = txn->Commit();
+ assert(!with_prepare || s.ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Commit of %" PRIu64 " %s (%s)", txn->GetId(),
+ s.ToString().c_str(), txn->GetName().c_str());
+ } else {
+ // Also try 5% rollback
+ s = txn->Rollback();
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Rollback %" PRIu64 " %s %s", txn->GetId(),
+ txn->GetName().c_str(), s.ToString().c_str());
+ assert(s.ok());
+ }
+ assert(is_optimistic || s.ok());
+
+ if (!s.ok()) {
+ if (is_optimistic) {
+ // Optimistic transactions can have write-conflict errors on commit.
+ // Any other error is unexpected.
+ if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
+ unexpected_error = true;
+ }
+ } else {
+ // Non-optimistic transactions should only fail due to expiration
+ // or write failures. For testing purproses, we do not expect any
+ // write failures.
+ if (!s.IsExpired()) {
+ unexpected_error = true;
+ }
+ }
+
+ if (unexpected_error) {
+ fprintf(stderr, "Commit returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ }
+ }
+ } else {
+ s = db->Write(write_options_, &batch);
+ if (!s.ok()) {
+ unexpected_error = true;
+ fprintf(stderr, "Write returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ }
+ }
+ } else {
+ if (txn != nullptr) {
+ assert(txn->Rollback().ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s",
+ s.ToString().c_str(), txn->GetName().c_str());
+ }
+ }
+
+ if (s.ok()) {
+ success_count_++;
+ } else {
+ failure_count_++;
+ }
+
+ last_status_ = s;
+
+ // return success if we didn't get any unexpected errors
+ return !unexpected_error;
+}
+
+// Verify that the sum of the keys in each set are equal
+Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
+ uint64_t num_keys_per_set,
+ bool take_snapshot, Random64* rand,
+ uint64_t delay_ms) {
+ // delay_ms is the delay between taking a snapshot and doing the reads. It
+ // emulates reads from a long-running backup job.
+ assert(delay_ms == 0 || take_snapshot);
+ uint64_t prev_total = 0;
+ uint32_t prev_i = 0;
+ bool prev_assigned = false;
+
+ ReadOptions roptions;
+ if (take_snapshot) {
+ roptions.snapshot = db->GetSnapshot();
+ db->GetDBOptions().env->SleepForMicroseconds(
+ static_cast<int>(delay_ms * 1000));
+ }
+
+ std::vector<uint16_t> set_vec(num_sets);
+ std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
+ std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
+
+ // For each set of keys with the same prefix, sum all the values
+ for (uint16_t set_i : set_vec) {
+ // Five digits (since the largest uint16_t is 65535) plus the NUL
+ // end char.
+ char prefix_buf[6];
+ assert(set_i + 1 <= 9999);
+ snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
+ uint64_t total = 0;
+
+ // Use either point lookup or iterator. Point lookups are slower so we use
+ // it less often.
+ const bool use_point_lookup =
+ num_keys_per_set != 0 && rand && rand->OneIn(10);
+ if (use_point_lookup) {
+ ReadOptions read_options;
+ for (uint64_t k = 0; k < num_keys_per_set; k++) {
+ std::string dont_care;
+ uint64_t int_value = 0;
+ bool unexpected_error = false;
+ const bool FOR_UPDATE = false;
+ Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE,
+ &int_value, &dont_care, &unexpected_error);
+ assert(s.ok());
+ assert(!unexpected_error);
+ total += int_value;
+ }
+ } else { // user iterators
+ Iterator* iter = db->NewIterator(roptions);
+ for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
+ Slice key = iter->key();
+ // stop when we reach a different prefix
+ if (key.ToString().compare(0, 4, prefix_buf) != 0) {
+ break;
+ }
+ Slice value = iter->value();
+ uint64_t int_value = std::stoull(value.ToString());
+ if (int_value == 0 || int_value == ULONG_MAX) {
+ fprintf(stderr, "Iter returned unexpected value: %s\n",
+ value.ToString().c_str());
+ return Status::Corruption();
+ }
+ ROCKS_LOG_DEBUG(
+ db->GetDBOptions().info_log,
+ "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul,
+ roptions.snapshot
+ ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_
+ : 0ul,
+ static_cast<int>(key.size()), key.data(), int_value);
+ total += int_value;
+ }
+ delete iter;
+ }
+
+ if (prev_assigned && total != prev_total) {
+ db->GetDBOptions().info_log->Flush();
+ fprintf(stdout,
+ "RandomTransactionVerify found inconsistent totals using "
+ "pointlookup? %d "
+ "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
+ " at snapshot %" PRIu64 "\n",
+ use_point_lookup, prev_i, prev_total, set_i, total,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
+ fflush(stdout);
+ return Status::Corruption();
+ } else {
+ ROCKS_LOG_DEBUG(
+ db->GetDBOptions().info_log,
+ "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
+ " snap: %" PRIu64,
+ use_point_lookup, total,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
+ }
+ prev_total = total;
+ prev_i = set_i;
+ prev_assigned = true;
+ }
+ if (take_snapshot) {
+ db->ReleaseSnapshot(roptions.snapshot);
+ }
+
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/test_util/transaction_test_util.h b/src/rocksdb/test_util/transaction_test_util.h
new file mode 100644
index 000000000..086b0ea6f
--- /dev/null
+++ b/src/rocksdb/test_util/transaction_test_util.h
@@ -0,0 +1,132 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/options.h"
+#include "port/port.h"
+#include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "rocksdb/utilities/transaction_db.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DB;
+class Random64;
+
+// Utility class for stress testing transactions. Can be used to write many
+// transactions in parallel and then validate that the data written is logically
+// consistent. This class assumes the input DB is initially empty.
+//
+// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will
+// increment the value of a key in #num_sets sets of keys. Regardless of
+// whether the transaction succeeds, the total sum of values of keys in each
+// set is an invariant that should remain equal.
+//
+// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many
+// times, Verify() can be called to validate that the invariant holds.
+//
+// To test writing Transaction in parallel, multiple threads can create a
+// RandomTransactionInserter with similar arguments using the same DB.
+class RandomTransactionInserter {
+ public:
+ // num_keys is the number of keys in each set.
+ // num_sets is the number of sets of keys.
+ // cmt_delay_ms is the delay between prepare (if there is any) and commit
+ // first_id is the id of the first transaction
+ explicit RandomTransactionInserter(
+ Random64* rand, const WriteOptions& write_options = WriteOptions(),
+ const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000,
+ uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0,
+ const uint64_t first_id = 0);
+
+ ~RandomTransactionInserter();
+
+ // Increment a key in each set using a Transaction on a TransactionDB.
+ //
+ // Returns true if the transaction succeeded OR if any error encountered was
+ // expected (eg a write-conflict). Error status may be obtained by calling
+ // GetLastStatus();
+ bool TransactionDBInsert(
+ TransactionDB* db,
+ const TransactionOptions& txn_options = TransactionOptions());
+
+ // Increment a key in each set using a Transaction on an
+ // OptimisticTransactionDB
+ //
+ // Returns true if the transaction succeeded OR if any error encountered was
+ // expected (eg a write-conflict). Error status may be obtained by calling
+ // GetLastStatus();
+ bool OptimisticTransactionDBInsert(
+ OptimisticTransactionDB* db,
+ const OptimisticTransactionOptions& txn_options =
+ OptimisticTransactionOptions());
+ // Increment a key in each set without using a transaction. If this function
+ // is called in parallel, then Verify() may fail.
+ //
+ // Returns true if the write succeeds.
+ // Error status may be obtained by calling GetLastStatus().
+ bool DBInsert(DB* db);
+
+ // Get the ikey'th key from set set_i
+ static Status DBGet(DB* db, Transaction* txn, ReadOptions& read_options,
+ uint16_t set_i, uint64_t ikey, bool get_for_update,
+ uint64_t* int_value, std::string* full_key,
+ bool* unexpected_error);
+
+ // Returns OK if Invariant is true.
+ static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0,
+ bool take_snapshot = false, Random64* rand = nullptr,
+ uint64_t delay_ms = 0);
+
+ // Returns the status of the previous Insert operation
+ Status GetLastStatus() { return last_status_; }
+
+ // Returns the number of successfully written calls to
+ // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert
+ uint64_t GetSuccessCount() { return success_count_; }
+
+ // Returns the number of calls to
+ // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not
+ // write any data.
+ uint64_t GetFailureCount() { return failure_count_; }
+
+ // Returns the sum of user keys/values Put() to the DB.
+ size_t GetBytesInserted() { return bytes_inserted_; }
+
+ private:
+ // Input options
+ Random64* rand_;
+ const WriteOptions write_options_;
+ ReadOptions read_options_;
+ const uint64_t num_keys_;
+ const uint16_t num_sets_;
+
+ // Number of successful insert batches performed
+ uint64_t success_count_ = 0;
+
+ // Number of failed insert batches attempted
+ uint64_t failure_count_ = 0;
+
+ size_t bytes_inserted_ = 0;
+
+ // Status returned by most recent insert operation
+ Status last_status_;
+
+ // optimization: re-use allocated transaction objects.
+ Transaction* txn_ = nullptr;
+ Transaction* optimistic_txn_ = nullptr;
+
+ uint64_t txn_id_;
+ // The delay between ::Prepare and ::Commit
+ const uint64_t cmt_delay_ms_;
+
+ bool DoInsert(DB* db, Transaction* txn, bool is_optimistic);
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE