summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/test_util
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/test_util
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/test_util')
-rw-r--r--src/rocksdb/test_util/mock_time_env.cc38
-rw-r--r--src/rocksdb/test_util/mock_time_env.h78
-rw-r--r--src/rocksdb/test_util/sync_point.cc82
-rw-r--r--src/rocksdb/test_util/sync_point.h180
-rw-r--r--src/rocksdb/test_util/sync_point_impl.cc152
-rw-r--r--src/rocksdb/test_util/sync_point_impl.h96
-rw-r--r--src/rocksdb/test_util/testharness.cc107
-rw-r--r--src/rocksdb/test_util/testharness.h119
-rw-r--r--src/rocksdb/test_util/testutil.cc738
-rw-r--r--src/rocksdb/test_util/testutil.h852
-rw-r--r--src/rocksdb/test_util/testutil_test.cc43
-rw-r--r--src/rocksdb/test_util/transaction_test_util.cc402
-rw-r--r--src/rocksdb/test_util/transaction_test_util.h149
13 files changed, 3036 insertions, 0 deletions
diff --git a/src/rocksdb/test_util/mock_time_env.cc b/src/rocksdb/test_util/mock_time_env.cc
new file mode 100644
index 000000000..23888e69e
--- /dev/null
+++ b/src/rocksdb/test_util/mock_time_env.cc
@@ -0,0 +1,38 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/mock_time_env.h"
+
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TODO: this is a workaround for the different behavior on different platform
+// for timedwait timeout. Ideally timedwait API should be moved to env.
+// details: PR #7101.
+void MockSystemClock::InstallTimedWaitFixCallback() {
+#ifndef NDEBUG
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+#ifdef OS_MACOSX
+ // This is an alternate way (vs. SpecialEnv) of dealing with the fact
+ // that on some platforms, pthread_cond_timedwait does not appear to
+ // release the lock for other threads to operate if the deadline time
+ // is already passed. (TimedWait calls are currently a bad abstraction
+ // because the deadline parameter is usually computed from Env time,
+ // but is interpreted in real clock time.)
+ SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < this->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
+ }
+ });
+#endif // OS_MACOSX
+ SyncPoint::GetInstance()->EnableProcessing();
+#endif // !NDEBUG
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/mock_time_env.h b/src/rocksdb/test_util/mock_time_env.h
new file mode 100644
index 000000000..7834368e0
--- /dev/null
+++ b/src/rocksdb/test_util/mock_time_env.h
@@ -0,0 +1,78 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <limits>
+
+#include "rocksdb/system_clock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// NOTE: SpecialEnv offers most of this functionality, along with hooks
+// for safe DB behavior under a mock time environment, so should be used
+// instead of MockSystemClock for DB tests.
+class MockSystemClock : public SystemClockWrapper {
+ public:
+ explicit MockSystemClock(const std::shared_ptr<SystemClock>& base)
+ : SystemClockWrapper(base) {}
+
+ static const char* kClassName() { return "MockSystemClock"; }
+ const char* Name() const override { return kClassName(); }
+ virtual Status GetCurrentTime(int64_t* time_sec) override {
+ assert(time_sec != nullptr);
+ *time_sec = static_cast<int64_t>(current_time_us_ / kMicrosInSecond);
+ return Status::OK();
+ }
+
+ virtual uint64_t NowSeconds() { return current_time_us_ / kMicrosInSecond; }
+
+ virtual uint64_t NowMicros() override { return current_time_us_; }
+
+ virtual uint64_t NowNanos() override {
+ assert(current_time_us_ <= std::numeric_limits<uint64_t>::max() / 1000);
+ return current_time_us_ * 1000;
+ }
+
+ uint64_t RealNowMicros() { return target_->NowMicros(); }
+
+ void SetCurrentTime(uint64_t time_sec) {
+ assert(time_sec < std::numeric_limits<uint64_t>::max() / kMicrosInSecond);
+ assert(time_sec * kMicrosInSecond >= current_time_us_);
+ current_time_us_ = time_sec * kMicrosInSecond;
+ }
+
+ // It's a fake sleep that just updates the Env current time, which is similar
+ // to `NoSleepEnv.SleepForMicroseconds()` and
+ // `SpecialEnv.MockSleepForMicroseconds()`.
+ // It's also similar to `set_current_time()`, which takes an absolute time in
+ // seconds, vs. this one takes the sleep in microseconds.
+ // Note: Not thread safe.
+ void SleepForMicroseconds(int micros) override {
+ assert(micros >= 0);
+ assert(current_time_us_ + static_cast<uint64_t>(micros) >=
+ current_time_us_);
+ current_time_us_.fetch_add(micros);
+ }
+
+ void MockSleepForSeconds(int seconds) {
+ assert(seconds >= 0);
+ uint64_t micros = static_cast<uint64_t>(seconds) * kMicrosInSecond;
+ assert(current_time_us_ + micros >= current_time_us_);
+ current_time_us_.fetch_add(micros);
+ }
+
+ // TODO: this is a workaround for the different behavior on different platform
+ // for timedwait timeout. Ideally timedwait API should be moved to env.
+ // details: PR #7101.
+ void InstallTimedWaitFixCallback();
+
+ private:
+ std::atomic<uint64_t> current_time_us_{0};
+ static constexpr uint64_t kMicrosInSecond = 1000U * 1000U;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/sync_point.cc b/src/rocksdb/test_util/sync_point.cc
new file mode 100644
index 000000000..bec02d4f6
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point.cc
@@ -0,0 +1,82 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/sync_point.h"
+
+#include <fcntl.h>
+
+#include "test_util/sync_point_impl.h"
+
+std::vector<std::string> rocksdb_kill_exclude_prefixes;
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+
+SyncPoint* SyncPoint::GetInstance() {
+ static SyncPoint sync_point;
+ return &sync_point;
+}
+
+SyncPoint::SyncPoint() : impl_(new Data) {}
+
+SyncPoint::~SyncPoint() { delete impl_; }
+
+void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
+ impl_->LoadDependency(dependencies);
+}
+
+void SyncPoint::LoadDependencyAndMarkers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ impl_->LoadDependencyAndMarkers(dependencies, markers);
+}
+
+void SyncPoint::SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback) {
+ impl_->SetCallBack(point, callback);
+}
+
+void SyncPoint::ClearCallBack(const std::string& point) {
+ impl_->ClearCallBack(point);
+}
+
+void SyncPoint::ClearAllCallBacks() { impl_->ClearAllCallBacks(); }
+
+void SyncPoint::EnableProcessing() { impl_->EnableProcessing(); }
+
+void SyncPoint::DisableProcessing() { impl_->DisableProcessing(); }
+
+void SyncPoint::ClearTrace() { impl_->ClearTrace(); }
+
+void SyncPoint::Process(const Slice& point, void* cb_arg) {
+ impl_->Process(point, cb_arg);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // NDEBUG
+
+namespace ROCKSDB_NAMESPACE {
+void SetupSyncPointsToMockDirectIO() {
+#if !defined(NDEBUG) && !defined(OS_MACOSX) && !defined(OS_WIN) && \
+ !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewWritableFile:O_DIRECT", [&](void* arg) {
+ int* val = static_cast<int*>(arg);
+ *val &= ~O_DIRECT;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewRandomAccessFile:O_DIRECT", [&](void* arg) {
+ int* val = static_cast<int*>(arg);
+ *val &= ~O_DIRECT;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewSequentialFile:O_DIRECT", [&](void* arg) {
+ int* val = static_cast<int*>(arg);
+ *val &= ~O_DIRECT;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+#endif
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/sync_point.h b/src/rocksdb/test_util/sync_point.h
new file mode 100644
index 000000000..65f1239ec
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point.h
@@ -0,0 +1,180 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <assert.h>
+
+#include <functional>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/slice.h"
+
+#ifdef NDEBUG
+// empty in release build
+#define TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, rocksdb_kill_odds_weight)
+#define TEST_KILL_RANDOM(kill_point)
+#else
+
+namespace ROCKSDB_NAMESPACE {
+
+// To avoid crashing always at some frequently executed codepaths (during
+// kill random test), use this factor to reduce odds
+#define REDUCE_ODDS 2
+#define REDUCE_ODDS2 4
+
+// A class used to pass when a kill point is reached.
+struct KillPoint {
+ public:
+ // This is only set from db_stress.cc and for testing only.
+ // If non-zero, kill at various points in source code with probability 1/this
+ int rocksdb_kill_odds = 0;
+ // If kill point has a prefix on this list, will skip killing.
+ std::vector<std::string> rocksdb_kill_exclude_prefixes;
+ // Kill the process with probability 1/odds for testing.
+ void TestKillRandom(std::string kill_point, int odds,
+ const std::string& srcfile, int srcline);
+
+ static KillPoint* GetInstance();
+};
+
+#define TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, rocksdb_kill_odds_weight) \
+ { \
+ KillPoint::GetInstance()->TestKillRandom( \
+ kill_point, rocksdb_kill_odds_weight, __FILE__, __LINE__); \
+ }
+#define TEST_KILL_RANDOM(kill_point) TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, 1)
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
+
+#ifdef NDEBUG
+#define TEST_SYNC_POINT(x)
+#define TEST_IDX_SYNC_POINT(x, index)
+#define TEST_SYNC_POINT_CALLBACK(x, y)
+#define INIT_SYNC_POINT_SINGLETONS()
+#else
+
+namespace ROCKSDB_NAMESPACE {
+
+// This class provides facility to reproduce race conditions deterministically
+// in unit tests.
+// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
+// Each sync point represents a position in the execution stream of a thread.
+// In the unit test, 'Happens After' relationship among sync points could be
+// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of
+// threads execution.
+// Refer to (DBTest,TransactionLogIteratorRace), for an example use case.
+
+class SyncPoint {
+ public:
+ static SyncPoint* GetInstance();
+
+ SyncPoint(const SyncPoint&) = delete;
+ SyncPoint& operator=(const SyncPoint&) = delete;
+ ~SyncPoint();
+
+ struct SyncPointPair {
+ std::string predecessor;
+ std::string successor;
+ };
+
+ // call once at the beginning of a test to setup the dependency between
+ // sync points
+ void LoadDependency(const std::vector<SyncPointPair>& dependencies);
+
+ // call once at the beginning of a test to setup the dependency between
+ // sync points and setup markers indicating the successor is only enabled
+ // when it is processed on the same thread as the predecessor.
+ // When adding a marker, it implicitly adds a dependency for the marker pair.
+ void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers);
+
+ // The argument to the callback is passed through from
+ // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
+ // TEST_IDX_SYNC_POINT was used.
+ void SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback);
+
+ // Clear callback function by point
+ void ClearCallBack(const std::string& point);
+
+ // Clear all call back functions.
+ void ClearAllCallBacks();
+
+ // enable sync point processing (disabled on startup)
+ void EnableProcessing();
+
+ // disable sync point processing
+ void DisableProcessing();
+
+ // remove the execution trace of all sync points
+ void ClearTrace();
+
+ // triggered by TEST_SYNC_POINT, blocking execution until all predecessors
+ // are executed.
+ // And/or call registered callback function, with argument `cb_arg`
+ void Process(const Slice& point, void* cb_arg = nullptr);
+
+ // template gets length of const string at compile time,
+ // avoiding strlen() at runtime
+ template <size_t kLen>
+ void Process(const char (&point)[kLen], void* cb_arg = nullptr) {
+ static_assert(kLen > 0, "Must not be empty");
+ assert(point[kLen - 1] == '\0');
+ Process(Slice(point, kLen - 1), cb_arg);
+ }
+
+ // TODO: it might be useful to provide a function that blocks until all
+ // sync points are cleared.
+
+ // We want this to be public so we can
+ // subclass the implementation
+ struct Data;
+
+ private:
+ // Singleton
+ SyncPoint();
+ Data* impl_;
+};
+
+// Sets up sync points to mock direct IO instead of actually issuing direct IO
+// to the file system.
+void SetupSyncPointsToMockDirectIO();
+} // namespace ROCKSDB_NAMESPACE
+
+// Use TEST_SYNC_POINT to specify sync points inside code base.
+// Sync points can have happens-after dependency on other sync points,
+// configured at runtime via SyncPoint::LoadDependency. This could be
+// utilized to re-produce race conditions between threads.
+// See TransactionLogIteratorRace in db_test.cc for an example use case.
+// TEST_SYNC_POINT is no op in release build.
+#define TEST_SYNC_POINT(x) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x)
+#define TEST_IDX_SYNC_POINT(x, index) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x + \
+ std::to_string(index))
+#define TEST_SYNC_POINT_CALLBACK(x, y) \
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x, y)
+#define INIT_SYNC_POINT_SINGLETONS() \
+ (void)ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
+#endif // NDEBUG
+
+// Callback sync point for any read IO errors that should be ignored by
+// the fault injection framework
+// Disable in release mode
+#ifdef NDEBUG
+#define IGNORE_STATUS_IF_ERROR(_status_)
+#else
+#define IGNORE_STATUS_IF_ERROR(_status_) \
+ { \
+ if (!_status_.ok()) { \
+ TEST_SYNC_POINT("FaultInjectionIgnoreError"); \
+ } \
+ }
+#endif // NDEBUG
diff --git a/src/rocksdb/test_util/sync_point_impl.cc b/src/rocksdb/test_util/sync_point_impl.cc
new file mode 100644
index 000000000..2a4bd3ccd
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point_impl.cc
@@ -0,0 +1,152 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/sync_point_impl.h"
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+KillPoint* KillPoint::GetInstance() {
+ static KillPoint kp;
+ return &kp;
+}
+
+void KillPoint::TestKillRandom(std::string kill_point, int odds_weight,
+ const std::string& srcfile, int srcline) {
+ if (rocksdb_kill_odds <= 0) {
+ return;
+ }
+ int odds = rocksdb_kill_odds * odds_weight;
+ for (auto& p : rocksdb_kill_exclude_prefixes) {
+ if (kill_point.substr(0, p.length()) == p) {
+ return;
+ }
+ }
+
+ assert(odds > 0);
+ if (odds % 7 == 0) {
+ // class Random uses multiplier 16807, which is 7^5. If odds are
+ // multiplier of 7, there might be limited values generated.
+ odds++;
+ }
+ auto* r = Random::GetTLSInstance();
+ bool crash = r->OneIn(odds);
+ if (crash) {
+ port::Crash(srcfile, srcline);
+ }
+}
+
+void SyncPoint::Data::LoadDependency(
+ const std::vector<SyncPointPair>& dependencies) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ point_filter_.Add(dependency.successor);
+ point_filter_.Add(dependency.predecessor);
+ }
+ cv_.notify_all();
+}
+
+void SyncPoint::Data::LoadDependencyAndMarkers(
+ const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ successors_.clear();
+ predecessors_.clear();
+ cleared_points_.clear();
+ markers_.clear();
+ marked_thread_id_.clear();
+ for (const auto& dependency : dependencies) {
+ successors_[dependency.predecessor].push_back(dependency.successor);
+ predecessors_[dependency.successor].push_back(dependency.predecessor);
+ point_filter_.Add(dependency.successor);
+ point_filter_.Add(dependency.predecessor);
+ }
+ for (const auto& marker : markers) {
+ successors_[marker.predecessor].push_back(marker.successor);
+ predecessors_[marker.successor].push_back(marker.predecessor);
+ markers_[marker.predecessor].push_back(marker.successor);
+ point_filter_.Add(marker.predecessor);
+ point_filter_.Add(marker.successor);
+ }
+ cv_.notify_all();
+}
+
+bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
+ for (const auto& pred : predecessors_[point]) {
+ if (cleared_points_.count(pred) == 0) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void SyncPoint::Data::ClearCallBack(const std::string& point) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (num_callbacks_running_ > 0) {
+ cv_.wait(lock);
+ }
+ callbacks_.erase(point);
+}
+
+void SyncPoint::Data::ClearAllCallBacks() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (num_callbacks_running_ > 0) {
+ cv_.wait(lock);
+ }
+ callbacks_.clear();
+}
+
+void SyncPoint::Data::Process(const Slice& point, void* cb_arg) {
+ if (!enabled_) {
+ return;
+ }
+
+ // Use a filter to prevent mutex lock if possible.
+ if (!point_filter_.MayContain(point)) {
+ return;
+ }
+
+ // Must convert to std::string for remaining work. Take
+ // heap hit.
+ std::string point_string(point.ToString());
+ std::unique_lock<std::mutex> lock(mutex_);
+ auto thread_id = std::this_thread::get_id();
+
+ auto marker_iter = markers_.find(point_string);
+ if (marker_iter != markers_.end()) {
+ for (auto& marked_point : marker_iter->second) {
+ marked_thread_id_.emplace(marked_point, thread_id);
+ point_filter_.Add(marked_point);
+ }
+ }
+
+ if (DisabledByMarker(point_string, thread_id)) {
+ return;
+ }
+
+ while (!PredecessorsAllCleared(point_string)) {
+ cv_.wait(lock);
+ if (DisabledByMarker(point_string, thread_id)) {
+ return;
+ }
+ }
+
+ auto callback_pair = callbacks_.find(point_string);
+ if (callback_pair != callbacks_.end()) {
+ num_callbacks_running_++;
+ mutex_.unlock();
+ callback_pair->second(cb_arg);
+ mutex_.lock();
+ num_callbacks_running_--;
+ }
+ cleared_points_.insert(point_string);
+ cv_.notify_all();
+}
+} // namespace ROCKSDB_NAMESPACE
+#endif
diff --git a/src/rocksdb/test_util/sync_point_impl.h b/src/rocksdb/test_util/sync_point_impl.h
new file mode 100644
index 000000000..64cc0445e
--- /dev/null
+++ b/src/rocksdb/test_util/sync_point_impl.h
@@ -0,0 +1,96 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <assert.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "memory/concurrent_arena.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/dynamic_bloom.h"
+#include "util/random.h"
+
+#pragma once
+
+#ifndef NDEBUG
+namespace ROCKSDB_NAMESPACE {
+// A hacky allocator for single use.
+// Arena depends on SyncPoint and create circular dependency.
+class SingleAllocator : public Allocator {
+ public:
+ char* Allocate(size_t) override {
+ assert(false);
+ return nullptr;
+ }
+ char* AllocateAligned(size_t bytes, size_t, Logger*) override {
+ buf_.resize(bytes);
+ return const_cast<char*>(buf_.data());
+ }
+ size_t BlockSize() const override {
+ assert(false);
+ return 0;
+ }
+
+ private:
+ std::string buf_;
+};
+
+struct SyncPoint::Data {
+ Data() : point_filter_(&alloc_, /*total_bits=*/8192), enabled_(false) {}
+ // Enable proper deletion by subclasses
+ virtual ~Data() {}
+ // successor/predecessor map loaded from LoadDependency
+ std::unordered_map<std::string, std::vector<std::string>> successors_;
+ std::unordered_map<std::string, std::vector<std::string>> predecessors_;
+ std::unordered_map<std::string, std::function<void(void*)>> callbacks_;
+ std::unordered_map<std::string, std::vector<std::string>> markers_;
+ std::unordered_map<std::string, std::thread::id> marked_thread_id_;
+
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ // sync points that have been passed through
+ std::unordered_set<std::string> cleared_points_;
+ SingleAllocator alloc_;
+ // A filter before holding mutex to speed up process.
+ DynamicBloom point_filter_;
+ std::atomic<bool> enabled_;
+ int num_callbacks_running_ = 0;
+
+ void LoadDependency(const std::vector<SyncPointPair>& dependencies);
+ void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
+ const std::vector<SyncPointPair>& markers);
+ bool PredecessorsAllCleared(const std::string& point);
+ void SetCallBack(const std::string& point,
+ const std::function<void(void*)>& callback) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ callbacks_[point] = callback;
+ point_filter_.Add(point);
+ }
+
+ void ClearCallBack(const std::string& point);
+ void ClearAllCallBacks();
+ void EnableProcessing() { enabled_ = true; }
+ void DisableProcessing() { enabled_ = false; }
+ void ClearTrace() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ cleared_points_.clear();
+ }
+ bool DisabledByMarker(const std::string& point, std::thread::id thread_id) {
+ auto marked_point_iter = marked_thread_id_.find(point);
+ return marked_point_iter != marked_thread_id_.end() &&
+ thread_id != marked_point_iter->second;
+ }
+ void Process(const Slice& point, void* cb_arg);
+};
+} // namespace ROCKSDB_NAMESPACE
+#endif // NDEBUG
diff --git a/src/rocksdb/test_util/testharness.cc b/src/rocksdb/test_util/testharness.cc
new file mode 100644
index 000000000..3c7b835d2
--- /dev/null
+++ b/src/rocksdb/test_util/testharness.cc
@@ -0,0 +1,107 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "test_util/testharness.h"
+
+#include <regex>
+#include <string>
+#include <thread>
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+#ifdef OS_WIN
+#include <windows.h>
+
+std::string GetPidStr() { return std::to_string(GetCurrentProcessId()); }
+#else
+std::string GetPidStr() { return std::to_string(getpid()); }
+#endif
+
+::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) {
+ if (s.ok()) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure() << s_expr << std::endl << s.ToString();
+ }
+}
+
+std::string TmpDir(Env* env) {
+ std::string dir;
+ Status s = env->GetTestDirectory(&dir);
+ EXPECT_OK(s);
+ return dir;
+}
+
+std::string PerThreadDBPath(std::string dir, std::string name) {
+ size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+ return dir + "/" + name + "_" + GetPidStr() + "_" + std::to_string(tid);
+}
+
+std::string PerThreadDBPath(std::string name) {
+ return PerThreadDBPath(test::TmpDir(), name);
+}
+
+std::string PerThreadDBPath(Env* env, std::string name) {
+ return PerThreadDBPath(test::TmpDir(env), name);
+}
+
+int RandomSeed() {
+ const char* env = getenv("TEST_RANDOM_SEED");
+ int result = (env != nullptr ? atoi(env) : 301);
+ if (result <= 0) {
+ result = 301;
+ }
+ return result;
+}
+
+TestRegex::TestRegex(const std::string& pattern)
+ : impl_(std::make_shared<Impl>(pattern)), pattern_(pattern) {}
+TestRegex::TestRegex(const char* pattern)
+ : impl_(std::make_shared<Impl>(pattern)), pattern_(pattern) {}
+
+const std::string& TestRegex::GetPattern() const { return pattern_; }
+
+class TestRegex::Impl : public std::regex {
+ public:
+ using std::regex::basic_regex;
+};
+
+bool TestRegex::Matches(const std::string& str) const {
+ if (impl_) {
+ return std::regex_match(str, *impl_);
+ } else {
+ // Should not call Matches on unset Regex
+ assert(false);
+ return false;
+ }
+}
+
+::testing::AssertionResult AssertMatchesRegex(const char* str_expr,
+ const char* pattern_expr,
+ const std::string& str,
+ const TestRegex& pattern) {
+ if (pattern.Matches(str)) {
+ return ::testing::AssertionSuccess();
+ } else if (TestRegex("\".*\"").Matches(pattern_expr)) {
+ // constant regex string
+ return ::testing::AssertionFailure()
+ << str << " (" << str_expr << ")" << std::endl
+ << "does not match regex " << pattern.GetPattern();
+ } else {
+ // runtime regex string
+ return ::testing::AssertionFailure()
+ << str << " (" << str_expr << ")" << std::endl
+ << "does not match regex" << std::endl
+ << pattern.GetPattern() << " (" << pattern_expr << ")";
+ }
+}
+
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testharness.h b/src/rocksdb/test_util/testharness.h
new file mode 100644
index 000000000..69018629a
--- /dev/null
+++ b/src/rocksdb/test_util/testharness.h
@@ -0,0 +1,119 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#ifdef OS_AIX
+#include "gtest/gtest.h"
+#else
+#include <gtest/gtest.h>
+#endif
+
+// A "skipped" test has a specific meaning in Facebook infrastructure: the
+// test is in good shape and should be run, but something about the
+// compilation or execution environment means the test cannot be run.
+// Specifically, there is a hole in intended testing if any
+// parameterization of a test (e.g. Foo/FooTest.Bar/42) is skipped for all
+// tested build configurations/platforms/etc.
+//
+// If GTEST_SKIP is available, use it. Otherwise, define skip as success.
+//
+// The GTEST macros do not seem to print the message, even with -verbose,
+// so these print to stderr. Note that these do not exit the test themselves;
+// calling code should 'return' or similar from the test.
+#ifdef GTEST_SKIP_
+#define ROCKSDB_GTEST_SKIP(m) \
+ do { \
+ fputs("SKIPPED: " m "\n", stderr); \
+ GTEST_SKIP_(m); \
+ } while (false) /* user ; */
+#else
+#define ROCKSDB_GTEST_SKIP(m) \
+ do { \
+ fputs("SKIPPED: " m "\n", stderr); \
+ GTEST_SUCCESS_("SKIPPED: " m); \
+ } while (false) /* user ; */
+#endif
+
+// We add "bypass" as an alternative to ROCKSDB_GTEST_SKIP that is allowed to
+// be a permanent condition, e.g. for intentionally omitting or disabling some
+// parameterizations for some tests. (Use _DISABLED at the end of the test
+// name to disable an entire test.)
+#define ROCKSDB_GTEST_BYPASS(m) \
+ do { \
+ fputs("BYPASSED: " m "\n", stderr); \
+ GTEST_SUCCESS_("BYPASSED: " m); \
+ } while (false) /* user ; */
+
+#include <string>
+
+#include "port/stack_trace.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+// Return the directory to use for temporary storage.
+std::string TmpDir(Env* env = Env::Default());
+
+// A path unique within the thread
+std::string PerThreadDBPath(std::string name);
+std::string PerThreadDBPath(Env* env, std::string name);
+std::string PerThreadDBPath(std::string dir, std::string name);
+
+// Return a randomization seed for this run. Typically returns the
+// same number on repeated invocations of this binary, but automated
+// runs may be able to vary the seed.
+int RandomSeed();
+
+::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s);
+
+#define ASSERT_OK(s) \
+ ASSERT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s)
+#define ASSERT_NOK(s) ASSERT_FALSE((s).ok())
+#define EXPECT_OK(s) \
+ EXPECT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s)
+#define EXPECT_NOK(s) EXPECT_FALSE((s).ok())
+
+// Useful for testing
+// * No need to deal with Status like in Regex public API
+// * No triggering lint reports on use of std::regex in tests
+// * Available in LITE (unlike public API)
+class TestRegex {
+ public:
+ // These throw on bad pattern
+ /*implicit*/ TestRegex(const std::string& pattern);
+ /*implicit*/ TestRegex(const char* pattern);
+
+ // Checks that the whole of str is matched by this regex
+ bool Matches(const std::string& str) const;
+
+ const std::string& GetPattern() const;
+
+ private:
+ class Impl;
+ std::shared_ptr<Impl> impl_; // shared_ptr for simple implementation
+ std::string pattern_;
+};
+
+::testing::AssertionResult AssertMatchesRegex(const char* str_expr,
+ const char* pattern_expr,
+ const std::string& str,
+ const TestRegex& pattern);
+
+#define ASSERT_MATCHES_REGEX(str, pattern) \
+ ASSERT_PRED_FORMAT2(ROCKSDB_NAMESPACE::test::AssertMatchesRegex, str, pattern)
+#define EXPECT_MATCHES_REGEX(str, pattern) \
+ EXPECT_PRED_FORMAT2(ROCKSDB_NAMESPACE::test::AssertMatchesRegex, str, pattern)
+
+} // namespace test
+
+using test::TestRegex;
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testutil.cc b/src/rocksdb/test_util/testutil.cc
new file mode 100644
index 000000000..5e1b909f9
--- /dev/null
+++ b/src/rocksdb/test_util/testutil.cc
@@ -0,0 +1,738 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "test_util/testutil.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+
+#include <array>
+#include <cctype>
+#include <fstream>
+#include <sstream>
+
+#include "db/memtable_list.h"
+#include "env/composite_env_wrapper.h"
+#include "file/random_access_file_reader.h"
+#include "file/sequence_file_reader.h"
+#include "file/writable_file_writer.h"
+#include "port/port.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "test_util/mock_time_env.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+
+#ifndef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
+#endif
+
+namespace ROCKSDB_NAMESPACE {
+namespace test {
+
+const uint32_t kDefaultFormatVersion = BlockBasedTableOptions().format_version;
+const std::set<uint32_t> kFooterFormatVersionsToTest{
+ 5U,
+ // In case any interesting future changes
+ kDefaultFormatVersion,
+ kLatestFormatVersion,
+};
+
+std::string RandomKey(Random* rnd, int len, RandomKeyType type) {
+ // Make sure to generate a wide variety of characters so we
+ // test the boundary conditions for short-key optimizations.
+ static const char kTestChars[] = {'\0', '\1', 'a', 'b', 'c',
+ 'd', 'e', '\xfd', '\xfe', '\xff'};
+ std::string result;
+ for (int i = 0; i < len; i++) {
+ std::size_t indx = 0;
+ switch (type) {
+ case RandomKeyType::RANDOM:
+ indx = rnd->Uniform(sizeof(kTestChars));
+ break;
+ case RandomKeyType::LARGEST:
+ indx = sizeof(kTestChars) - 1;
+ break;
+ case RandomKeyType::MIDDLE:
+ indx = sizeof(kTestChars) / 2;
+ break;
+ case RandomKeyType::SMALLEST:
+ indx = 0;
+ break;
+ }
+ result += kTestChars[indx];
+ }
+ return result;
+}
+
+extern Slice CompressibleString(Random* rnd, double compressed_fraction,
+ int len, std::string* dst) {
+ int raw = static_cast<int>(len * compressed_fraction);
+ if (raw < 1) raw = 1;
+ std::string raw_data = rnd->RandomString(raw);
+
+ // Duplicate the random data until we have filled "len" bytes
+ dst->clear();
+ while (dst->size() < (unsigned int)len) {
+ dst->append(raw_data);
+ }
+ dst->resize(len);
+ return Slice(*dst);
+}
+
+namespace {
+class Uint64ComparatorImpl : public Comparator {
+ public:
+ Uint64ComparatorImpl() {}
+
+ const char* Name() const override { return "rocksdb.Uint64Comparator"; }
+
+ int Compare(const Slice& a, const Slice& b) const override {
+ assert(a.size() == sizeof(uint64_t) && b.size() == sizeof(uint64_t));
+ const uint64_t* left = reinterpret_cast<const uint64_t*>(a.data());
+ const uint64_t* right = reinterpret_cast<const uint64_t*>(b.data());
+ uint64_t leftValue;
+ uint64_t rightValue;
+ GetUnaligned(left, &leftValue);
+ GetUnaligned(right, &rightValue);
+ if (leftValue == rightValue) {
+ return 0;
+ } else if (leftValue < rightValue) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+
+ void FindShortestSeparator(std::string* /*start*/,
+ const Slice& /*limit*/) const override {
+ return;
+ }
+
+ void FindShortSuccessor(std::string* /*key*/) const override { return; }
+};
+} // namespace
+
+const Comparator* Uint64Comparator() {
+ static Uint64ComparatorImpl uint64comp;
+ return &uint64comp;
+}
+
+const Comparator* BytewiseComparatorWithU64TsWrapper() {
+ ConfigOptions config_options;
+ const Comparator* user_comparator = nullptr;
+ Status s = Comparator::CreateFromString(
+ config_options, "leveldb.BytewiseComparator.u64ts", &user_comparator);
+ s.PermitUncheckedError();
+ return user_comparator;
+}
+
+void CorruptKeyType(InternalKey* ikey) {
+ std::string keystr = ikey->Encode().ToString();
+ keystr[keystr.size() - 8] = kTypeLogData;
+ ikey->DecodeFrom(Slice(keystr.data(), keystr.size()));
+}
+
+std::string KeyStr(const std::string& user_key, const SequenceNumber& seq,
+ const ValueType& t, bool corrupt) {
+ InternalKey k(user_key, seq, t);
+ if (corrupt) {
+ CorruptKeyType(&k);
+ }
+ return k.Encode().ToString();
+}
+
+std::string KeyStr(uint64_t ts, const std::string& user_key,
+ const SequenceNumber& seq, const ValueType& t,
+ bool corrupt) {
+ std::string user_key_with_ts(user_key);
+ std::string ts_str;
+ PutFixed64(&ts_str, ts);
+ user_key_with_ts.append(ts_str);
+ return KeyStr(user_key_with_ts, seq, t, corrupt);
+}
+
+bool SleepingBackgroundTask::TimedWaitUntilSleeping(uint64_t wait_time) {
+ auto abs_time = SystemClock::Default()->NowMicros() + wait_time;
+ MutexLock l(&mutex_);
+ while (!sleeping_ || !should_sleep_) {
+ if (bg_cv_.TimedWait(abs_time)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool SleepingBackgroundTask::TimedWaitUntilDone(uint64_t wait_time) {
+ auto abs_time = SystemClock::Default()->NowMicros() + wait_time;
+ MutexLock l(&mutex_);
+ while (!done_with_sleep_) {
+ if (bg_cv_.TimedWait(abs_time)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+std::string RandomName(Random* rnd, const size_t len) {
+ std::stringstream ss;
+ for (size_t i = 0; i < len; ++i) {
+ ss << static_cast<char>(rnd->Uniform(26) + 'a');
+ }
+ return ss.str();
+}
+
+CompressionType RandomCompressionType(Random* rnd) {
+ auto ret = static_cast<CompressionType>(rnd->Uniform(6));
+ while (!CompressionTypeSupported(ret)) {
+ ret = static_cast<CompressionType>((static_cast<int>(ret) + 1) % 6);
+ }
+ return ret;
+}
+
+void RandomCompressionTypeVector(const size_t count,
+ std::vector<CompressionType>* types,
+ Random* rnd) {
+ types->clear();
+ for (size_t i = 0; i < count; ++i) {
+ types->emplace_back(RandomCompressionType(rnd));
+ }
+}
+
+const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) {
+ int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4);
+ switch (random_num) {
+ case 0:
+ return NewFixedPrefixTransform(rnd->Uniform(20) + 1);
+ case 1:
+ return NewCappedPrefixTransform(rnd->Uniform(20) + 1);
+ case 2:
+ return NewNoopTransform();
+ default:
+ return nullptr;
+ }
+}
+
+BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) {
+ BlockBasedTableOptions opt;
+ opt.cache_index_and_filter_blocks = rnd->Uniform(2);
+ opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2);
+ opt.pin_top_level_index_and_filter = rnd->Uniform(2);
+ using IndexType = BlockBasedTableOptions::IndexType;
+ const std::array<IndexType, 4> index_types = {
+ {IndexType::kBinarySearch, IndexType::kHashSearch,
+ IndexType::kTwoLevelIndexSearch, IndexType::kBinarySearchWithFirstKey}};
+ opt.index_type =
+ index_types[rnd->Uniform(static_cast<int>(index_types.size()))];
+ opt.checksum = static_cast<ChecksumType>(rnd->Uniform(3));
+ opt.block_size = rnd->Uniform(10000000);
+ opt.block_size_deviation = rnd->Uniform(100);
+ opt.block_restart_interval = rnd->Uniform(100);
+ opt.index_block_restart_interval = rnd->Uniform(100);
+ opt.whole_key_filtering = rnd->Uniform(2);
+
+ return opt;
+}
+
+TableFactory* RandomTableFactory(Random* rnd, int pre_defined) {
+#ifndef ROCKSDB_LITE
+ int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4);
+ switch (random_num) {
+ case 0:
+ return NewPlainTableFactory();
+ case 1:
+ return NewCuckooTableFactory();
+ default:
+ return NewBlockBasedTableFactory();
+ }
+#else
+ (void)rnd;
+ (void)pre_defined;
+ return NewBlockBasedTableFactory();
+#endif // !ROCKSDB_LITE
+}
+
+MergeOperator* RandomMergeOperator(Random* rnd) {
+ return new ChanglingMergeOperator(RandomName(rnd, 10));
+}
+
+CompactionFilter* RandomCompactionFilter(Random* rnd) {
+ return new ChanglingCompactionFilter(RandomName(rnd, 10));
+}
+
+CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) {
+ return new ChanglingCompactionFilterFactory(RandomName(rnd, 10));
+}
+
+void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) {
+ // boolean options
+ db_opt->advise_random_on_open = rnd->Uniform(2);
+ db_opt->allow_mmap_reads = rnd->Uniform(2);
+ db_opt->allow_mmap_writes = rnd->Uniform(2);
+ db_opt->use_direct_reads = rnd->Uniform(2);
+ db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2);
+ db_opt->create_if_missing = rnd->Uniform(2);
+ db_opt->create_missing_column_families = rnd->Uniform(2);
+ db_opt->enable_thread_tracking = rnd->Uniform(2);
+ db_opt->error_if_exists = rnd->Uniform(2);
+ db_opt->is_fd_close_on_exec = rnd->Uniform(2);
+ db_opt->paranoid_checks = rnd->Uniform(2);
+ db_opt->track_and_verify_wals_in_manifest = rnd->Uniform(2);
+ db_opt->verify_sst_unique_id_in_manifest = rnd->Uniform(2);
+ db_opt->skip_stats_update_on_db_open = rnd->Uniform(2);
+ db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2);
+ db_opt->use_adaptive_mutex = rnd->Uniform(2);
+ db_opt->use_fsync = rnd->Uniform(2);
+ db_opt->recycle_log_file_num = rnd->Uniform(2);
+ db_opt->avoid_flush_during_recovery = rnd->Uniform(2);
+ db_opt->avoid_flush_during_shutdown = rnd->Uniform(2);
+ db_opt->enforce_single_del_contracts = rnd->Uniform(2);
+
+ // int options
+ db_opt->max_background_compactions = rnd->Uniform(100);
+ db_opt->max_background_flushes = rnd->Uniform(100);
+ db_opt->max_file_opening_threads = rnd->Uniform(100);
+ db_opt->max_open_files = rnd->Uniform(100);
+ db_opt->table_cache_numshardbits = rnd->Uniform(100);
+
+ // size_t options
+ db_opt->db_write_buffer_size = rnd->Uniform(10000);
+ db_opt->keep_log_file_num = rnd->Uniform(10000);
+ db_opt->log_file_time_to_roll = rnd->Uniform(10000);
+ db_opt->manifest_preallocation_size = rnd->Uniform(10000);
+ db_opt->max_log_file_size = rnd->Uniform(10000);
+
+ // std::string options
+ db_opt->db_log_dir = "path/to/db_log_dir";
+ db_opt->wal_dir = "path/to/wal_dir";
+
+ // uint32_t options
+ db_opt->max_subcompactions = rnd->Uniform(100000);
+
+ // uint64_t options
+ static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
+ db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000);
+ db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000);
+ db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000);
+ db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000);
+ db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000);
+ db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000);
+ db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000);
+ db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000);
+
+ // unsigned int options
+ db_opt->stats_dump_period_sec = rnd->Uniform(100000);
+}
+
+void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
+ Random* rnd) {
+ cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4));
+
+ // boolean options
+ cf_opt->report_bg_io_stats = rnd->Uniform(2);
+ cf_opt->disable_auto_compactions = rnd->Uniform(2);
+ cf_opt->inplace_update_support = rnd->Uniform(2);
+ cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2);
+ cf_opt->optimize_filters_for_hits = rnd->Uniform(2);
+ cf_opt->paranoid_file_checks = rnd->Uniform(2);
+ cf_opt->force_consistency_checks = rnd->Uniform(2);
+ cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2);
+ cf_opt->memtable_whole_key_filtering = rnd->Uniform(2);
+ cf_opt->enable_blob_files = rnd->Uniform(2);
+ cf_opt->enable_blob_garbage_collection = rnd->Uniform(2);
+
+ // double options
+ cf_opt->memtable_prefix_bloom_size_ratio =
+ static_cast<double>(rnd->Uniform(10000)) / 20000.0;
+ cf_opt->blob_garbage_collection_age_cutoff = rnd->Uniform(10000) / 10000.0;
+ cf_opt->blob_garbage_collection_force_threshold =
+ rnd->Uniform(10000) / 10000.0;
+
+ // int options
+ cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100);
+ cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100);
+ cf_opt->level0_stop_writes_trigger = rnd->Uniform(100);
+ cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100);
+ cf_opt->max_write_buffer_number = rnd->Uniform(100);
+ cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100);
+ cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000);
+ cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100);
+ cf_opt->num_levels = rnd->Uniform(100);
+ cf_opt->target_file_size_multiplier = rnd->Uniform(100);
+
+ // vector int options
+ cf_opt->max_bytes_for_level_multiplier_additional.resize(cf_opt->num_levels);
+ for (int i = 0; i < cf_opt->num_levels; i++) {
+ cf_opt->max_bytes_for_level_multiplier_additional[i] = rnd->Uniform(100);
+ }
+
+ // size_t options
+ cf_opt->arena_block_size = rnd->Uniform(10000);
+ cf_opt->inplace_update_num_locks = rnd->Uniform(10000);
+ cf_opt->max_successive_merges = rnd->Uniform(10000);
+ cf_opt->memtable_huge_page_size = rnd->Uniform(10000);
+ cf_opt->write_buffer_size = rnd->Uniform(10000);
+
+ // uint32_t options
+ cf_opt->bloom_locality = rnd->Uniform(10000);
+ cf_opt->max_bytes_for_level_base = rnd->Uniform(10000);
+
+ // uint64_t options
+ static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
+ cf_opt->ttl =
+ db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0;
+ cf_opt->periodic_compaction_seconds =
+ db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0;
+ cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000);
+ cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000);
+ cf_opt->max_compaction_bytes =
+ cf_opt->target_file_size_base * rnd->Uniform(100);
+ cf_opt->compaction_options_fifo.max_table_files_size =
+ uint_max + rnd->Uniform(10000);
+ cf_opt->min_blob_size = uint_max + rnd->Uniform(10000);
+ cf_opt->blob_file_size = uint_max + rnd->Uniform(10000);
+ cf_opt->blob_compaction_readahead_size = uint_max + rnd->Uniform(10000);
+
+ // pointer typed options
+ cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd));
+ cf_opt->table_factory.reset(RandomTableFactory(rnd));
+ cf_opt->merge_operator.reset(RandomMergeOperator(rnd));
+ if (cf_opt->compaction_filter) {
+ delete cf_opt->compaction_filter;
+ }
+ cf_opt->compaction_filter = RandomCompactionFilter(rnd);
+ cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd));
+
+ // custom typed options
+ cf_opt->compression = RandomCompressionType(rnd);
+ RandomCompressionTypeVector(cf_opt->num_levels,
+ &cf_opt->compression_per_level, rnd);
+ cf_opt->blob_compression_type = RandomCompressionType(rnd);
+}
+
+bool IsDirectIOSupported(Env* env, const std::string& dir) {
+ EnvOptions env_options;
+ env_options.use_mmap_writes = false;
+ env_options.use_direct_writes = true;
+ std::string tmp = TempFileName(dir, 999);
+ Status s;
+ {
+ std::unique_ptr<WritableFile> file;
+ s = env->NewWritableFile(tmp, &file, env_options);
+ }
+ if (s.ok()) {
+ s = env->DeleteFile(tmp);
+ }
+ return s.ok();
+}
+
+bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs,
+ const std::string& dir) {
+ bool supported = false;
+ std::string tmp = TempFileName(dir, 999);
+ Random rnd(301);
+ std::string test_string = rnd.RandomString(4096);
+ Slice data(test_string);
+ Status s = WriteStringToFile(fs.get(), data, tmp, true);
+ if (s.ok()) {
+ std::unique_ptr<FSRandomAccessFile> file;
+ auto io_s = fs->NewRandomAccessFile(tmp, FileOptions(), &file, nullptr);
+ if (io_s.ok()) {
+ supported = !(file->Prefetch(0, data.size(), IOOptions(), nullptr)
+ .IsNotSupported());
+ }
+ s = fs->DeleteFile(tmp, IOOptions(), nullptr);
+ }
+ return s.ok() && supported;
+}
+
+size_t GetLinesCount(const std::string& fname, const std::string& pattern) {
+ std::stringstream ssbuf;
+ std::string line;
+ size_t count = 0;
+
+ std::ifstream inFile(fname.c_str());
+ ssbuf << inFile.rdbuf();
+
+ while (getline(ssbuf, line)) {
+ if (line.find(pattern) != std::string::npos) {
+ count++;
+ }
+ }
+
+ return count;
+}
+
+Status CorruptFile(Env* env, const std::string& fname, int offset,
+ int bytes_to_corrupt, bool verify_checksum /*=true*/) {
+ uint64_t size;
+ Status s = env->GetFileSize(fname, &size);
+ if (!s.ok()) {
+ return s;
+ } else if (offset < 0) {
+ // Relative to end of file; make it absolute
+ if (-offset > static_cast<int>(size)) {
+ offset = 0;
+ } else {
+ offset = static_cast<int>(size + offset);
+ }
+ }
+ if (offset > static_cast<int>(size)) {
+ offset = static_cast<int>(size);
+ }
+ if (offset + bytes_to_corrupt > static_cast<int>(size)) {
+ bytes_to_corrupt = static_cast<int>(size - offset);
+ }
+
+ // Do it
+ std::string contents;
+ s = ReadFileToString(env, fname, &contents);
+ if (s.ok()) {
+ for (int i = 0; i < bytes_to_corrupt; i++) {
+ contents[i + offset] ^= 0x80;
+ }
+ s = WriteStringToFile(env, contents, fname);
+ }
+ if (s.ok() && verify_checksum) {
+#ifndef ROCKSDB_LITE
+ Options options;
+ options.env = env;
+ EnvOptions env_options;
+ Status v = VerifySstFileChecksum(options, env_options, fname);
+ assert(!v.ok());
+#endif
+ }
+ return s;
+}
+
+Status TruncateFile(Env* env, const std::string& fname, uint64_t new_length) {
+ uint64_t old_length;
+ Status s = env->GetFileSize(fname, &old_length);
+ if (!s.ok() || new_length == old_length) {
+ return s;
+ }
+ // Do it
+ std::string contents;
+ s = ReadFileToString(env, fname, &contents);
+ if (s.ok()) {
+ contents.resize(static_cast<size_t>(new_length), 'b');
+ s = WriteStringToFile(env, contents, fname);
+ }
+ return s;
+}
+
+// Try and delete a directory if it exists
+Status TryDeleteDir(Env* env, const std::string& dirname) {
+ bool is_dir = false;
+ Status s = env->IsDirectory(dirname, &is_dir);
+ if (s.ok() && is_dir) {
+ s = env->DeleteDir(dirname);
+ }
+ return s;
+}
+
+// Delete a directory if it exists
+void DeleteDir(Env* env, const std::string& dirname) {
+ TryDeleteDir(env, dirname).PermitUncheckedError();
+}
+
+Status CreateEnvFromSystem(const ConfigOptions& config_options, Env** result,
+ std::shared_ptr<Env>* guard) {
+ const char* env_uri = getenv("TEST_ENV_URI");
+ const char* fs_uri = getenv("TEST_FS_URI");
+ if (env_uri || fs_uri) {
+ return Env::CreateFromUri(config_options,
+ (env_uri != nullptr) ? env_uri : "",
+ (fs_uri != nullptr) ? fs_uri : "", result, guard);
+ } else {
+ // Neither specified. Use the default
+ *result = config_options.env;
+ guard->reset();
+ return Status::OK();
+ }
+}
+namespace {
+// A hacky skip list mem table that triggers flush after number of entries.
+class SpecialMemTableRep : public MemTableRep {
+ public:
+ explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable,
+ int num_entries_flush)
+ : MemTableRep(allocator),
+ memtable_(memtable),
+ num_entries_flush_(num_entries_flush),
+ num_entries_(0) {}
+
+ virtual KeyHandle Allocate(const size_t len, char** buf) override {
+ return memtable_->Allocate(len, buf);
+ }
+
+ // Insert key into the list.
+ // REQUIRES: nothing that compares equal to key is currently in the list.
+ virtual void Insert(KeyHandle handle) override {
+ num_entries_++;
+ memtable_->Insert(handle);
+ }
+
+ void InsertConcurrently(KeyHandle handle) override {
+ num_entries_++;
+ memtable_->Insert(handle);
+ }
+
+ // Returns true iff an entry that compares equal to key is in the list.
+ virtual bool Contains(const char* key) const override {
+ return memtable_->Contains(key);
+ }
+
+ virtual size_t ApproximateMemoryUsage() override {
+ // Return a high memory usage when number of entries exceeds the threshold
+ // to trigger a flush.
+ return (num_entries_ < num_entries_flush_) ? 0 : 1024 * 1024 * 1024;
+ }
+
+ virtual void Get(const LookupKey& k, void* callback_args,
+ bool (*callback_func)(void* arg,
+ const char* entry)) override {
+ memtable_->Get(k, callback_args, callback_func);
+ }
+
+ uint64_t ApproximateNumEntries(const Slice& start_ikey,
+ const Slice& end_ikey) override {
+ return memtable_->ApproximateNumEntries(start_ikey, end_ikey);
+ }
+
+ virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
+ return memtable_->GetIterator(arena);
+ }
+
+ virtual ~SpecialMemTableRep() override {}
+
+ private:
+ std::unique_ptr<MemTableRep> memtable_;
+ int num_entries_flush_;
+ int num_entries_;
+};
+class SpecialSkipListFactory : public MemTableRepFactory {
+ public:
+#ifndef ROCKSDB_LITE
+ static bool Register(ObjectLibrary& library, const std::string& /*arg*/) {
+ library.AddFactory<MemTableRepFactory>(
+ ObjectLibrary::PatternEntry(SpecialSkipListFactory::kClassName(), true)
+ .AddNumber(":"),
+ [](const std::string& uri, std::unique_ptr<MemTableRepFactory>* guard,
+ std::string* /* errmsg */) {
+ auto colon = uri.find(":");
+ if (colon != std::string::npos) {
+ auto count = ParseInt(uri.substr(colon + 1));
+ guard->reset(new SpecialSkipListFactory(count));
+ } else {
+ guard->reset(new SpecialSkipListFactory(2));
+ }
+ return guard->get();
+ });
+ return true;
+ }
+#endif // ROCKSDB_LITE
+ // After number of inserts exceeds `num_entries_flush` in a mem table, trigger
+ // flush.
+ explicit SpecialSkipListFactory(int num_entries_flush)
+ : num_entries_flush_(num_entries_flush) {}
+
+ using MemTableRepFactory::CreateMemTableRep;
+ virtual MemTableRep* CreateMemTableRep(
+ const MemTableRep::KeyComparator& compare, Allocator* allocator,
+ const SliceTransform* transform, Logger* /*logger*/) override {
+ return new SpecialMemTableRep(
+ allocator,
+ factory_.CreateMemTableRep(compare, allocator, transform, nullptr),
+ num_entries_flush_);
+ }
+ static const char* kClassName() { return "SpecialSkipListFactory"; }
+ virtual const char* Name() const override { return kClassName(); }
+ std::string GetId() const override {
+ std::string id = Name();
+ if (num_entries_flush_ > 0) {
+ id.append(":").append(std::to_string(num_entries_flush_));
+ }
+ return id;
+ }
+
+ bool IsInsertConcurrentlySupported() const override {
+ return factory_.IsInsertConcurrentlySupported();
+ }
+
+ private:
+ SkipListFactory factory_;
+ int num_entries_flush_;
+};
+} // namespace
+
+MemTableRepFactory* NewSpecialSkipListFactory(int num_entries_per_flush) {
+ RegisterTestLibrary();
+ return new SpecialSkipListFactory(num_entries_per_flush);
+}
+
+#ifndef ROCKSDB_LITE
+// This method loads existing test classes into the ObjectRegistry
+int RegisterTestObjects(ObjectLibrary& library, const std::string& arg) {
+ size_t num_types;
+ library.AddFactory<const Comparator>(
+ test::SimpleSuffixReverseComparator::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<const Comparator>* /*guard*/,
+ std::string* /* errmsg */) {
+ static test::SimpleSuffixReverseComparator ssrc;
+ return &ssrc;
+ });
+ SpecialSkipListFactory::Register(library, arg);
+ library.AddFactory<MergeOperator>(
+ "Changling",
+ [](const std::string& uri, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new test::ChanglingMergeOperator(uri));
+ return guard->get();
+ });
+ library.AddFactory<CompactionFilter>(
+ "Changling",
+ [](const std::string& uri, std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ return new test::ChanglingCompactionFilter(uri);
+ });
+ library.AddFactory<CompactionFilterFactory>(
+ "Changling", [](const std::string& uri,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new test::ChanglingCompactionFilterFactory(uri));
+ return guard->get();
+ });
+ library.AddFactory<SystemClock>(
+ MockSystemClock::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<SystemClock>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new MockSystemClock(SystemClock::Default()));
+ return guard->get();
+ });
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+
+#endif // ROCKSDB_LITE
+
+void RegisterTestLibrary(const std::string& arg) {
+ static bool registered = false;
+ if (!registered) {
+ registered = true;
+#ifndef ROCKSDB_LITE
+ ObjectRegistry::Default()->AddLibrary("test", RegisterTestObjects, arg);
+#else
+ (void)arg;
+#endif // ROCKSDB_LITE
+ }
+}
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testutil.h b/src/rocksdb/test_util/testutil.h
new file mode 100644
index 000000000..1f43156ab
--- /dev/null
+++ b/src/rocksdb/test_util/testutil.h
@@ -0,0 +1,852 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "file/writable_file_writer.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/table.h"
+#include "table/internal_iterator.h"
+#include "util/mutexlock.h"
+
+#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+extern "C" {
+void RegisterCustomObjects(int argc, char** argv);
+}
+#else
+void RegisterCustomObjects(int argc, char** argv);
+#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+
+namespace ROCKSDB_NAMESPACE {
+class FileSystem;
+class MemTableRepFactory;
+class ObjectLibrary;
+class Random;
+class SequentialFile;
+class SequentialFileReader;
+
+namespace test {
+
+extern const uint32_t kDefaultFormatVersion;
+extern const std::set<uint32_t> kFooterFormatVersionsToTest;
+
+// Return a random key with the specified length that may contain interesting
+// characters (e.g. \x00, \xff, etc.).
+enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE };
+extern std::string RandomKey(Random* rnd, int len,
+ RandomKeyType type = RandomKeyType::RANDOM);
+
+// Store in *dst a string of length "len" that will compress to
+// "N*compressed_fraction" bytes and return a Slice that references
+// the generated data.
+extern Slice CompressibleString(Random* rnd, double compressed_fraction,
+ int len, std::string* dst);
+
+#ifndef NDEBUG
+// An internal comparator that just forward comparing results from the
+// user comparator in it. Can be used to test entities that have no dependency
+// on internal key structure but consumes InternalKeyComparator, like
+// BlockBasedTable.
+class PlainInternalKeyComparator : public InternalKeyComparator {
+ public:
+ explicit PlainInternalKeyComparator(const Comparator* c)
+ : InternalKeyComparator(c) {}
+
+ virtual ~PlainInternalKeyComparator() {}
+
+ virtual int Compare(const Slice& a, const Slice& b) const override {
+ return user_comparator()->Compare(a, b);
+ }
+};
+#endif
+
+// A test comparator which compare two strings in this way:
+// (1) first compare prefix of 8 bytes in alphabet order,
+// (2) if two strings share the same prefix, sort the other part of the string
+// in the reverse alphabet order.
+// This helps simulate the case of compounded key of [entity][timestamp] and
+// latest timestamp first.
+class SimpleSuffixReverseComparator : public Comparator {
+ public:
+ SimpleSuffixReverseComparator() {}
+ static const char* kClassName() { return "SimpleSuffixReverseComparator"; }
+ virtual const char* Name() const override { return kClassName(); }
+
+ virtual int Compare(const Slice& a, const Slice& b) const override {
+ Slice prefix_a = Slice(a.data(), 8);
+ Slice prefix_b = Slice(b.data(), 8);
+ int prefix_comp = prefix_a.compare(prefix_b);
+ if (prefix_comp != 0) {
+ return prefix_comp;
+ } else {
+ Slice suffix_a = Slice(a.data() + 8, a.size() - 8);
+ Slice suffix_b = Slice(b.data() + 8, b.size() - 8);
+ return -(suffix_a.compare(suffix_b));
+ }
+ }
+ virtual void FindShortestSeparator(std::string* /*start*/,
+ const Slice& /*limit*/) const override {}
+
+ virtual void FindShortSuccessor(std::string* /*key*/) const override {}
+};
+
+// Returns a user key comparator that can be used for comparing two uint64_t
+// slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
+// at once. Assumes same endian-ness is used though the database's lifetime.
+// Symantics of comparison would differ from Bytewise comparator in little
+// endian machines.
+extern const Comparator* Uint64Comparator();
+
+// A wrapper api for getting the ComparatorWithU64Ts<BytewiseComparator>
+extern const Comparator* BytewiseComparatorWithU64TsWrapper();
+
+class StringSink : public FSWritableFile {
+ public:
+ std::string contents_;
+
+ explicit StringSink(Slice* reader_contents = nullptr)
+ : FSWritableFile(),
+ contents_(""),
+ reader_contents_(reader_contents),
+ last_flush_(0) {
+ if (reader_contents_ != nullptr) {
+ *reader_contents_ = Slice(contents_.data(), 0);
+ }
+ }
+
+ const std::string& contents() const { return contents_; }
+
+ IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_.resize(static_cast<size_t>(size));
+ return IOStatus::OK();
+ }
+ IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+ IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ if (reader_contents_ != nullptr) {
+ assert(reader_contents_->size() <= last_flush_);
+ size_t offset = last_flush_ - reader_contents_->size();
+ *reader_contents_ =
+ Slice(contents_.data() + offset, contents_.size() - offset);
+ last_flush_ = contents_.size();
+ }
+
+ return IOStatus::OK();
+ }
+ IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ using FSWritableFile::Append;
+ IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_.append(slice.data(), slice.size());
+ return IOStatus::OK();
+ }
+ void Drop(size_t bytes) {
+ if (reader_contents_ != nullptr) {
+ contents_.resize(contents_.size() - bytes);
+ *reader_contents_ =
+ Slice(reader_contents_->data(), reader_contents_->size() - bytes);
+ last_flush_ = contents_.size();
+ }
+ }
+
+ private:
+ Slice* reader_contents_;
+ size_t last_flush_;
+};
+
+// A wrapper around a StringSink to give it a RandomRWFile interface
+class RandomRWStringSink : public FSRandomRWFile {
+ public:
+ explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
+
+ IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ if (offset + data.size() > ss_->contents_.size()) {
+ ss_->contents_.resize(static_cast<size_t>(offset) + data.size(), '\0');
+ }
+
+ char* pos = const_cast<char*>(ss_->contents_.data() + offset);
+ memcpy(pos, data.data(), data.size());
+ return IOStatus::OK();
+ }
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/,
+ Slice* result, char* /*scratch*/,
+ IODebugContext* /*dbg*/) const override {
+ *result = Slice(nullptr, 0);
+ if (offset < ss_->contents_.size()) {
+ size_t str_res_sz =
+ std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
+ *result = Slice(ss_->contents_.data() + offset, str_res_sz);
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ const std::string& contents() const { return ss_->contents(); }
+
+ private:
+ StringSink* ss_;
+};
+
+// Like StringSink, this writes into a string. Unlink StringSink, it
+// has some initial content and overwrites it, just like a recycled
+// log file.
+class OverwritingStringSink : public FSWritableFile {
+ public:
+ explicit OverwritingStringSink(Slice* reader_contents)
+ : FSWritableFile(),
+ contents_(""),
+ reader_contents_(reader_contents),
+ last_flush_(0) {}
+
+ const std::string& contents() const { return contents_; }
+
+ IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_.resize(static_cast<size_t>(size));
+ return IOStatus::OK();
+ }
+ IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+ IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ if (last_flush_ < contents_.size()) {
+ assert(reader_contents_->size() >= contents_.size());
+ memcpy((char*)reader_contents_->data() + last_flush_,
+ contents_.data() + last_flush_, contents_.size() - last_flush_);
+ last_flush_ = contents_.size();
+ }
+ return IOStatus::OK();
+ }
+ IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ using FSWritableFile::Append;
+ IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_.append(slice.data(), slice.size());
+ return IOStatus::OK();
+ }
+ void Drop(size_t bytes) {
+ contents_.resize(contents_.size() - bytes);
+ if (last_flush_ > contents_.size()) last_flush_ = contents_.size();
+ }
+
+ private:
+ std::string contents_;
+ Slice* reader_contents_;
+ size_t last_flush_;
+};
+
+class StringSource : public FSRandomAccessFile {
+ public:
+ explicit StringSource(const Slice& contents, uint64_t uniq_id = 0,
+ bool mmap = false)
+ : contents_(contents.data(), contents.size()),
+ uniq_id_(uniq_id),
+ mmap_(mmap),
+ total_reads_(0) {}
+
+ virtual ~StringSource() {}
+
+ uint64_t Size() const { return contents_.size(); }
+
+ IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ // If we are using mmap_, it is equivalent to performing a prefetch
+ if (mmap_) {
+ return IOStatus::OK();
+ } else {
+ return IOStatus::NotSupported("Prefetch not supported");
+ }
+ }
+
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/,
+ Slice* result, char* scratch,
+ IODebugContext* /*dbg*/) const override {
+ total_reads_++;
+ if (offset > contents_.size()) {
+ return IOStatus::InvalidArgument("invalid Read offset");
+ }
+ if (offset + n > contents_.size()) {
+ n = contents_.size() - static_cast<size_t>(offset);
+ }
+ if (!mmap_) {
+ memcpy(scratch, &contents_[static_cast<size_t>(offset)], n);
+ *result = Slice(scratch, n);
+ } else {
+ *result = Slice(&contents_[static_cast<size_t>(offset)], n);
+ }
+ return IOStatus::OK();
+ }
+
+ size_t GetUniqueId(char* id, size_t max_size) const override {
+ if (max_size < 20) {
+ return 0;
+ }
+
+ char* rid = id;
+ rid = EncodeVarint64(rid, uniq_id_);
+ rid = EncodeVarint64(rid, 0);
+ return static_cast<size_t>(rid - id);
+ }
+
+ int total_reads() const { return total_reads_; }
+
+ void set_total_reads(int tr) { total_reads_ = tr; }
+
+ private:
+ std::string contents_;
+ uint64_t uniq_id_;
+ bool mmap_;
+ mutable int total_reads_;
+};
+
+class NullLogger : public Logger {
+ public:
+ using Logger::Logv;
+ virtual void Logv(const char* /*format*/, va_list /*ap*/) override {}
+ virtual size_t GetLogFileSize() const override { return 0; }
+};
+
+// Corrupts key by changing the type
+extern void CorruptKeyType(InternalKey* ikey);
+
+extern std::string KeyStr(const std::string& user_key,
+ const SequenceNumber& seq, const ValueType& t,
+ bool corrupt = false);
+
+extern std::string KeyStr(uint64_t ts, const std::string& user_key,
+ const SequenceNumber& seq, const ValueType& t,
+ bool corrupt = false);
+
+class SleepingBackgroundTask {
+ public:
+ SleepingBackgroundTask()
+ : bg_cv_(&mutex_),
+ should_sleep_(true),
+ done_with_sleep_(false),
+ sleeping_(false) {}
+
+ bool IsSleeping() {
+ MutexLock l(&mutex_);
+ return sleeping_;
+ }
+ void DoSleep() {
+ MutexLock l(&mutex_);
+ sleeping_ = true;
+ bg_cv_.SignalAll();
+ while (should_sleep_) {
+ bg_cv_.Wait();
+ }
+ sleeping_ = false;
+ done_with_sleep_ = true;
+ bg_cv_.SignalAll();
+ }
+ void WaitUntilSleeping() {
+ MutexLock l(&mutex_);
+ while (!sleeping_ || !should_sleep_) {
+ bg_cv_.Wait();
+ }
+ }
+ // Waits for the status to change to sleeping,
+ // otherwise times out.
+ // wait_time is in microseconds.
+ // Returns true when times out, false otherwise.
+ bool TimedWaitUntilSleeping(uint64_t wait_time);
+
+ void WakeUp() {
+ MutexLock l(&mutex_);
+ should_sleep_ = false;
+ bg_cv_.SignalAll();
+ }
+ void WaitUntilDone() {
+ MutexLock l(&mutex_);
+ while (!done_with_sleep_) {
+ bg_cv_.Wait();
+ }
+ }
+ // Similar to TimedWaitUntilSleeping.
+ // Waits until the task is done.
+ bool TimedWaitUntilDone(uint64_t wait_time);
+
+ bool WokenUp() {
+ MutexLock l(&mutex_);
+ return should_sleep_ == false;
+ }
+
+ void Reset() {
+ MutexLock l(&mutex_);
+ should_sleep_ = true;
+ done_with_sleep_ = false;
+ }
+
+ static void DoSleepTask(void* arg) {
+ reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
+ }
+
+ private:
+ port::Mutex mutex_;
+ port::CondVar bg_cv_; // Signalled when background work finishes
+ bool should_sleep_;
+ bool done_with_sleep_;
+ bool sleeping_;
+};
+
+// Filters merge operands and values that are equal to `num`.
+class FilterNumber : public CompactionFilter {
+ public:
+ explicit FilterNumber(uint64_t num) : num_(num) {}
+
+ std::string last_merge_operand_key() { return last_merge_operand_key_; }
+
+ bool Filter(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/,
+ const ROCKSDB_NAMESPACE::Slice& value, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ if (value.size() == sizeof(uint64_t)) {
+ return num_ == DecodeFixed64(value.data());
+ }
+ return true;
+ }
+
+ bool FilterMergeOperand(
+ int /*level*/, const ROCKSDB_NAMESPACE::Slice& key,
+ const ROCKSDB_NAMESPACE::Slice& value) const override {
+ last_merge_operand_key_ = key.ToString();
+ if (value.size() == sizeof(uint64_t)) {
+ return num_ == DecodeFixed64(value.data());
+ }
+ return true;
+ }
+
+ const char* Name() const override { return "FilterBadMergeOperand"; }
+
+ private:
+ mutable std::string last_merge_operand_key_;
+ uint64_t num_;
+};
+
+inline std::string EncodeInt(uint64_t x) {
+ std::string result;
+ PutFixed64(&result, x);
+ return result;
+}
+
+class SeqStringSource : public FSSequentialFile {
+ public:
+ SeqStringSource(const std::string& data, std::atomic<int>* read_count)
+ : data_(data), offset_(0), read_count_(read_count) {}
+ ~SeqStringSource() override {}
+ IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result,
+ char* scratch, IODebugContext* /*dbg*/) override {
+ std::string output;
+ if (offset_ < data_.size()) {
+ n = std::min(data_.size() - offset_, n);
+ memcpy(scratch, data_.data() + offset_, n);
+ offset_ += n;
+ *result = Slice(scratch, n);
+ } else {
+ return IOStatus::InvalidArgument(
+ "Attempt to read when it already reached eof.");
+ }
+ (*read_count_)++;
+ return IOStatus::OK();
+ }
+
+ IOStatus Skip(uint64_t n) override {
+ if (offset_ >= data_.size()) {
+ return IOStatus::InvalidArgument(
+ "Attempt to read when it already reached eof.");
+ }
+ // TODO(yhchiang): Currently doesn't handle the overflow case.
+ offset_ += static_cast<size_t>(n);
+ return IOStatus::OK();
+ }
+
+ private:
+ std::string data_;
+ size_t offset_;
+ std::atomic<int>* read_count_;
+};
+
+class StringFS : public FileSystemWrapper {
+ public:
+ class StringSink : public FSWritableFile {
+ public:
+ explicit StringSink(std::string* contents)
+ : FSWritableFile(), contents_(contents) {}
+ IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_->resize(static_cast<size_t>(size));
+ return IOStatus::OK();
+ }
+ IOStatus Close(const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+ IOStatus Flush(const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+ IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
+
+ using FSWritableFile::Append;
+ IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) override {
+ contents_->append(slice.data(), slice.size());
+ return IOStatus::OK();
+ }
+
+ private:
+ std::string* contents_;
+ };
+
+ explicit StringFS(const std::shared_ptr<FileSystem>& t)
+ : FileSystemWrapper(t) {}
+ ~StringFS() override {}
+
+ static const char* kClassName() { return "StringFS"; }
+ const char* Name() const override { return kClassName(); }
+
+ const std::string& GetContent(const std::string& f) { return files_[f]; }
+
+ const IOStatus WriteToNewFile(const std::string& file_name,
+ const std::string& content) {
+ std::unique_ptr<FSWritableFile> r;
+ FileOptions file_opts;
+ IOOptions io_opts;
+
+ auto s = NewWritableFile(file_name, file_opts, &r, nullptr);
+ if (s.ok()) {
+ s = r->Append(content, io_opts, nullptr);
+ }
+ if (s.ok()) {
+ s = r->Flush(io_opts, nullptr);
+ }
+ if (s.ok()) {
+ s = r->Close(io_opts, nullptr);
+ }
+ assert(!s.ok() || files_[file_name] == content);
+ return s;
+ }
+
+ // The following text is boilerplate that forwards all methods to target()
+ IOStatus NewSequentialFile(const std::string& f,
+ const FileOptions& /*options*/,
+ std::unique_ptr<FSSequentialFile>* r,
+ IODebugContext* /*dbg*/) override {
+ auto iter = files_.find(f);
+ if (iter == files_.end()) {
+ return IOStatus::NotFound("The specified file does not exist", f);
+ }
+ r->reset(new SeqStringSource(iter->second, &num_seq_file_read_));
+ return IOStatus::OK();
+ }
+
+ IOStatus NewRandomAccessFile(const std::string& /*f*/,
+ const FileOptions& /*options*/,
+ std::unique_ptr<FSRandomAccessFile>* /*r*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus NewWritableFile(const std::string& f, const FileOptions& /*options*/,
+ std::unique_ptr<FSWritableFile>* r,
+ IODebugContext* /*dbg*/) override {
+ auto iter = files_.find(f);
+ if (iter != files_.end()) {
+ return IOStatus::IOError("The specified file already exists", f);
+ }
+ r->reset(new StringSink(&files_[f]));
+ return IOStatus::OK();
+ }
+ IOStatus NewDirectory(const std::string& /*name*/,
+ const IOOptions& /*options*/,
+ std::unique_ptr<FSDirectory>* /*result*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus FileExists(const std::string& f, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ if (files_.find(f) == files_.end()) {
+ return IOStatus::NotFound();
+ }
+ return IOStatus::OK();
+ }
+
+ IOStatus GetChildren(const std::string& /*dir*/, const IOOptions& /*options*/,
+ std::vector<std::string>* /*r*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ files_.erase(f);
+ return IOStatus::OK();
+ }
+
+ IOStatus CreateDir(const std::string& /*d*/, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus CreateDirIfMissing(const std::string& /*d*/,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus DeleteDir(const std::string& /*d*/, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
+ uint64_t* s, IODebugContext* /*dbg*/) override {
+ auto iter = files_.find(f);
+ if (iter == files_.end()) {
+ return IOStatus::NotFound("The specified file does not exist:", f);
+ }
+ *s = iter->second.size();
+ return IOStatus::OK();
+ }
+
+ IOStatus GetFileModificationTime(const std::string& /*fname*/,
+ const IOOptions& /*options*/,
+ uint64_t* /*file_mtime*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus RenameFile(const std::string& /*s*/, const std::string& /*t*/,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus LinkFile(const std::string& /*s*/, const std::string& /*t*/,
+ const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus LockFile(const std::string& /*f*/, const IOOptions& /*options*/,
+ FileLock** /*l*/, IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ IOStatus UnlockFile(FileLock* /*l*/, const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::NotSupported();
+ }
+
+ std::atomic<int> num_seq_file_read_;
+
+ protected:
+ std::unordered_map<std::string, std::string> files_;
+};
+
+// Randomly initialize the given DBOptions
+void RandomInitDBOptions(DBOptions* db_opt, Random* rnd);
+
+// Randomly initialize the given ColumnFamilyOptions
+// Note that the caller is responsible for releasing non-null
+// cf_opt->compaction_filter.
+void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions&, Random* rnd);
+
+// A dummy merge operator which can change its name
+class ChanglingMergeOperator : public MergeOperator {
+ public:
+ explicit ChanglingMergeOperator(const std::string& name)
+ : name_(name + "MergeOperator") {}
+ ~ChanglingMergeOperator() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+ MergeOperationOutput* /*merge_out*/) const override {
+ return false;
+ }
+ virtual bool PartialMergeMulti(const Slice& /*key*/,
+ const std::deque<Slice>& /*operand_list*/,
+ std::string* /*new_value*/,
+ Logger* /*logger*/) const override {
+ return false;
+ }
+ static const char* kClassName() { return "ChanglingMergeOperator"; }
+ const char* NickName() const override { return kNickName(); }
+ static const char* kNickName() { return "Changling"; }
+ bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return MergeOperator::IsInstanceOf(id);
+ }
+ }
+
+ virtual const char* Name() const override { return name_.c_str(); }
+
+ protected:
+ std::string name_;
+};
+
+// Returns a dummy merge operator with random name.
+MergeOperator* RandomMergeOperator(Random* rnd);
+
+// A dummy compaction filter which can change its name
+class ChanglingCompactionFilter : public CompactionFilter {
+ public:
+ explicit ChanglingCompactionFilter(const std::string& name)
+ : name_(name + "CompactionFilter") {}
+ ~ChanglingCompactionFilter() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ bool Filter(int /*level*/, const Slice& /*key*/,
+ const Slice& /*existing_value*/, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return false;
+ }
+
+ static const char* kClassName() { return "ChanglingCompactionFilter"; }
+ const char* NickName() const override { return kNickName(); }
+ static const char* kNickName() { return "Changling"; }
+
+ bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return CompactionFilter::IsInstanceOf(id);
+ }
+ }
+
+ const char* Name() const override { return name_.c_str(); }
+
+ private:
+ std::string name_;
+};
+
+// Returns a dummy compaction filter with a random name.
+CompactionFilter* RandomCompactionFilter(Random* rnd);
+
+// A dummy compaction filter factory which can change its name
+class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit ChanglingCompactionFilterFactory(const std::string& name)
+ : name_(name + "CompactionFilterFactory") {}
+ ~ChanglingCompactionFilterFactory() {}
+
+ void SetName(const std::string& name) { name_ = name; }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>();
+ }
+
+ // Returns a name that identifies this compaction filter factory.
+ const char* Name() const override { return name_.c_str(); }
+ static const char* kClassName() { return "ChanglingCompactionFilterFactory"; }
+ const char* NickName() const override { return kNickName(); }
+ static const char* kNickName() { return "Changling"; }
+
+ bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return CompactionFilterFactory::IsInstanceOf(id);
+ }
+ }
+
+ protected:
+ std::string name_;
+};
+
+// The factory for the hacky skip list mem table that triggers flush after
+// number of entries exceeds a threshold.
+extern MemTableRepFactory* NewSpecialSkipListFactory(int num_entries_per_flush);
+
+CompressionType RandomCompressionType(Random* rnd);
+
+void RandomCompressionTypeVector(const size_t count,
+ std::vector<CompressionType>* types,
+ Random* rnd);
+
+CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd);
+
+const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1);
+
+TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1);
+
+std::string RandomName(Random* rnd, const size_t len);
+
+bool IsDirectIOSupported(Env* env, const std::string& dir);
+
+bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs,
+ const std::string& dir);
+
+// Return the number of lines where a given pattern was found in a file.
+size_t GetLinesCount(const std::string& fname, const std::string& pattern);
+
+Status CorruptFile(Env* env, const std::string& fname, int offset,
+ int bytes_to_corrupt, bool verify_checksum = true);
+Status TruncateFile(Env* env, const std::string& fname, uint64_t length);
+
+// Try and delete a directory if it exists
+Status TryDeleteDir(Env* env, const std::string& dirname);
+
+// Delete a directory if it exists
+void DeleteDir(Env* env, const std::string& dirname);
+
+// Creates an Env from the system environment by looking at the system
+// environment variables.
+Status CreateEnvFromSystem(const ConfigOptions& options, Env** result,
+ std::shared_ptr<Env>* guard);
+
+#ifndef ROCKSDB_LITE
+// Registers the testutil classes with the ObjectLibrary
+int RegisterTestObjects(ObjectLibrary& library, const std::string& /*arg*/);
+#endif // ROCKSDB_LITE
+
+// Register the testutil classes with the default ObjectRegistry/Library
+void RegisterTestLibrary(const std::string& arg = "");
+} // namespace test
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/test_util/testutil_test.cc b/src/rocksdb/test_util/testutil_test.cc
new file mode 100644
index 000000000..41f26e389
--- /dev/null
+++ b/src/rocksdb/test_util/testutil_test.cc
@@ -0,0 +1,43 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/testutil.h"
+
+#include "file/file_util.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+void CreateFile(Env* env, const std::string& path) {
+ std::unique_ptr<WritableFile> f;
+ ASSERT_OK(env->NewWritableFile(path, &f, EnvOptions()));
+ f->Close();
+}
+
+TEST(TestUtil, DestroyDirRecursively) {
+ auto env = Env::Default();
+ // test_util/file
+ // /dir
+ // /dir/file
+ std::string test_dir = test::PerThreadDBPath("test_util");
+ ASSERT_OK(env->CreateDir(test_dir));
+ CreateFile(env, test_dir + "/file");
+ ASSERT_OK(env->CreateDir(test_dir + "/dir"));
+ CreateFile(env, test_dir + "/dir/file");
+
+ ASSERT_OK(DestroyDir(env, test_dir));
+ auto s = env->FileExists(test_dir);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/test_util/transaction_test_util.cc b/src/rocksdb/test_util/transaction_test_util.cc
new file mode 100644
index 000000000..99286d836
--- /dev/null
+++ b/src/rocksdb/test_util/transaction_test_util.cc
@@ -0,0 +1,402 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "test_util/transaction_test_util.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <numeric>
+#include <random>
+#include <string>
+#include <thread>
+
+#include "db/dbformat.h"
+#include "db/snapshot_impl.h"
+#include "logging/logging.h"
+#include "rocksdb/db.h"
+#include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "util/random.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+RandomTransactionInserter::RandomTransactionInserter(
+ Random64* rand, const WriteOptions& write_options,
+ const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets,
+ const uint64_t cmt_delay_ms, const uint64_t first_id)
+ : rand_(rand),
+ write_options_(write_options),
+ read_options_(read_options),
+ num_keys_(num_keys),
+ num_sets_(num_sets),
+ txn_id_(first_id),
+ cmt_delay_ms_(cmt_delay_ms) {}
+
+RandomTransactionInserter::~RandomTransactionInserter() {
+ if (txn_ != nullptr) {
+ delete txn_;
+ }
+ if (optimistic_txn_ != nullptr) {
+ delete optimistic_txn_;
+ }
+}
+
+bool RandomTransactionInserter::TransactionDBInsert(
+ TransactionDB* db, const TransactionOptions& txn_options) {
+ txn_ = db->BeginTransaction(write_options_, txn_options, txn_);
+
+ std::hash<std::thread::id> hasher;
+ char name[64];
+ snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64,
+ hasher(std::this_thread::get_id()), txn_id_++);
+ assert(strlen(name) < 64 - 1);
+ assert(txn_->SetName(name).ok());
+
+ // Take a snapshot if set_snapshot was not set or with 50% change otherwise
+ bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2);
+ if (take_snapshot) {
+ txn_->SetSnapshot();
+ read_options_.snapshot = txn_->GetSnapshot();
+ }
+ auto res = DoInsert(db, txn_, false);
+ if (take_snapshot) {
+ read_options_.snapshot = nullptr;
+ }
+ return res;
+}
+
+bool RandomTransactionInserter::OptimisticTransactionDBInsert(
+ OptimisticTransactionDB* db,
+ const OptimisticTransactionOptions& txn_options) {
+ optimistic_txn_ =
+ db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
+
+ return DoInsert(db, optimistic_txn_, true);
+}
+
+bool RandomTransactionInserter::DBInsert(DB* db) {
+ return DoInsert(db, nullptr, false);
+}
+
+Status RandomTransactionInserter::DBGet(
+ DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i,
+ uint64_t ikey, bool get_for_update, uint64_t* int_value,
+ std::string* full_key, bool* unexpected_error) {
+ Status s;
+ // Five digits (since the largest uint16_t is 65535) plus the NUL
+ // end char.
+ char prefix_buf[6] = {0};
+ // Pad prefix appropriately so we can iterate over each set
+ assert(set_i + 1 <= 9999);
+ snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
+ // key format: [SET#][random#]
+ std::string skey = std::to_string(ikey);
+ Slice base_key(skey);
+ *full_key = std::string(prefix_buf) + base_key.ToString();
+ Slice key(*full_key);
+
+ std::string value;
+ if (txn != nullptr) {
+ if (get_for_update) {
+ s = txn->GetForUpdate(read_options, key, &value);
+ } else {
+ s = txn->Get(read_options, key, &value);
+ }
+ } else {
+ s = db->Get(read_options, key, &value);
+ }
+
+ if (s.ok()) {
+ // Found key, parse its value
+ *int_value = std::stoull(value);
+ if (*int_value == 0 || *int_value == ULONG_MAX) {
+ *unexpected_error = true;
+ fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str());
+ s = Status::Corruption();
+ }
+ } else if (s.IsNotFound()) {
+ // Have not yet written to this key, so assume its value is 0
+ *int_value = 0;
+ s = Status::OK();
+ }
+ return s;
+}
+
+bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
+ bool is_optimistic) {
+ Status s;
+ WriteBatch batch;
+
+ // pick a random number to use to increment a key in each set
+ uint64_t incr = (rand_->Next() % 100) + 1;
+ bool unexpected_error = false;
+
+ std::vector<uint16_t> set_vec(num_sets_);
+ std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
+ RandomShuffle(set_vec.begin(), set_vec.end());
+
+ // For each set, pick a key at random and increment it
+ for (uint16_t set_i : set_vec) {
+ uint64_t int_value = 0;
+ std::string full_key;
+ uint64_t rand_key = rand_->Next() % num_keys_;
+ const bool get_for_update = txn ? rand_->OneIn(2) : false;
+ s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update,
+ &int_value, &full_key, &unexpected_error);
+ Slice key(full_key);
+ if (!s.ok()) {
+ // Optimistic transactions should never return non-ok status here.
+ // Non-optimistic transactions may return write-coflict/timeout errors.
+ if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
+ fprintf(stderr, "Get returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ unexpected_error = true;
+ }
+ break;
+ }
+
+ if (s.ok()) {
+ // Increment key
+ std::string sum = std::to_string(int_value + incr);
+ if (txn != nullptr) {
+ if ((set_i % 4) != 0) {
+ s = txn->SingleDelete(key);
+ } else {
+ s = txn->Delete(key);
+ }
+ if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
+ // If the initial get was not for update, then the key is not locked
+ // before put and put could fail due to concurrent writes.
+ break;
+ } else if (!s.ok()) {
+ // Since we did a GetForUpdate, SingleDelete should not fail.
+ fprintf(stderr, "SingleDelete returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ unexpected_error = true;
+ }
+ s = txn->Put(key, sum);
+ if (!s.ok()) {
+ // Since we did a GetForUpdate, Put should not fail.
+ fprintf(stderr, "Put returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ unexpected_error = true;
+ }
+ } else {
+ batch.Put(key, sum);
+ }
+ bytes_inserted_ += key.size() + sum.size();
+ }
+ if (txn != nullptr) {
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64
+ "+%" PRIu64 "=%" PRIu64,
+ txn->GetName().c_str(), s.ToString().c_str(),
+ txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(),
+ int_value, incr, int_value + incr);
+ }
+ }
+
+ if (s.ok()) {
+ if (txn != nullptr) {
+ bool with_prepare = !is_optimistic && !rand_->OneIn(10);
+ if (with_prepare) {
+ // Also try commit without prepare
+ s = txn->Prepare();
+ if (!s.ok()) {
+ fprintf(stderr, "Prepare returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ }
+ assert(s.ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
+ s.ToString().c_str(), txn->GetName().c_str());
+ if (rand_->OneIn(20)) {
+ // This currently only tests the mechanics of writing commit time
+ // write batch so the exact values would not matter.
+ s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog");
+ assert(s.ok());
+ }
+ db->GetDBOptions().env->SleepForMicroseconds(
+ static_cast<int>(cmt_delay_ms_ * 1000));
+ }
+ if (!rand_->OneIn(20)) {
+ s = txn->Commit();
+ assert(!with_prepare || s.ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Commit of %" PRIu64 " %s (%s)", txn->GetId(),
+ s.ToString().c_str(), txn->GetName().c_str());
+ } else {
+ // Also try 5% rollback
+ s = txn->Rollback();
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
+ "Rollback %" PRIu64 " %s %s", txn->GetId(),
+ txn->GetName().c_str(), s.ToString().c_str());
+ assert(s.ok());
+ }
+ assert(is_optimistic || s.ok());
+
+ if (!s.ok()) {
+ if (is_optimistic) {
+ // Optimistic transactions can have write-conflict errors on commit.
+ // Any other error is unexpected.
+ if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
+ unexpected_error = true;
+ }
+ } else {
+ // Non-optimistic transactions should only fail due to expiration
+ // or write failures. For testing purproses, we do not expect any
+ // write failures.
+ if (!s.IsExpired()) {
+ unexpected_error = true;
+ }
+ }
+
+ if (unexpected_error) {
+ fprintf(stderr, "Commit returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ }
+ }
+ } else {
+ s = db->Write(write_options_, &batch);
+ if (!s.ok()) {
+ unexpected_error = true;
+ fprintf(stderr, "Write returned an unexpected error: %s\n",
+ s.ToString().c_str());
+ }
+ }
+ } else {
+ if (txn != nullptr) {
+ assert(txn->Rollback().ok());
+ ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s",
+ s.ToString().c_str(), txn->GetName().c_str());
+ }
+ }
+
+ if (s.ok()) {
+ success_count_++;
+ } else {
+ failure_count_++;
+ }
+
+ last_status_ = s;
+
+ // return success if we didn't get any unexpected errors
+ return !unexpected_error;
+}
+
+// Verify that the sum of the keys in each set are equal
+Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
+ uint64_t num_keys_per_set,
+ bool take_snapshot, Random64* rand,
+ uint64_t delay_ms) {
+ // delay_ms is the delay between taking a snapshot and doing the reads. It
+ // emulates reads from a long-running backup job.
+ assert(delay_ms == 0 || take_snapshot);
+ uint64_t prev_total = 0;
+ uint32_t prev_i = 0;
+ bool prev_assigned = false;
+
+ ReadOptions roptions;
+ if (take_snapshot) {
+ roptions.snapshot = db->GetSnapshot();
+ db->GetDBOptions().env->SleepForMicroseconds(
+ static_cast<int>(delay_ms * 1000));
+ }
+
+ std::vector<uint16_t> set_vec(num_sets);
+ std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
+ RandomShuffle(set_vec.begin(), set_vec.end());
+
+ // For each set of keys with the same prefix, sum all the values
+ for (uint16_t set_i : set_vec) {
+ // Five digits (since the largest uint16_t is 65535) plus the NUL
+ // end char.
+ char prefix_buf[6];
+ assert(set_i + 1 <= 9999);
+ snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
+ uint64_t total = 0;
+
+ // Use either point lookup or iterator. Point lookups are slower so we use
+ // it less often.
+ const bool use_point_lookup =
+ num_keys_per_set != 0 && rand && rand->OneIn(10);
+ if (use_point_lookup) {
+ ReadOptions read_options;
+ for (uint64_t k = 0; k < num_keys_per_set; k++) {
+ std::string dont_care;
+ uint64_t int_value = 0;
+ bool unexpected_error = false;
+ const bool FOR_UPDATE = false;
+ Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE,
+ &int_value, &dont_care, &unexpected_error);
+ assert(s.ok());
+ assert(!unexpected_error);
+ total += int_value;
+ }
+ } else { // user iterators
+ Iterator* iter = db->NewIterator(roptions);
+ for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
+ Slice key = iter->key();
+ // stop when we reach a different prefix
+ if (key.ToString().compare(0, 4, prefix_buf) != 0) {
+ break;
+ }
+ Slice value = iter->value();
+ uint64_t int_value = std::stoull(value.ToString());
+ if (int_value == 0 || int_value == ULONG_MAX) {
+ fprintf(stderr, "Iter returned unexpected value: %s\n",
+ value.ToString().c_str());
+ return Status::Corruption();
+ }
+ ROCKS_LOG_DEBUG(
+ db->GetDBOptions().info_log,
+ "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul,
+ roptions.snapshot
+ ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_
+ : 0ul,
+ static_cast<int>(key.size()), key.data(), int_value);
+ total += int_value;
+ }
+ iter->status().PermitUncheckedError();
+ delete iter;
+ }
+
+ if (prev_assigned && total != prev_total) {
+ db->GetDBOptions().info_log->Flush();
+ fprintf(stdout,
+ "RandomTransactionVerify found inconsistent totals using "
+ "pointlookup? %d "
+ "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
+ " at snapshot %" PRIu64 "\n",
+ use_point_lookup, prev_i, prev_total, set_i, total,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
+ fflush(stdout);
+ return Status::Corruption();
+ } else {
+ ROCKS_LOG_DEBUG(
+ db->GetDBOptions().info_log,
+ "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
+ " snap: %" PRIu64,
+ use_point_lookup, total,
+ roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
+ }
+ prev_total = total;
+ prev_i = set_i;
+ prev_assigned = true;
+ }
+ if (take_snapshot) {
+ db->ReleaseSnapshot(roptions.snapshot);
+ }
+
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/test_util/transaction_test_util.h b/src/rocksdb/test_util/transaction_test_util.h
new file mode 100644
index 000000000..7a38ab626
--- /dev/null
+++ b/src/rocksdb/test_util/transaction_test_util.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include "port/port.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "rocksdb/utilities/transaction_db.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DB;
+class Random64;
+
+// Utility class for stress testing transactions. Can be used to write many
+// transactions in parallel and then validate that the data written is logically
+// consistent. This class assumes the input DB is initially empty.
+//
+// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will
+// increment the value of a key in #num_sets sets of keys. Regardless of
+// whether the transaction succeeds, the total sum of values of keys in each
+// set is an invariant that should remain equal.
+//
+// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many
+// times, Verify() can be called to validate that the invariant holds.
+//
+// To test writing Transaction in parallel, multiple threads can create a
+// RandomTransactionInserter with similar arguments using the same DB.
+class RandomTransactionInserter {
+ public:
+ static bool RollbackDeletionTypeCallback(const Slice& key) {
+ // These are hard-coded atm. See how RandomTransactionInserter::DoInsert()
+ // determines whether to use SingleDelete or Delete for a key.
+ assert(key.size() >= 4);
+ const char* ptr = key.data();
+ assert(ptr);
+ while (ptr && ptr < key.data() + 4 && *ptr == '0') {
+ ++ptr;
+ }
+ std::string prefix(ptr, 4 - (ptr - key.data()));
+ unsigned long set_i = std::stoul(prefix);
+ assert(set_i > 0);
+ assert(set_i <= 9999);
+ --set_i;
+ return ((set_i % 4) != 0);
+ }
+
+ // num_keys is the number of keys in each set.
+ // num_sets is the number of sets of keys.
+ // cmt_delay_ms is the delay between prepare (if there is any) and commit
+ // first_id is the id of the first transaction
+ explicit RandomTransactionInserter(
+ Random64* rand, const WriteOptions& write_options = WriteOptions(),
+ const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000,
+ uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0,
+ const uint64_t first_id = 0);
+
+ ~RandomTransactionInserter();
+
+ // Increment a key in each set using a Transaction on a TransactionDB.
+ //
+ // Returns true if the transaction succeeded OR if any error encountered was
+ // expected (eg a write-conflict). Error status may be obtained by calling
+ // GetLastStatus();
+ bool TransactionDBInsert(
+ TransactionDB* db,
+ const TransactionOptions& txn_options = TransactionOptions());
+
+ // Increment a key in each set using a Transaction on an
+ // OptimisticTransactionDB
+ //
+ // Returns true if the transaction succeeded OR if any error encountered was
+ // expected (eg a write-conflict). Error status may be obtained by calling
+ // GetLastStatus();
+ bool OptimisticTransactionDBInsert(
+ OptimisticTransactionDB* db,
+ const OptimisticTransactionOptions& txn_options =
+ OptimisticTransactionOptions());
+ // Increment a key in each set without using a transaction. If this function
+ // is called in parallel, then Verify() may fail.
+ //
+ // Returns true if the write succeeds.
+ // Error status may be obtained by calling GetLastStatus().
+ bool DBInsert(DB* db);
+
+ // Get the ikey'th key from set set_i
+ static Status DBGet(DB* db, Transaction* txn, ReadOptions& read_options,
+ uint16_t set_i, uint64_t ikey, bool get_for_update,
+ uint64_t* int_value, std::string* full_key,
+ bool* unexpected_error);
+
+ // Returns OK if Invariant is true.
+ static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0,
+ bool take_snapshot = false, Random64* rand = nullptr,
+ uint64_t delay_ms = 0);
+
+ // Returns the status of the previous Insert operation
+ Status GetLastStatus() { return last_status_; }
+
+ // Returns the number of successfully written calls to
+ // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert
+ uint64_t GetSuccessCount() { return success_count_; }
+
+ // Returns the number of calls to
+ // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not
+ // write any data.
+ uint64_t GetFailureCount() { return failure_count_; }
+
+ // Returns the sum of user keys/values Put() to the DB.
+ size_t GetBytesInserted() { return bytes_inserted_; }
+
+ private:
+ // Input options
+ Random64* rand_;
+ const WriteOptions write_options_;
+ ReadOptions read_options_;
+ const uint64_t num_keys_;
+ const uint16_t num_sets_;
+
+ // Number of successful insert batches performed
+ uint64_t success_count_ = 0;
+
+ // Number of failed insert batches attempted
+ uint64_t failure_count_ = 0;
+
+ size_t bytes_inserted_ = 0;
+
+ // Status returned by most recent insert operation
+ Status last_status_;
+
+ // optimization: re-use allocated transaction objects.
+ Transaction* txn_ = nullptr;
+ Transaction* optimistic_txn_ = nullptr;
+
+ uint64_t txn_id_;
+ // The delay between ::Prepare and ::Commit
+ const uint64_t cmt_delay_ms_;
+
+ bool DoInsert(DB* db, Transaction* txn, bool is_optimistic);
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE