diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/test_util | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/test_util')
-rw-r--r-- | src/rocksdb/test_util/mock_time_env.cc | 38 | ||||
-rw-r--r-- | src/rocksdb/test_util/mock_time_env.h | 78 | ||||
-rw-r--r-- | src/rocksdb/test_util/sync_point.cc | 82 | ||||
-rw-r--r-- | src/rocksdb/test_util/sync_point.h | 180 | ||||
-rw-r--r-- | src/rocksdb/test_util/sync_point_impl.cc | 152 | ||||
-rw-r--r-- | src/rocksdb/test_util/sync_point_impl.h | 96 | ||||
-rw-r--r-- | src/rocksdb/test_util/testharness.cc | 107 | ||||
-rw-r--r-- | src/rocksdb/test_util/testharness.h | 119 | ||||
-rw-r--r-- | src/rocksdb/test_util/testutil.cc | 738 | ||||
-rw-r--r-- | src/rocksdb/test_util/testutil.h | 852 | ||||
-rw-r--r-- | src/rocksdb/test_util/testutil_test.cc | 43 | ||||
-rw-r--r-- | src/rocksdb/test_util/transaction_test_util.cc | 402 | ||||
-rw-r--r-- | src/rocksdb/test_util/transaction_test_util.h | 149 |
13 files changed, 3036 insertions, 0 deletions
diff --git a/src/rocksdb/test_util/mock_time_env.cc b/src/rocksdb/test_util/mock_time_env.cc new file mode 100644 index 000000000..23888e69e --- /dev/null +++ b/src/rocksdb/test_util/mock_time_env.cc @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/mock_time_env.h" + +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +// TODO: this is a workaround for the different behavior on different platform +// for timedwait timeout. Ideally timedwait API should be moved to env. +// details: PR #7101. +void MockSystemClock::InstallTimedWaitFixCallback() { +#ifndef NDEBUG + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +#ifdef OS_MACOSX + // This is an alternate way (vs. SpecialEnv) of dealing with the fact + // that on some platforms, pthread_cond_timedwait does not appear to + // release the lock for other threads to operate if the deadline time + // is already passed. (TimedWait calls are currently a bad abstraction + // because the deadline parameter is usually computed from Env time, + // but is interpreted in real clock time.) + SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < this->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX + SyncPoint::GetInstance()->EnableProcessing(); +#endif // !NDEBUG +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/mock_time_env.h b/src/rocksdb/test_util/mock_time_env.h new file mode 100644 index 000000000..7834368e0 --- /dev/null +++ b/src/rocksdb/test_util/mock_time_env.h @@ -0,0 +1,78 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <limits> + +#include "rocksdb/system_clock.h" + +namespace ROCKSDB_NAMESPACE { + +// NOTE: SpecialEnv offers most of this functionality, along with hooks +// for safe DB behavior under a mock time environment, so should be used +// instead of MockSystemClock for DB tests. +class MockSystemClock : public SystemClockWrapper { + public: + explicit MockSystemClock(const std::shared_ptr<SystemClock>& base) + : SystemClockWrapper(base) {} + + static const char* kClassName() { return "MockSystemClock"; } + const char* Name() const override { return kClassName(); } + virtual Status GetCurrentTime(int64_t* time_sec) override { + assert(time_sec != nullptr); + *time_sec = static_cast<int64_t>(current_time_us_ / kMicrosInSecond); + return Status::OK(); + } + + virtual uint64_t NowSeconds() { return current_time_us_ / kMicrosInSecond; } + + virtual uint64_t NowMicros() override { return current_time_us_; } + + virtual uint64_t NowNanos() override { + assert(current_time_us_ <= std::numeric_limits<uint64_t>::max() / 1000); + return current_time_us_ * 1000; + } + + uint64_t RealNowMicros() { return target_->NowMicros(); } + + void SetCurrentTime(uint64_t time_sec) { + assert(time_sec < std::numeric_limits<uint64_t>::max() / kMicrosInSecond); + assert(time_sec * kMicrosInSecond >= current_time_us_); + current_time_us_ = time_sec * kMicrosInSecond; + } + + // It's a fake sleep that just updates the Env current time, which is similar + // to `NoSleepEnv.SleepForMicroseconds()` and + // `SpecialEnv.MockSleepForMicroseconds()`. + // It's also similar to `set_current_time()`, which takes an absolute time in + // seconds, vs. this one takes the sleep in microseconds. + // Note: Not thread safe. + void SleepForMicroseconds(int micros) override { + assert(micros >= 0); + assert(current_time_us_ + static_cast<uint64_t>(micros) >= + current_time_us_); + current_time_us_.fetch_add(micros); + } + + void MockSleepForSeconds(int seconds) { + assert(seconds >= 0); + uint64_t micros = static_cast<uint64_t>(seconds) * kMicrosInSecond; + assert(current_time_us_ + micros >= current_time_us_); + current_time_us_.fetch_add(micros); + } + + // TODO: this is a workaround for the different behavior on different platform + // for timedwait timeout. Ideally timedwait API should be moved to env. + // details: PR #7101. + void InstallTimedWaitFixCallback(); + + private: + std::atomic<uint64_t> current_time_us_{0}; + static constexpr uint64_t kMicrosInSecond = 1000U * 1000U; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/sync_point.cc b/src/rocksdb/test_util/sync_point.cc new file mode 100644 index 000000000..bec02d4f6 --- /dev/null +++ b/src/rocksdb/test_util/sync_point.cc @@ -0,0 +1,82 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point.h" + +#include <fcntl.h> + +#include "test_util/sync_point_impl.h" + +std::vector<std::string> rocksdb_kill_exclude_prefixes; + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { + +SyncPoint* SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +SyncPoint::SyncPoint() : impl_(new Data) {} + +SyncPoint::~SyncPoint() { delete impl_; } + +void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) { + impl_->LoadDependency(dependencies); +} + +void SyncPoint::LoadDependencyAndMarkers( + const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers) { + impl_->LoadDependencyAndMarkers(dependencies, markers); +} + +void SyncPoint::SetCallBack(const std::string& point, + const std::function<void(void*)>& callback) { + impl_->SetCallBack(point, callback); +} + +void SyncPoint::ClearCallBack(const std::string& point) { + impl_->ClearCallBack(point); +} + +void SyncPoint::ClearAllCallBacks() { impl_->ClearAllCallBacks(); } + +void SyncPoint::EnableProcessing() { impl_->EnableProcessing(); } + +void SyncPoint::DisableProcessing() { impl_->DisableProcessing(); } + +void SyncPoint::ClearTrace() { impl_->ClearTrace(); } + +void SyncPoint::Process(const Slice& point, void* cb_arg) { + impl_->Process(point, cb_arg); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // NDEBUG + +namespace ROCKSDB_NAMESPACE { +void SetupSyncPointsToMockDirectIO() { +#if !defined(NDEBUG) && !defined(OS_MACOSX) && !defined(OS_WIN) && \ + !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast<int*>(arg); + *val &= ~O_DIRECT; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", [&](void* arg) { + int* val = static_cast<int*>(arg); + *val &= ~O_DIRECT; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewSequentialFile:O_DIRECT", [&](void* arg) { + int* val = static_cast<int*>(arg); + *val &= ~O_DIRECT; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); +#endif +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/sync_point.h b/src/rocksdb/test_util/sync_point.h new file mode 100644 index 000000000..65f1239ec --- /dev/null +++ b/src/rocksdb/test_util/sync_point.h @@ -0,0 +1,180 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include <assert.h> + +#include <functional> +#include <mutex> +#include <string> +#include <thread> +#include <vector> + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" + +#ifdef NDEBUG +// empty in release build +#define TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, rocksdb_kill_odds_weight) +#define TEST_KILL_RANDOM(kill_point) +#else + +namespace ROCKSDB_NAMESPACE { + +// To avoid crashing always at some frequently executed codepaths (during +// kill random test), use this factor to reduce odds +#define REDUCE_ODDS 2 +#define REDUCE_ODDS2 4 + +// A class used to pass when a kill point is reached. +struct KillPoint { + public: + // This is only set from db_stress.cc and for testing only. + // If non-zero, kill at various points in source code with probability 1/this + int rocksdb_kill_odds = 0; + // If kill point has a prefix on this list, will skip killing. + std::vector<std::string> rocksdb_kill_exclude_prefixes; + // Kill the process with probability 1/odds for testing. + void TestKillRandom(std::string kill_point, int odds, + const std::string& srcfile, int srcline); + + static KillPoint* GetInstance(); +}; + +#define TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, rocksdb_kill_odds_weight) \ + { \ + KillPoint::GetInstance()->TestKillRandom( \ + kill_point, rocksdb_kill_odds_weight, __FILE__, __LINE__); \ + } +#define TEST_KILL_RANDOM(kill_point) TEST_KILL_RANDOM_WITH_WEIGHT(kill_point, 1) +} // namespace ROCKSDB_NAMESPACE + +#endif + +#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#define TEST_IDX_SYNC_POINT(x, index) +#define TEST_SYNC_POINT_CALLBACK(x, y) +#define INIT_SYNC_POINT_SINGLETONS() +#else + +namespace ROCKSDB_NAMESPACE { + +// This class provides facility to reproduce race conditions deterministically +// in unit tests. +// Developer could specify sync points in the codebase via TEST_SYNC_POINT. +// Each sync point represents a position in the execution stream of a thread. +// In the unit test, 'Happens After' relationship among sync points could be +// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of +// threads execution. +// Refer to (DBTest,TransactionLogIteratorRace), for an example use case. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + SyncPoint(const SyncPoint&) = delete; + SyncPoint& operator=(const SyncPoint&) = delete; + ~SyncPoint(); + + struct SyncPointPair { + std::string predecessor; + std::string successor; + }; + + // call once at the beginning of a test to setup the dependency between + // sync points + void LoadDependency(const std::vector<SyncPointPair>& dependencies); + + // call once at the beginning of a test to setup the dependency between + // sync points and setup markers indicating the successor is only enabled + // when it is processed on the same thread as the predecessor. + // When adding a marker, it implicitly adds a dependency for the marker pair. + void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers); + + // The argument to the callback is passed through from + // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or + // TEST_IDX_SYNC_POINT was used. + void SetCallBack(const std::string& point, + const std::function<void(void*)>& callback); + + // Clear callback function by point + void ClearCallBack(const std::string& point); + + // Clear all call back functions. + void ClearAllCallBacks(); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + // And/or call registered callback function, with argument `cb_arg` + void Process(const Slice& point, void* cb_arg = nullptr); + + // template gets length of const string at compile time, + // avoiding strlen() at runtime + template <size_t kLen> + void Process(const char (&point)[kLen], void* cb_arg = nullptr) { + static_assert(kLen > 0, "Must not be empty"); + assert(point[kLen - 1] == '\0'); + Process(Slice(point, kLen - 1), cb_arg); + } + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + // We want this to be public so we can + // subclass the implementation + struct Data; + + private: + // Singleton + SyncPoint(); + Data* impl_; +}; + +// Sets up sync points to mock direct IO instead of actually issuing direct IO +// to the file system. +void SetupSyncPointsToMockDirectIO(); +} // namespace ROCKSDB_NAMESPACE + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependency on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to re-produce race conditions between threads. +// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is no op in release build. +#define TEST_SYNC_POINT(x) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x) +#define TEST_IDX_SYNC_POINT(x, index) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x + \ + std::to_string(index)) +#define TEST_SYNC_POINT_CALLBACK(x, y) \ + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->Process(x, y) +#define INIT_SYNC_POINT_SINGLETONS() \ + (void)ROCKSDB_NAMESPACE::SyncPoint::GetInstance(); +#endif // NDEBUG + +// Callback sync point for any read IO errors that should be ignored by +// the fault injection framework +// Disable in release mode +#ifdef NDEBUG +#define IGNORE_STATUS_IF_ERROR(_status_) +#else +#define IGNORE_STATUS_IF_ERROR(_status_) \ + { \ + if (!_status_.ok()) { \ + TEST_SYNC_POINT("FaultInjectionIgnoreError"); \ + } \ + } +#endif // NDEBUG diff --git a/src/rocksdb/test_util/sync_point_impl.cc b/src/rocksdb/test_util/sync_point_impl.cc new file mode 100644 index 000000000..2a4bd3ccd --- /dev/null +++ b/src/rocksdb/test_util/sync_point_impl.cc @@ -0,0 +1,152 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point_impl.h" + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { +KillPoint* KillPoint::GetInstance() { + static KillPoint kp; + return &kp; +} + +void KillPoint::TestKillRandom(std::string kill_point, int odds_weight, + const std::string& srcfile, int srcline) { + if (rocksdb_kill_odds <= 0) { + return; + } + int odds = rocksdb_kill_odds * odds_weight; + for (auto& p : rocksdb_kill_exclude_prefixes) { + if (kill_point.substr(0, p.length()) == p) { + return; + } + } + + assert(odds > 0); + if (odds % 7 == 0) { + // class Random uses multiplier 16807, which is 7^5. If odds are + // multiplier of 7, there might be limited values generated. + odds++; + } + auto* r = Random::GetTLSInstance(); + bool crash = r->OneIn(odds); + if (crash) { + port::Crash(srcfile, srcline); + } +} + +void SyncPoint::Data::LoadDependency( + const std::vector<SyncPointPair>& dependencies) { + std::lock_guard<std::mutex> lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + point_filter_.Add(dependency.successor); + point_filter_.Add(dependency.predecessor); + } + cv_.notify_all(); +} + +void SyncPoint::Data::LoadDependencyAndMarkers( + const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers) { + std::lock_guard<std::mutex> lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + point_filter_.Add(dependency.successor); + point_filter_.Add(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + point_filter_.Add(marker.predecessor); + point_filter_.Add(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::ClearCallBack(const std::string& point) { + std::unique_lock<std::mutex> lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + +void SyncPoint::Data::ClearAllCallBacks() { + std::unique_lock<std::mutex> lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + +void SyncPoint::Data::Process(const Slice& point, void* cb_arg) { + if (!enabled_) { + return; + } + + // Use a filter to prevent mutex lock if possible. + if (!point_filter_.MayContain(point)) { + return; + } + + // Must convert to std::string for remaining work. Take + // heap hit. + std::string point_string(point.ToString()); + std::unique_lock<std::mutex> lock(mutex_); + auto thread_id = std::this_thread::get_id(); + + auto marker_iter = markers_.find(point_string); + if (marker_iter != markers_.end()) { + for (auto& marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + point_filter_.Add(marked_point); + } + } + + if (DisabledByMarker(point_string, thread_id)) { + return; + } + + while (!PredecessorsAllCleared(point_string)) { + cv_.wait(lock); + if (DisabledByMarker(point_string, thread_id)) { + return; + } + } + + auto callback_pair = callbacks_.find(point_string); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(cb_arg); + mutex_.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point_string); + cv_.notify_all(); +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/src/rocksdb/test_util/sync_point_impl.h b/src/rocksdb/test_util/sync_point_impl.h new file mode 100644 index 000000000..64cc0445e --- /dev/null +++ b/src/rocksdb/test_util/sync_point_impl.h @@ -0,0 +1,96 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <assert.h> + +#include <atomic> +#include <condition_variable> +#include <functional> +#include <mutex> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> + +#include "memory/concurrent_arena.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/dynamic_bloom.h" +#include "util/random.h" + +#pragma once + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { +// A hacky allocator for single use. +// Arena depends on SyncPoint and create circular dependency. +class SingleAllocator : public Allocator { + public: + char* Allocate(size_t) override { + assert(false); + return nullptr; + } + char* AllocateAligned(size_t bytes, size_t, Logger*) override { + buf_.resize(bytes); + return const_cast<char*>(buf_.data()); + } + size_t BlockSize() const override { + assert(false); + return 0; + } + + private: + std::string buf_; +}; + +struct SyncPoint::Data { + Data() : point_filter_(&alloc_, /*total_bits=*/8192), enabled_(false) {} + // Enable proper deletion by subclasses + virtual ~Data() {} + // successor/predecessor map loaded from LoadDependency + std::unordered_map<std::string, std::vector<std::string>> successors_; + std::unordered_map<std::string, std::vector<std::string>> predecessors_; + std::unordered_map<std::string, std::function<void(void*)>> callbacks_; + std::unordered_map<std::string, std::vector<std::string>> markers_; + std::unordered_map<std::string, std::thread::id> marked_thread_id_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set<std::string> cleared_points_; + SingleAllocator alloc_; + // A filter before holding mutex to speed up process. + DynamicBloom point_filter_; + std::atomic<bool> enabled_; + int num_callbacks_running_ = 0; + + void LoadDependency(const std::vector<SyncPointPair>& dependencies); + void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies, + const std::vector<SyncPointPair>& markers); + bool PredecessorsAllCleared(const std::string& point); + void SetCallBack(const std::string& point, + const std::function<void(void*)>& callback) { + std::lock_guard<std::mutex> lock(mutex_); + callbacks_[point] = callback; + point_filter_.Add(point); + } + + void ClearCallBack(const std::string& point); + void ClearAllCallBacks(); + void EnableProcessing() { enabled_ = true; } + void DisableProcessing() { enabled_ = false; } + void ClearTrace() { + std::lock_guard<std::mutex> lock(mutex_); + cleared_points_.clear(); + } + bool DisabledByMarker(const std::string& point, std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; + } + void Process(const Slice& point, void* cb_arg); +}; +} // namespace ROCKSDB_NAMESPACE +#endif // NDEBUG diff --git a/src/rocksdb/test_util/testharness.cc b/src/rocksdb/test_util/testharness.cc new file mode 100644 index 000000000..3c7b835d2 --- /dev/null +++ b/src/rocksdb/test_util/testharness.cc @@ -0,0 +1,107 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "test_util/testharness.h" + +#include <regex> +#include <string> +#include <thread> + +namespace ROCKSDB_NAMESPACE { +namespace test { + +#ifdef OS_WIN +#include <windows.h> + +std::string GetPidStr() { return std::to_string(GetCurrentProcessId()); } +#else +std::string GetPidStr() { return std::to_string(getpid()); } +#endif + +::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) { + if (s.ok()) { + return ::testing::AssertionSuccess(); + } else { + return ::testing::AssertionFailure() << s_expr << std::endl << s.ToString(); + } +} + +std::string TmpDir(Env* env) { + std::string dir; + Status s = env->GetTestDirectory(&dir); + EXPECT_OK(s); + return dir; +} + +std::string PerThreadDBPath(std::string dir, std::string name) { + size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id()); + return dir + "/" + name + "_" + GetPidStr() + "_" + std::to_string(tid); +} + +std::string PerThreadDBPath(std::string name) { + return PerThreadDBPath(test::TmpDir(), name); +} + +std::string PerThreadDBPath(Env* env, std::string name) { + return PerThreadDBPath(test::TmpDir(env), name); +} + +int RandomSeed() { + const char* env = getenv("TEST_RANDOM_SEED"); + int result = (env != nullptr ? atoi(env) : 301); + if (result <= 0) { + result = 301; + } + return result; +} + +TestRegex::TestRegex(const std::string& pattern) + : impl_(std::make_shared<Impl>(pattern)), pattern_(pattern) {} +TestRegex::TestRegex(const char* pattern) + : impl_(std::make_shared<Impl>(pattern)), pattern_(pattern) {} + +const std::string& TestRegex::GetPattern() const { return pattern_; } + +class TestRegex::Impl : public std::regex { + public: + using std::regex::basic_regex; +}; + +bool TestRegex::Matches(const std::string& str) const { + if (impl_) { + return std::regex_match(str, *impl_); + } else { + // Should not call Matches on unset Regex + assert(false); + return false; + } +} + +::testing::AssertionResult AssertMatchesRegex(const char* str_expr, + const char* pattern_expr, + const std::string& str, + const TestRegex& pattern) { + if (pattern.Matches(str)) { + return ::testing::AssertionSuccess(); + } else if (TestRegex("\".*\"").Matches(pattern_expr)) { + // constant regex string + return ::testing::AssertionFailure() + << str << " (" << str_expr << ")" << std::endl + << "does not match regex " << pattern.GetPattern(); + } else { + // runtime regex string + return ::testing::AssertionFailure() + << str << " (" << str_expr << ")" << std::endl + << "does not match regex" << std::endl + << pattern.GetPattern() << " (" << pattern_expr << ")"; + } +} + +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testharness.h b/src/rocksdb/test_util/testharness.h new file mode 100644 index 000000000..69018629a --- /dev/null +++ b/src/rocksdb/test_util/testharness.h @@ -0,0 +1,119 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#ifdef OS_AIX +#include "gtest/gtest.h" +#else +#include <gtest/gtest.h> +#endif + +// A "skipped" test has a specific meaning in Facebook infrastructure: the +// test is in good shape and should be run, but something about the +// compilation or execution environment means the test cannot be run. +// Specifically, there is a hole in intended testing if any +// parameterization of a test (e.g. Foo/FooTest.Bar/42) is skipped for all +// tested build configurations/platforms/etc. +// +// If GTEST_SKIP is available, use it. Otherwise, define skip as success. +// +// The GTEST macros do not seem to print the message, even with -verbose, +// so these print to stderr. Note that these do not exit the test themselves; +// calling code should 'return' or similar from the test. +#ifdef GTEST_SKIP_ +#define ROCKSDB_GTEST_SKIP(m) \ + do { \ + fputs("SKIPPED: " m "\n", stderr); \ + GTEST_SKIP_(m); \ + } while (false) /* user ; */ +#else +#define ROCKSDB_GTEST_SKIP(m) \ + do { \ + fputs("SKIPPED: " m "\n", stderr); \ + GTEST_SUCCESS_("SKIPPED: " m); \ + } while (false) /* user ; */ +#endif + +// We add "bypass" as an alternative to ROCKSDB_GTEST_SKIP that is allowed to +// be a permanent condition, e.g. for intentionally omitting or disabling some +// parameterizations for some tests. (Use _DISABLED at the end of the test +// name to disable an entire test.) +#define ROCKSDB_GTEST_BYPASS(m) \ + do { \ + fputs("BYPASSED: " m "\n", stderr); \ + GTEST_SUCCESS_("BYPASSED: " m); \ + } while (false) /* user ; */ + +#include <string> + +#include "port/stack_trace.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { +namespace test { + +// Return the directory to use for temporary storage. +std::string TmpDir(Env* env = Env::Default()); + +// A path unique within the thread +std::string PerThreadDBPath(std::string name); +std::string PerThreadDBPath(Env* env, std::string name); +std::string PerThreadDBPath(std::string dir, std::string name); + +// Return a randomization seed for this run. Typically returns the +// same number on repeated invocations of this binary, but automated +// runs may be able to vary the seed. +int RandomSeed(); + +::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s); + +#define ASSERT_OK(s) \ + ASSERT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s) +#define ASSERT_NOK(s) ASSERT_FALSE((s).ok()) +#define EXPECT_OK(s) \ + EXPECT_PRED_FORMAT1(ROCKSDB_NAMESPACE::test::AssertStatus, s) +#define EXPECT_NOK(s) EXPECT_FALSE((s).ok()) + +// Useful for testing +// * No need to deal with Status like in Regex public API +// * No triggering lint reports on use of std::regex in tests +// * Available in LITE (unlike public API) +class TestRegex { + public: + // These throw on bad pattern + /*implicit*/ TestRegex(const std::string& pattern); + /*implicit*/ TestRegex(const char* pattern); + + // Checks that the whole of str is matched by this regex + bool Matches(const std::string& str) const; + + const std::string& GetPattern() const; + + private: + class Impl; + std::shared_ptr<Impl> impl_; // shared_ptr for simple implementation + std::string pattern_; +}; + +::testing::AssertionResult AssertMatchesRegex(const char* str_expr, + const char* pattern_expr, + const std::string& str, + const TestRegex& pattern); + +#define ASSERT_MATCHES_REGEX(str, pattern) \ + ASSERT_PRED_FORMAT2(ROCKSDB_NAMESPACE::test::AssertMatchesRegex, str, pattern) +#define EXPECT_MATCHES_REGEX(str, pattern) \ + EXPECT_PRED_FORMAT2(ROCKSDB_NAMESPACE::test::AssertMatchesRegex, str, pattern) + +} // namespace test + +using test::TestRegex; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testutil.cc b/src/rocksdb/test_util/testutil.cc new file mode 100644 index 000000000..5e1b909f9 --- /dev/null +++ b/src/rocksdb/test_util/testutil.cc @@ -0,0 +1,738 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "test_util/testutil.h" + +#include <fcntl.h> +#include <sys/stat.h> + +#include <array> +#include <cctype> +#include <fstream> +#include <sstream> + +#include "db/memtable_list.h" +#include "env/composite_env_wrapper.h" +#include "file/random_access_file_reader.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "port/port.h" +#include "rocksdb/convenience.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/utilities/object_registry.h" +#include "test_util/mock_time_env.h" +#include "test_util/sync_point.h" +#include "util/random.h" + +#ifndef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} +#endif + +namespace ROCKSDB_NAMESPACE { +namespace test { + +const uint32_t kDefaultFormatVersion = BlockBasedTableOptions().format_version; +const std::set<uint32_t> kFooterFormatVersionsToTest{ + 5U, + // In case any interesting future changes + kDefaultFormatVersion, + kLatestFormatVersion, +}; + +std::string RandomKey(Random* rnd, int len, RandomKeyType type) { + // Make sure to generate a wide variety of characters so we + // test the boundary conditions for short-key optimizations. + static const char kTestChars[] = {'\0', '\1', 'a', 'b', 'c', + 'd', 'e', '\xfd', '\xfe', '\xff'}; + std::string result; + for (int i = 0; i < len; i++) { + std::size_t indx = 0; + switch (type) { + case RandomKeyType::RANDOM: + indx = rnd->Uniform(sizeof(kTestChars)); + break; + case RandomKeyType::LARGEST: + indx = sizeof(kTestChars) - 1; + break; + case RandomKeyType::MIDDLE: + indx = sizeof(kTestChars) / 2; + break; + case RandomKeyType::SMALLEST: + indx = 0; + break; + } + result += kTestChars[indx]; + } + return result; +} + +extern Slice CompressibleString(Random* rnd, double compressed_fraction, + int len, std::string* dst) { + int raw = static_cast<int>(len * compressed_fraction); + if (raw < 1) raw = 1; + std::string raw_data = rnd->RandomString(raw); + + // Duplicate the random data until we have filled "len" bytes + dst->clear(); + while (dst->size() < (unsigned int)len) { + dst->append(raw_data); + } + dst->resize(len); + return Slice(*dst); +} + +namespace { +class Uint64ComparatorImpl : public Comparator { + public: + Uint64ComparatorImpl() {} + + const char* Name() const override { return "rocksdb.Uint64Comparator"; } + + int Compare(const Slice& a, const Slice& b) const override { + assert(a.size() == sizeof(uint64_t) && b.size() == sizeof(uint64_t)); + const uint64_t* left = reinterpret_cast<const uint64_t*>(a.data()); + const uint64_t* right = reinterpret_cast<const uint64_t*>(b.data()); + uint64_t leftValue; + uint64_t rightValue; + GetUnaligned(left, &leftValue); + GetUnaligned(right, &rightValue); + if (leftValue == rightValue) { + return 0; + } else if (leftValue < rightValue) { + return -1; + } else { + return 1; + } + } + + void FindShortestSeparator(std::string* /*start*/, + const Slice& /*limit*/) const override { + return; + } + + void FindShortSuccessor(std::string* /*key*/) const override { return; } +}; +} // namespace + +const Comparator* Uint64Comparator() { + static Uint64ComparatorImpl uint64comp; + return &uint64comp; +} + +const Comparator* BytewiseComparatorWithU64TsWrapper() { + ConfigOptions config_options; + const Comparator* user_comparator = nullptr; + Status s = Comparator::CreateFromString( + config_options, "leveldb.BytewiseComparator.u64ts", &user_comparator); + s.PermitUncheckedError(); + return user_comparator; +} + +void CorruptKeyType(InternalKey* ikey) { + std::string keystr = ikey->Encode().ToString(); + keystr[keystr.size() - 8] = kTypeLogData; + ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); +} + +std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, + const ValueType& t, bool corrupt) { + InternalKey k(user_key, seq, t); + if (corrupt) { + CorruptKeyType(&k); + } + return k.Encode().ToString(); +} + +std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt) { + std::string user_key_with_ts(user_key); + std::string ts_str; + PutFixed64(&ts_str, ts); + user_key_with_ts.append(ts_str); + return KeyStr(user_key_with_ts, seq, t, corrupt); +} + +bool SleepingBackgroundTask::TimedWaitUntilSleeping(uint64_t wait_time) { + auto abs_time = SystemClock::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!sleeping_ || !should_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; +} + +bool SleepingBackgroundTask::TimedWaitUntilDone(uint64_t wait_time) { + auto abs_time = SystemClock::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!done_with_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; +} + +std::string RandomName(Random* rnd, const size_t len) { + std::stringstream ss; + for (size_t i = 0; i < len; ++i) { + ss << static_cast<char>(rnd->Uniform(26) + 'a'); + } + return ss.str(); +} + +CompressionType RandomCompressionType(Random* rnd) { + auto ret = static_cast<CompressionType>(rnd->Uniform(6)); + while (!CompressionTypeSupported(ret)) { + ret = static_cast<CompressionType>((static_cast<int>(ret) + 1) % 6); + } + return ret; +} + +void RandomCompressionTypeVector(const size_t count, + std::vector<CompressionType>* types, + Random* rnd) { + types->clear(); + for (size_t i = 0; i < count; ++i) { + types->emplace_back(RandomCompressionType(rnd)); + } +} + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) { + int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewFixedPrefixTransform(rnd->Uniform(20) + 1); + case 1: + return NewCappedPrefixTransform(rnd->Uniform(20) + 1); + case 2: + return NewNoopTransform(); + default: + return nullptr; + } +} + +BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) { + BlockBasedTableOptions opt; + opt.cache_index_and_filter_blocks = rnd->Uniform(2); + opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2); + opt.pin_top_level_index_and_filter = rnd->Uniform(2); + using IndexType = BlockBasedTableOptions::IndexType; + const std::array<IndexType, 4> index_types = { + {IndexType::kBinarySearch, IndexType::kHashSearch, + IndexType::kTwoLevelIndexSearch, IndexType::kBinarySearchWithFirstKey}}; + opt.index_type = + index_types[rnd->Uniform(static_cast<int>(index_types.size()))]; + opt.checksum = static_cast<ChecksumType>(rnd->Uniform(3)); + opt.block_size = rnd->Uniform(10000000); + opt.block_size_deviation = rnd->Uniform(100); + opt.block_restart_interval = rnd->Uniform(100); + opt.index_block_restart_interval = rnd->Uniform(100); + opt.whole_key_filtering = rnd->Uniform(2); + + return opt; +} + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined) { +#ifndef ROCKSDB_LITE + int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewPlainTableFactory(); + case 1: + return NewCuckooTableFactory(); + default: + return NewBlockBasedTableFactory(); + } +#else + (void)rnd; + (void)pre_defined; + return NewBlockBasedTableFactory(); +#endif // !ROCKSDB_LITE +} + +MergeOperator* RandomMergeOperator(Random* rnd) { + return new ChanglingMergeOperator(RandomName(rnd, 10)); +} + +CompactionFilter* RandomCompactionFilter(Random* rnd) { + return new ChanglingCompactionFilter(RandomName(rnd, 10)); +} + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) { + return new ChanglingCompactionFilterFactory(RandomName(rnd, 10)); +} + +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { + // boolean options + db_opt->advise_random_on_open = rnd->Uniform(2); + db_opt->allow_mmap_reads = rnd->Uniform(2); + db_opt->allow_mmap_writes = rnd->Uniform(2); + db_opt->use_direct_reads = rnd->Uniform(2); + db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2); + db_opt->create_if_missing = rnd->Uniform(2); + db_opt->create_missing_column_families = rnd->Uniform(2); + db_opt->enable_thread_tracking = rnd->Uniform(2); + db_opt->error_if_exists = rnd->Uniform(2); + db_opt->is_fd_close_on_exec = rnd->Uniform(2); + db_opt->paranoid_checks = rnd->Uniform(2); + db_opt->track_and_verify_wals_in_manifest = rnd->Uniform(2); + db_opt->verify_sst_unique_id_in_manifest = rnd->Uniform(2); + db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); + db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2); + db_opt->use_adaptive_mutex = rnd->Uniform(2); + db_opt->use_fsync = rnd->Uniform(2); + db_opt->recycle_log_file_num = rnd->Uniform(2); + db_opt->avoid_flush_during_recovery = rnd->Uniform(2); + db_opt->avoid_flush_during_shutdown = rnd->Uniform(2); + db_opt->enforce_single_del_contracts = rnd->Uniform(2); + + // int options + db_opt->max_background_compactions = rnd->Uniform(100); + db_opt->max_background_flushes = rnd->Uniform(100); + db_opt->max_file_opening_threads = rnd->Uniform(100); + db_opt->max_open_files = rnd->Uniform(100); + db_opt->table_cache_numshardbits = rnd->Uniform(100); + + // size_t options + db_opt->db_write_buffer_size = rnd->Uniform(10000); + db_opt->keep_log_file_num = rnd->Uniform(10000); + db_opt->log_file_time_to_roll = rnd->Uniform(10000); + db_opt->manifest_preallocation_size = rnd->Uniform(10000); + db_opt->max_log_file_size = rnd->Uniform(10000); + + // std::string options + db_opt->db_log_dir = "path/to/db_log_dir"; + db_opt->wal_dir = "path/to/wal_dir"; + + // uint32_t options + db_opt->max_subcompactions = rnd->Uniform(100000); + + // uint64_t options + static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); + db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000); + db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000); + db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000); + db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000); + db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000); + db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000); + db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000); + db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000); + + // unsigned int options + db_opt->stats_dump_period_sec = rnd->Uniform(100000); +} + +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options, + Random* rnd) { + cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4)); + + // boolean options + cf_opt->report_bg_io_stats = rnd->Uniform(2); + cf_opt->disable_auto_compactions = rnd->Uniform(2); + cf_opt->inplace_update_support = rnd->Uniform(2); + cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2); + cf_opt->optimize_filters_for_hits = rnd->Uniform(2); + cf_opt->paranoid_file_checks = rnd->Uniform(2); + cf_opt->force_consistency_checks = rnd->Uniform(2); + cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2); + cf_opt->memtable_whole_key_filtering = rnd->Uniform(2); + cf_opt->enable_blob_files = rnd->Uniform(2); + cf_opt->enable_blob_garbage_collection = rnd->Uniform(2); + + // double options + cf_opt->memtable_prefix_bloom_size_ratio = + static_cast<double>(rnd->Uniform(10000)) / 20000.0; + cf_opt->blob_garbage_collection_age_cutoff = rnd->Uniform(10000) / 10000.0; + cf_opt->blob_garbage_collection_force_threshold = + rnd->Uniform(10000) / 10000.0; + + // int options + cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100); + cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100); + cf_opt->level0_stop_writes_trigger = rnd->Uniform(100); + cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100); + cf_opt->max_write_buffer_number = rnd->Uniform(100); + cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); + cf_opt->max_write_buffer_size_to_maintain = rnd->Uniform(10000); + cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); + cf_opt->num_levels = rnd->Uniform(100); + cf_opt->target_file_size_multiplier = rnd->Uniform(100); + + // vector int options + cf_opt->max_bytes_for_level_multiplier_additional.resize(cf_opt->num_levels); + for (int i = 0; i < cf_opt->num_levels; i++) { + cf_opt->max_bytes_for_level_multiplier_additional[i] = rnd->Uniform(100); + } + + // size_t options + cf_opt->arena_block_size = rnd->Uniform(10000); + cf_opt->inplace_update_num_locks = rnd->Uniform(10000); + cf_opt->max_successive_merges = rnd->Uniform(10000); + cf_opt->memtable_huge_page_size = rnd->Uniform(10000); + cf_opt->write_buffer_size = rnd->Uniform(10000); + + // uint32_t options + cf_opt->bloom_locality = rnd->Uniform(10000); + cf_opt->max_bytes_for_level_base = rnd->Uniform(10000); + + // uint64_t options + static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); + cf_opt->ttl = + db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0; + cf_opt->periodic_compaction_seconds = + db_options.max_open_files == -1 ? uint_max + rnd->Uniform(10000) : 0; + cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); + cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); + cf_opt->max_compaction_bytes = + cf_opt->target_file_size_base * rnd->Uniform(100); + cf_opt->compaction_options_fifo.max_table_files_size = + uint_max + rnd->Uniform(10000); + cf_opt->min_blob_size = uint_max + rnd->Uniform(10000); + cf_opt->blob_file_size = uint_max + rnd->Uniform(10000); + cf_opt->blob_compaction_readahead_size = uint_max + rnd->Uniform(10000); + + // pointer typed options + cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd)); + cf_opt->table_factory.reset(RandomTableFactory(rnd)); + cf_opt->merge_operator.reset(RandomMergeOperator(rnd)); + if (cf_opt->compaction_filter) { + delete cf_opt->compaction_filter; + } + cf_opt->compaction_filter = RandomCompactionFilter(rnd); + cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd)); + + // custom typed options + cf_opt->compression = RandomCompressionType(rnd); + RandomCompressionTypeVector(cf_opt->num_levels, + &cf_opt->compression_per_level, rnd); + cf_opt->blob_compression_type = RandomCompressionType(rnd); +} + +bool IsDirectIOSupported(Env* env, const std::string& dir) { + EnvOptions env_options; + env_options.use_mmap_writes = false; + env_options.use_direct_writes = true; + std::string tmp = TempFileName(dir, 999); + Status s; + { + std::unique_ptr<WritableFile> file; + s = env->NewWritableFile(tmp, &file, env_options); + } + if (s.ok()) { + s = env->DeleteFile(tmp); + } + return s.ok(); +} + +bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs, + const std::string& dir) { + bool supported = false; + std::string tmp = TempFileName(dir, 999); + Random rnd(301); + std::string test_string = rnd.RandomString(4096); + Slice data(test_string); + Status s = WriteStringToFile(fs.get(), data, tmp, true); + if (s.ok()) { + std::unique_ptr<FSRandomAccessFile> file; + auto io_s = fs->NewRandomAccessFile(tmp, FileOptions(), &file, nullptr); + if (io_s.ok()) { + supported = !(file->Prefetch(0, data.size(), IOOptions(), nullptr) + .IsNotSupported()); + } + s = fs->DeleteFile(tmp, IOOptions(), nullptr); + } + return s.ok() && supported; +} + +size_t GetLinesCount(const std::string& fname, const std::string& pattern) { + std::stringstream ssbuf; + std::string line; + size_t count = 0; + + std::ifstream inFile(fname.c_str()); + ssbuf << inFile.rdbuf(); + + while (getline(ssbuf, line)) { + if (line.find(pattern) != std::string::npos) { + count++; + } + } + + return count; +} + +Status CorruptFile(Env* env, const std::string& fname, int offset, + int bytes_to_corrupt, bool verify_checksum /*=true*/) { + uint64_t size; + Status s = env->GetFileSize(fname, &size); + if (!s.ok()) { + return s; + } else if (offset < 0) { + // Relative to end of file; make it absolute + if (-offset > static_cast<int>(size)) { + offset = 0; + } else { + offset = static_cast<int>(size + offset); + } + } + if (offset > static_cast<int>(size)) { + offset = static_cast<int>(size); + } + if (offset + bytes_to_corrupt > static_cast<int>(size)) { + bytes_to_corrupt = static_cast<int>(size - offset); + } + + // Do it + std::string contents; + s = ReadFileToString(env, fname, &contents); + if (s.ok()) { + for (int i = 0; i < bytes_to_corrupt; i++) { + contents[i + offset] ^= 0x80; + } + s = WriteStringToFile(env, contents, fname); + } + if (s.ok() && verify_checksum) { +#ifndef ROCKSDB_LITE + Options options; + options.env = env; + EnvOptions env_options; + Status v = VerifySstFileChecksum(options, env_options, fname); + assert(!v.ok()); +#endif + } + return s; +} + +Status TruncateFile(Env* env, const std::string& fname, uint64_t new_length) { + uint64_t old_length; + Status s = env->GetFileSize(fname, &old_length); + if (!s.ok() || new_length == old_length) { + return s; + } + // Do it + std::string contents; + s = ReadFileToString(env, fname, &contents); + if (s.ok()) { + contents.resize(static_cast<size_t>(new_length), 'b'); + s = WriteStringToFile(env, contents, fname); + } + return s; +} + +// Try and delete a directory if it exists +Status TryDeleteDir(Env* env, const std::string& dirname) { + bool is_dir = false; + Status s = env->IsDirectory(dirname, &is_dir); + if (s.ok() && is_dir) { + s = env->DeleteDir(dirname); + } + return s; +} + +// Delete a directory if it exists +void DeleteDir(Env* env, const std::string& dirname) { + TryDeleteDir(env, dirname).PermitUncheckedError(); +} + +Status CreateEnvFromSystem(const ConfigOptions& config_options, Env** result, + std::shared_ptr<Env>* guard) { + const char* env_uri = getenv("TEST_ENV_URI"); + const char* fs_uri = getenv("TEST_FS_URI"); + if (env_uri || fs_uri) { + return Env::CreateFromUri(config_options, + (env_uri != nullptr) ? env_uri : "", + (fs_uri != nullptr) ? fs_uri : "", result, guard); + } else { + // Neither specified. Use the default + *result = config_options.env; + guard->reset(); + return Status::OK(); + } +} +namespace { +// A hacky skip list mem table that triggers flush after number of entries. +class SpecialMemTableRep : public MemTableRep { + public: + explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable, + int num_entries_flush) + : MemTableRep(allocator), + memtable_(memtable), + num_entries_flush_(num_entries_flush), + num_entries_(0) {} + + virtual KeyHandle Allocate(const size_t len, char** buf) override { + return memtable_->Allocate(len, buf); + } + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + virtual void Insert(KeyHandle handle) override { + num_entries_++; + memtable_->Insert(handle); + } + + void InsertConcurrently(KeyHandle handle) override { + num_entries_++; + memtable_->Insert(handle); + } + + // Returns true iff an entry that compares equal to key is in the list. + virtual bool Contains(const char* key) const override { + return memtable_->Contains(key); + } + + virtual size_t ApproximateMemoryUsage() override { + // Return a high memory usage when number of entries exceeds the threshold + // to trigger a flush. + return (num_entries_ < num_entries_flush_) ? 0 : 1024 * 1024 * 1024; + } + + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override { + memtable_->Get(k, callback_args, callback_func); + } + + uint64_t ApproximateNumEntries(const Slice& start_ikey, + const Slice& end_ikey) override { + return memtable_->ApproximateNumEntries(start_ikey, end_ikey); + } + + virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { + return memtable_->GetIterator(arena); + } + + virtual ~SpecialMemTableRep() override {} + + private: + std::unique_ptr<MemTableRep> memtable_; + int num_entries_flush_; + int num_entries_; +}; +class SpecialSkipListFactory : public MemTableRepFactory { + public: +#ifndef ROCKSDB_LITE + static bool Register(ObjectLibrary& library, const std::string& /*arg*/) { + library.AddFactory<MemTableRepFactory>( + ObjectLibrary::PatternEntry(SpecialSkipListFactory::kClassName(), true) + .AddNumber(":"), + [](const std::string& uri, std::unique_ptr<MemTableRepFactory>* guard, + std::string* /* errmsg */) { + auto colon = uri.find(":"); + if (colon != std::string::npos) { + auto count = ParseInt(uri.substr(colon + 1)); + guard->reset(new SpecialSkipListFactory(count)); + } else { + guard->reset(new SpecialSkipListFactory(2)); + } + return guard->get(); + }); + return true; + } +#endif // ROCKSDB_LITE + // After number of inserts exceeds `num_entries_flush` in a mem table, trigger + // flush. + explicit SpecialSkipListFactory(int num_entries_flush) + : num_entries_flush_(num_entries_flush) {} + + using MemTableRepFactory::CreateMemTableRep; + virtual MemTableRep* CreateMemTableRep( + const MemTableRep::KeyComparator& compare, Allocator* allocator, + const SliceTransform* transform, Logger* /*logger*/) override { + return new SpecialMemTableRep( + allocator, + factory_.CreateMemTableRep(compare, allocator, transform, nullptr), + num_entries_flush_); + } + static const char* kClassName() { return "SpecialSkipListFactory"; } + virtual const char* Name() const override { return kClassName(); } + std::string GetId() const override { + std::string id = Name(); + if (num_entries_flush_ > 0) { + id.append(":").append(std::to_string(num_entries_flush_)); + } + return id; + } + + bool IsInsertConcurrentlySupported() const override { + return factory_.IsInsertConcurrentlySupported(); + } + + private: + SkipListFactory factory_; + int num_entries_flush_; +}; +} // namespace + +MemTableRepFactory* NewSpecialSkipListFactory(int num_entries_per_flush) { + RegisterTestLibrary(); + return new SpecialSkipListFactory(num_entries_per_flush); +} + +#ifndef ROCKSDB_LITE +// This method loads existing test classes into the ObjectRegistry +int RegisterTestObjects(ObjectLibrary& library, const std::string& arg) { + size_t num_types; + library.AddFactory<const Comparator>( + test::SimpleSuffixReverseComparator::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr<const Comparator>* /*guard*/, + std::string* /* errmsg */) { + static test::SimpleSuffixReverseComparator ssrc; + return &ssrc; + }); + SpecialSkipListFactory::Register(library, arg); + library.AddFactory<MergeOperator>( + "Changling", + [](const std::string& uri, std::unique_ptr<MergeOperator>* guard, + std::string* /* errmsg */) { + guard->reset(new test::ChanglingMergeOperator(uri)); + return guard->get(); + }); + library.AddFactory<CompactionFilter>( + "Changling", + [](const std::string& uri, std::unique_ptr<CompactionFilter>* /*guard*/, + std::string* /* errmsg */) { + return new test::ChanglingCompactionFilter(uri); + }); + library.AddFactory<CompactionFilterFactory>( + "Changling", [](const std::string& uri, + std::unique_ptr<CompactionFilterFactory>* guard, + std::string* /* errmsg */) { + guard->reset(new test::ChanglingCompactionFilterFactory(uri)); + return guard->get(); + }); + library.AddFactory<SystemClock>( + MockSystemClock::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr<SystemClock>* guard, + std::string* /* errmsg */) { + guard->reset(new MockSystemClock(SystemClock::Default())); + return guard->get(); + }); + return static_cast<int>(library.GetFactoryCount(&num_types)); +} + +#endif // ROCKSDB_LITE + +void RegisterTestLibrary(const std::string& arg) { + static bool registered = false; + if (!registered) { + registered = true; +#ifndef ROCKSDB_LITE + ObjectRegistry::Default()->AddLibrary("test", RegisterTestObjects, arg); +#else + (void)arg; +#endif // ROCKSDB_LITE + } +} +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testutil.h b/src/rocksdb/test_util/testutil.h new file mode 100644 index 000000000..1f43156ab --- /dev/null +++ b/src/rocksdb/test_util/testutil.h @@ -0,0 +1,852 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <algorithm> +#include <deque> +#include <string> +#include <vector> + +#include "env/composite_env_wrapper.h" +#include "file/writable_file_writer.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/table.h" +#include "table/internal_iterator.h" +#include "util/mutexlock.h" + +#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +extern "C" { +void RegisterCustomObjects(int argc, char** argv); +} +#else +void RegisterCustomObjects(int argc, char** argv); +#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS + +namespace ROCKSDB_NAMESPACE { +class FileSystem; +class MemTableRepFactory; +class ObjectLibrary; +class Random; +class SequentialFile; +class SequentialFileReader; + +namespace test { + +extern const uint32_t kDefaultFormatVersion; +extern const std::set<uint32_t> kFooterFormatVersionsToTest; + +// Return a random key with the specified length that may contain interesting +// characters (e.g. \x00, \xff, etc.). +enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE }; +extern std::string RandomKey(Random* rnd, int len, + RandomKeyType type = RandomKeyType::RANDOM); + +// Store in *dst a string of length "len" that will compress to +// "N*compressed_fraction" bytes and return a Slice that references +// the generated data. +extern Slice CompressibleString(Random* rnd, double compressed_fraction, + int len, std::string* dst); + +#ifndef NDEBUG +// An internal comparator that just forward comparing results from the +// user comparator in it. Can be used to test entities that have no dependency +// on internal key structure but consumes InternalKeyComparator, like +// BlockBasedTable. +class PlainInternalKeyComparator : public InternalKeyComparator { + public: + explicit PlainInternalKeyComparator(const Comparator* c) + : InternalKeyComparator(c) {} + + virtual ~PlainInternalKeyComparator() {} + + virtual int Compare(const Slice& a, const Slice& b) const override { + return user_comparator()->Compare(a, b); + } +}; +#endif + +// A test comparator which compare two strings in this way: +// (1) first compare prefix of 8 bytes in alphabet order, +// (2) if two strings share the same prefix, sort the other part of the string +// in the reverse alphabet order. +// This helps simulate the case of compounded key of [entity][timestamp] and +// latest timestamp first. +class SimpleSuffixReverseComparator : public Comparator { + public: + SimpleSuffixReverseComparator() {} + static const char* kClassName() { return "SimpleSuffixReverseComparator"; } + virtual const char* Name() const override { return kClassName(); } + + virtual int Compare(const Slice& a, const Slice& b) const override { + Slice prefix_a = Slice(a.data(), 8); + Slice prefix_b = Slice(b.data(), 8); + int prefix_comp = prefix_a.compare(prefix_b); + if (prefix_comp != 0) { + return prefix_comp; + } else { + Slice suffix_a = Slice(a.data() + 8, a.size() - 8); + Slice suffix_b = Slice(b.data() + 8, b.size() - 8); + return -(suffix_a.compare(suffix_b)); + } + } + virtual void FindShortestSeparator(std::string* /*start*/, + const Slice& /*limit*/) const override {} + + virtual void FindShortSuccessor(std::string* /*key*/) const override {} +}; + +// Returns a user key comparator that can be used for comparing two uint64_t +// slices. Instead of comparing slices byte-wise, it compares all the 8 bytes +// at once. Assumes same endian-ness is used though the database's lifetime. +// Symantics of comparison would differ from Bytewise comparator in little +// endian machines. +extern const Comparator* Uint64Comparator(); + +// A wrapper api for getting the ComparatorWithU64Ts<BytewiseComparator> +extern const Comparator* BytewiseComparatorWithU64TsWrapper(); + +class StringSink : public FSWritableFile { + public: + std::string contents_; + + explicit StringSink(Slice* reader_contents = nullptr) + : FSWritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { + if (reader_contents_ != nullptr) { + *reader_contents_ = Slice(contents_.data(), 0); + } + } + + const std::string& contents() const { return contents_; } + + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_.resize(static_cast<size_t>(size)); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + if (reader_contents_ != nullptr) { + assert(reader_contents_->size() <= last_flush_); + size_t offset = last_flush_ - reader_contents_->size(); + *reader_contents_ = + Slice(contents_.data() + offset, contents_.size() - offset); + last_flush_ = contents_.size(); + } + + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_.append(slice.data(), slice.size()); + return IOStatus::OK(); + } + void Drop(size_t bytes) { + if (reader_contents_ != nullptr) { + contents_.resize(contents_.size() - bytes); + *reader_contents_ = + Slice(reader_contents_->data(), reader_contents_->size() - bytes); + last_flush_ = contents_.size(); + } + } + + private: + Slice* reader_contents_; + size_t last_flush_; +}; + +// A wrapper around a StringSink to give it a RandomRWFile interface +class RandomRWStringSink : public FSRandomRWFile { + public: + explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {} + + IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + if (offset + data.size() > ss_->contents_.size()) { + ss_->contents_.resize(static_cast<size_t>(offset) + data.size(), '\0'); + } + + char* pos = const_cast<char*>(ss_->contents_.data() + offset); + memcpy(pos, data.data(), data.size()); + return IOStatus::OK(); + } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* /*scratch*/, + IODebugContext* /*dbg*/) const override { + *result = Slice(nullptr, 0); + if (offset < ss_->contents_.size()) { + size_t str_res_sz = + std::min(static_cast<size_t>(ss_->contents_.size() - offset), n); + *result = Slice(ss_->contents_.data() + offset, str_res_sz); + } + return IOStatus::OK(); + } + + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + const std::string& contents() const { return ss_->contents(); } + + private: + StringSink* ss_; +}; + +// Like StringSink, this writes into a string. Unlink StringSink, it +// has some initial content and overwrites it, just like a recycled +// log file. +class OverwritingStringSink : public FSWritableFile { + public: + explicit OverwritingStringSink(Slice* reader_contents) + : FSWritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) {} + + const std::string& contents() const { return contents_; } + + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_.resize(static_cast<size_t>(size)); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + if (last_flush_ < contents_.size()) { + assert(reader_contents_->size() >= contents_.size()); + memcpy((char*)reader_contents_->data() + last_flush_, + contents_.data() + last_flush_, contents_.size() - last_flush_); + last_flush_ = contents_.size(); + } + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_.append(slice.data(), slice.size()); + return IOStatus::OK(); + } + void Drop(size_t bytes) { + contents_.resize(contents_.size() - bytes); + if (last_flush_ > contents_.size()) last_flush_ = contents_.size(); + } + + private: + std::string contents_; + Slice* reader_contents_; + size_t last_flush_; +}; + +class StringSource : public FSRandomAccessFile { + public: + explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, + bool mmap = false) + : contents_(contents.data(), contents.size()), + uniq_id_(uniq_id), + mmap_(mmap), + total_reads_(0) {} + + virtual ~StringSource() {} + + uint64_t Size() const { return contents_.size(); } + + IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + // If we are using mmap_, it is equivalent to performing a prefetch + if (mmap_) { + return IOStatus::OK(); + } else { + return IOStatus::NotSupported("Prefetch not supported"); + } + } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { + total_reads_++; + if (offset > contents_.size()) { + return IOStatus::InvalidArgument("invalid Read offset"); + } + if (offset + n > contents_.size()) { + n = contents_.size() - static_cast<size_t>(offset); + } + if (!mmap_) { + memcpy(scratch, &contents_[static_cast<size_t>(offset)], n); + *result = Slice(scratch, n); + } else { + *result = Slice(&contents_[static_cast<size_t>(offset)], n); + } + return IOStatus::OK(); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + if (max_size < 20) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, uniq_id_); + rid = EncodeVarint64(rid, 0); + return static_cast<size_t>(rid - id); + } + + int total_reads() const { return total_reads_; } + + void set_total_reads(int tr) { total_reads_ = tr; } + + private: + std::string contents_; + uint64_t uniq_id_; + bool mmap_; + mutable int total_reads_; +}; + +class NullLogger : public Logger { + public: + using Logger::Logv; + virtual void Logv(const char* /*format*/, va_list /*ap*/) override {} + virtual size_t GetLogFileSize() const override { return 0; } +}; + +// Corrupts key by changing the type +extern void CorruptKeyType(InternalKey* ikey); + +extern std::string KeyStr(const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + +extern std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + +class SleepingBackgroundTask { + public: + SleepingBackgroundTask() + : bg_cv_(&mutex_), + should_sleep_(true), + done_with_sleep_(false), + sleeping_(false) {} + + bool IsSleeping() { + MutexLock l(&mutex_); + return sleeping_; + } + void DoSleep() { + MutexLock l(&mutex_); + sleeping_ = true; + bg_cv_.SignalAll(); + while (should_sleep_) { + bg_cv_.Wait(); + } + sleeping_ = false; + done_with_sleep_ = true; + bg_cv_.SignalAll(); + } + void WaitUntilSleeping() { + MutexLock l(&mutex_); + while (!sleeping_ || !should_sleep_) { + bg_cv_.Wait(); + } + } + // Waits for the status to change to sleeping, + // otherwise times out. + // wait_time is in microseconds. + // Returns true when times out, false otherwise. + bool TimedWaitUntilSleeping(uint64_t wait_time); + + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + void WaitUntilDone() { + MutexLock l(&mutex_); + while (!done_with_sleep_) { + bg_cv_.Wait(); + } + } + // Similar to TimedWaitUntilSleeping. + // Waits until the task is done. + bool TimedWaitUntilDone(uint64_t wait_time); + + bool WokenUp() { + MutexLock l(&mutex_); + return should_sleep_ == false; + } + + void Reset() { + MutexLock l(&mutex_); + should_sleep_ = true; + done_with_sleep_ = false; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; + bool done_with_sleep_; + bool sleeping_; +}; + +// Filters merge operands and values that are equal to `num`. +class FilterNumber : public CompactionFilter { + public: + explicit FilterNumber(uint64_t num) : num_(num) {} + + std::string last_merge_operand_key() { return last_merge_operand_key_; } + + bool Filter(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/, + const ROCKSDB_NAMESPACE::Slice& value, std::string* /*new_value*/, + bool* /*value_changed*/) const override { + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + bool FilterMergeOperand( + int /*level*/, const ROCKSDB_NAMESPACE::Slice& key, + const ROCKSDB_NAMESPACE::Slice& value) const override { + last_merge_operand_key_ = key.ToString(); + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + const char* Name() const override { return "FilterBadMergeOperand"; } + + private: + mutable std::string last_merge_operand_key_; + uint64_t num_; +}; + +inline std::string EncodeInt(uint64_t x) { + std::string result; + PutFixed64(&result, x); + return result; +} + +class SeqStringSource : public FSSequentialFile { + public: + SeqStringSource(const std::string& data, std::atomic<int>* read_count) + : data_(data), offset_(0), read_count_(read_count) {} + ~SeqStringSource() override {} + IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + std::string output; + if (offset_ < data_.size()) { + n = std::min(data_.size() - offset_, n); + memcpy(scratch, data_.data() + offset_, n); + offset_ += n; + *result = Slice(scratch, n); + } else { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); + } + (*read_count_)++; + return IOStatus::OK(); + } + + IOStatus Skip(uint64_t n) override { + if (offset_ >= data_.size()) { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); + } + // TODO(yhchiang): Currently doesn't handle the overflow case. + offset_ += static_cast<size_t>(n); + return IOStatus::OK(); + } + + private: + std::string data_; + size_t offset_; + std::atomic<int>* read_count_; +}; + +class StringFS : public FileSystemWrapper { + public: + class StringSink : public FSWritableFile { + public: + explicit StringSink(std::string* contents) + : FSWritableFile(), contents_(contents) {} + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->resize(static_cast<size_t>(size)); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->append(slice.data(), slice.size()); + return IOStatus::OK(); + } + + private: + std::string* contents_; + }; + + explicit StringFS(const std::shared_ptr<FileSystem>& t) + : FileSystemWrapper(t) {} + ~StringFS() override {} + + static const char* kClassName() { return "StringFS"; } + const char* Name() const override { return kClassName(); } + + const std::string& GetContent(const std::string& f) { return files_[f]; } + + const IOStatus WriteToNewFile(const std::string& file_name, + const std::string& content) { + std::unique_ptr<FSWritableFile> r; + FileOptions file_opts; + IOOptions io_opts; + + auto s = NewWritableFile(file_name, file_opts, &r, nullptr); + if (s.ok()) { + s = r->Append(content, io_opts, nullptr); + } + if (s.ok()) { + s = r->Flush(io_opts, nullptr); + } + if (s.ok()) { + s = r->Close(io_opts, nullptr); + } + assert(!s.ok() || files_[file_name] == content); + return s; + } + + // The following text is boilerplate that forwards all methods to target() + IOStatus NewSequentialFile(const std::string& f, + const FileOptions& /*options*/, + std::unique_ptr<FSSequentialFile>* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist", f); + } + r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); + return IOStatus::OK(); + } + + IOStatus NewRandomAccessFile(const std::string& /*f*/, + const FileOptions& /*options*/, + std::unique_ptr<FSRandomAccessFile>* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus NewWritableFile(const std::string& f, const FileOptions& /*options*/, + std::unique_ptr<FSWritableFile>* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return IOStatus::IOError("The specified file already exists", f); + } + r->reset(new StringSink(&files_[f])); + return IOStatus::OK(); + } + IOStatus NewDirectory(const std::string& /*name*/, + const IOOptions& /*options*/, + std::unique_ptr<FSDirectory>* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus FileExists(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (files_.find(f) == files_.end()) { + return IOStatus::NotFound(); + } + return IOStatus::OK(); + } + + IOStatus GetChildren(const std::string& /*dir*/, const IOOptions& /*options*/, + std::vector<std::string>* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + files_.erase(f); + return IOStatus::OK(); + } + + IOStatus CreateDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus CreateDirIfMissing(const std::string& /*d*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus DeleteDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/, + uint64_t* s, IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist:", f); + } + *s = iter->second.size(); + return IOStatus::OK(); + } + + IOStatus GetFileModificationTime(const std::string& /*fname*/, + const IOOptions& /*options*/, + uint64_t* /*file_mtime*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus RenameFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus LinkFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus LockFile(const std::string& /*f*/, const IOOptions& /*options*/, + FileLock** /*l*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus UnlockFile(FileLock* /*l*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + std::atomic<int> num_seq_file_read_; + + protected: + std::unordered_map<std::string, std::string> files_; +}; + +// Randomly initialize the given DBOptions +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); + +// Randomly initialize the given ColumnFamilyOptions +// Note that the caller is responsible for releasing non-null +// cf_opt->compaction_filter. +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions&, Random* rnd); + +// A dummy merge operator which can change its name +class ChanglingMergeOperator : public MergeOperator { + public: + explicit ChanglingMergeOperator(const std::string& name) + : name_(name + "MergeOperator") {} + ~ChanglingMergeOperator() {} + + void SetName(const std::string& name) { name_ = name; } + + virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/, + MergeOperationOutput* /*merge_out*/) const override { + return false; + } + virtual bool PartialMergeMulti(const Slice& /*key*/, + const std::deque<Slice>& /*operand_list*/, + std::string* /*new_value*/, + Logger* /*logger*/) const override { + return false; + } + static const char* kClassName() { return "ChanglingMergeOperator"; } + const char* NickName() const override { return kNickName(); } + static const char* kNickName() { return "Changling"; } + bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return MergeOperator::IsInstanceOf(id); + } + } + + virtual const char* Name() const override { return name_.c_str(); } + + protected: + std::string name_; +}; + +// Returns a dummy merge operator with random name. +MergeOperator* RandomMergeOperator(Random* rnd); + +// A dummy compaction filter which can change its name +class ChanglingCompactionFilter : public CompactionFilter { + public: + explicit ChanglingCompactionFilter(const std::string& name) + : name_(name + "CompactionFilter") {} + ~ChanglingCompactionFilter() {} + + void SetName(const std::string& name) { name_ = name; } + + bool Filter(int /*level*/, const Slice& /*key*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + bool* /*value_changed*/) const override { + return false; + } + + static const char* kClassName() { return "ChanglingCompactionFilter"; } + const char* NickName() const override { return kNickName(); } + static const char* kNickName() { return "Changling"; } + + bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return CompactionFilter::IsInstanceOf(id); + } + } + + const char* Name() const override { return name_.c_str(); } + + private: + std::string name_; +}; + +// Returns a dummy compaction filter with a random name. +CompactionFilter* RandomCompactionFilter(Random* rnd); + +// A dummy compaction filter factory which can change its name +class ChanglingCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit ChanglingCompactionFilterFactory(const std::string& name) + : name_(name + "CompactionFilterFactory") {} + ~ChanglingCompactionFilterFactory() {} + + void SetName(const std::string& name) { name_ = name; } + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr<CompactionFilter>(); + } + + // Returns a name that identifies this compaction filter factory. + const char* Name() const override { return name_.c_str(); } + static const char* kClassName() { return "ChanglingCompactionFilterFactory"; } + const char* NickName() const override { return kNickName(); } + static const char* kNickName() { return "Changling"; } + + bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return CompactionFilterFactory::IsInstanceOf(id); + } + } + + protected: + std::string name_; +}; + +// The factory for the hacky skip list mem table that triggers flush after +// number of entries exceeds a threshold. +extern MemTableRepFactory* NewSpecialSkipListFactory(int num_entries_per_flush); + +CompressionType RandomCompressionType(Random* rnd); + +void RandomCompressionTypeVector(const size_t count, + std::vector<CompressionType>* types, + Random* rnd); + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd); + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1); + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1); + +std::string RandomName(Random* rnd, const size_t len); + +bool IsDirectIOSupported(Env* env, const std::string& dir); + +bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs, + const std::string& dir); + +// Return the number of lines where a given pattern was found in a file. +size_t GetLinesCount(const std::string& fname, const std::string& pattern); + +Status CorruptFile(Env* env, const std::string& fname, int offset, + int bytes_to_corrupt, bool verify_checksum = true); +Status TruncateFile(Env* env, const std::string& fname, uint64_t length); + +// Try and delete a directory if it exists +Status TryDeleteDir(Env* env, const std::string& dirname); + +// Delete a directory if it exists +void DeleteDir(Env* env, const std::string& dirname); + +// Creates an Env from the system environment by looking at the system +// environment variables. +Status CreateEnvFromSystem(const ConfigOptions& options, Env** result, + std::shared_ptr<Env>* guard); + +#ifndef ROCKSDB_LITE +// Registers the testutil classes with the ObjectLibrary +int RegisterTestObjects(ObjectLibrary& library, const std::string& /*arg*/); +#endif // ROCKSDB_LITE + +// Register the testutil classes with the default ObjectRegistry/Library +void RegisterTestLibrary(const std::string& arg = ""); +} // namespace test +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/test_util/testutil_test.cc b/src/rocksdb/test_util/testutil_test.cc new file mode 100644 index 000000000..41f26e389 --- /dev/null +++ b/src/rocksdb/test_util/testutil_test.cc @@ -0,0 +1,43 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/testutil.h" + +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +void CreateFile(Env* env, const std::string& path) { + std::unique_ptr<WritableFile> f; + ASSERT_OK(env->NewWritableFile(path, &f, EnvOptions())); + f->Close(); +} + +TEST(TestUtil, DestroyDirRecursively) { + auto env = Env::Default(); + // test_util/file + // /dir + // /dir/file + std::string test_dir = test::PerThreadDBPath("test_util"); + ASSERT_OK(env->CreateDir(test_dir)); + CreateFile(env, test_dir + "/file"); + ASSERT_OK(env->CreateDir(test_dir + "/dir")); + CreateFile(env, test_dir + "/dir/file"); + + ASSERT_OK(DestroyDir(env, test_dir)); + auto s = env->FileExists(test_dir); + ASSERT_TRUE(s.IsNotFound()); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/test_util/transaction_test_util.cc b/src/rocksdb/test_util/transaction_test_util.cc new file mode 100644 index 000000000..99286d836 --- /dev/null +++ b/src/rocksdb/test_util/transaction_test_util.cc @@ -0,0 +1,402 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#ifndef ROCKSDB_LITE + +#include "test_util/transaction_test_util.h" + +#include <algorithm> +#include <cinttypes> +#include <numeric> +#include <random> +#include <string> +#include <thread> + +#include "db/dbformat.h" +#include "db/snapshot_impl.h" +#include "logging/logging.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/random.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +RandomTransactionInserter::RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options, + const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets, + const uint64_t cmt_delay_ms, const uint64_t first_id) + : rand_(rand), + write_options_(write_options), + read_options_(read_options), + num_keys_(num_keys), + num_sets_(num_sets), + txn_id_(first_id), + cmt_delay_ms_(cmt_delay_ms) {} + +RandomTransactionInserter::~RandomTransactionInserter() { + if (txn_ != nullptr) { + delete txn_; + } + if (optimistic_txn_ != nullptr) { + delete optimistic_txn_; + } +} + +bool RandomTransactionInserter::TransactionDBInsert( + TransactionDB* db, const TransactionOptions& txn_options) { + txn_ = db->BeginTransaction(write_options_, txn_options, txn_); + + std::hash<std::thread::id> hasher; + char name[64]; + snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64, + hasher(std::this_thread::get_id()), txn_id_++); + assert(strlen(name) < 64 - 1); + assert(txn_->SetName(name).ok()); + + // Take a snapshot if set_snapshot was not set or with 50% change otherwise + bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2); + if (take_snapshot) { + txn_->SetSnapshot(); + read_options_.snapshot = txn_->GetSnapshot(); + } + auto res = DoInsert(db, txn_, false); + if (take_snapshot) { + read_options_.snapshot = nullptr; + } + return res; +} + +bool RandomTransactionInserter::OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options) { + optimistic_txn_ = + db->BeginTransaction(write_options_, txn_options, optimistic_txn_); + + return DoInsert(db, optimistic_txn_, true); +} + +bool RandomTransactionInserter::DBInsert(DB* db) { + return DoInsert(db, nullptr, false); +} + +Status RandomTransactionInserter::DBGet( + DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i, + uint64_t ikey, bool get_for_update, uint64_t* int_value, + std::string* full_key, bool* unexpected_error) { + Status s; + // Five digits (since the largest uint16_t is 65535) plus the NUL + // end char. + char prefix_buf[6] = {0}; + // Pad prefix appropriately so we can iterate over each set + assert(set_i + 1 <= 9999); + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); + // key format: [SET#][random#] + std::string skey = std::to_string(ikey); + Slice base_key(skey); + *full_key = std::string(prefix_buf) + base_key.ToString(); + Slice key(*full_key); + + std::string value; + if (txn != nullptr) { + if (get_for_update) { + s = txn->GetForUpdate(read_options, key, &value); + } else { + s = txn->Get(read_options, key, &value); + } + } else { + s = db->Get(read_options, key, &value); + } + + if (s.ok()) { + // Found key, parse its value + *int_value = std::stoull(value); + if (*int_value == 0 || *int_value == ULONG_MAX) { + *unexpected_error = true; + fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); + s = Status::Corruption(); + } + } else if (s.IsNotFound()) { + // Have not yet written to this key, so assume its value is 0 + *int_value = 0; + s = Status::OK(); + } + return s; +} + +bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, + bool is_optimistic) { + Status s; + WriteBatch batch; + + // pick a random number to use to increment a key in each set + uint64_t incr = (rand_->Next() % 100) + 1; + bool unexpected_error = false; + + std::vector<uint16_t> set_vec(num_sets_); + std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); + RandomShuffle(set_vec.begin(), set_vec.end()); + + // For each set, pick a key at random and increment it + for (uint16_t set_i : set_vec) { + uint64_t int_value = 0; + std::string full_key; + uint64_t rand_key = rand_->Next() % num_keys_; + const bool get_for_update = txn ? rand_->OneIn(2) : false; + s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update, + &int_value, &full_key, &unexpected_error); + Slice key(full_key); + if (!s.ok()) { + // Optimistic transactions should never return non-ok status here. + // Non-optimistic transactions may return write-coflict/timeout errors. + if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + fprintf(stderr, "Get returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + break; + } + + if (s.ok()) { + // Increment key + std::string sum = std::to_string(int_value + incr); + if (txn != nullptr) { + if ((set_i % 4) != 0) { + s = txn->SingleDelete(key); + } else { + s = txn->Delete(key); + } + if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) { + // If the initial get was not for update, then the key is not locked + // before put and put could fail due to concurrent writes. + break; + } else if (!s.ok()) { + // Since we did a GetForUpdate, SingleDelete should not fail. + fprintf(stderr, "SingleDelete returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + s = txn->Put(key, sum); + if (!s.ok()) { + // Since we did a GetForUpdate, Put should not fail. + fprintf(stderr, "Put returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + } else { + batch.Put(key, sum); + } + bytes_inserted_ += key.size() + sum.size(); + } + if (txn != nullptr) { + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64 + "+%" PRIu64 "=%" PRIu64, + txn->GetName().c_str(), s.ToString().c_str(), + txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(), + int_value, incr, int_value + incr); + } + } + + if (s.ok()) { + if (txn != nullptr) { + bool with_prepare = !is_optimistic && !rand_->OneIn(10); + if (with_prepare) { + // Also try commit without prepare + s = txn->Prepare(); + if (!s.ok()) { + fprintf(stderr, "Prepare returned an unexpected error: %s\n", + s.ToString().c_str()); + } + assert(s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Prepare of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); + if (rand_->OneIn(20)) { + // This currently only tests the mechanics of writing commit time + // write batch so the exact values would not matter. + s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog"); + assert(s.ok()); + } + db->GetDBOptions().env->SleepForMicroseconds( + static_cast<int>(cmt_delay_ms_ * 1000)); + } + if (!rand_->OneIn(20)) { + s = txn->Commit(); + assert(!with_prepare || s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Commit of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); + } else { + // Also try 5% rollback + s = txn->Rollback(); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Rollback %" PRIu64 " %s %s", txn->GetId(), + txn->GetName().c_str(), s.ToString().c_str()); + assert(s.ok()); + } + assert(is_optimistic || s.ok()); + + if (!s.ok()) { + if (is_optimistic) { + // Optimistic transactions can have write-conflict errors on commit. + // Any other error is unexpected. + if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + unexpected_error = true; + } + } else { + // Non-optimistic transactions should only fail due to expiration + // or write failures. For testing purproses, we do not expect any + // write failures. + if (!s.IsExpired()) { + unexpected_error = true; + } + } + + if (unexpected_error) { + fprintf(stderr, "Commit returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + } else { + s = db->Write(write_options_, &batch); + if (!s.ok()) { + unexpected_error = true; + fprintf(stderr, "Write returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + } else { + if (txn != nullptr) { + assert(txn->Rollback().ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s", + s.ToString().c_str(), txn->GetName().c_str()); + } + } + + if (s.ok()) { + success_count_++; + } else { + failure_count_++; + } + + last_status_ = s; + + // return success if we didn't get any unexpected errors + return !unexpected_error; +} + +// Verify that the sum of the keys in each set are equal +Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, + uint64_t num_keys_per_set, + bool take_snapshot, Random64* rand, + uint64_t delay_ms) { + // delay_ms is the delay between taking a snapshot and doing the reads. It + // emulates reads from a long-running backup job. + assert(delay_ms == 0 || take_snapshot); + uint64_t prev_total = 0; + uint32_t prev_i = 0; + bool prev_assigned = false; + + ReadOptions roptions; + if (take_snapshot) { + roptions.snapshot = db->GetSnapshot(); + db->GetDBOptions().env->SleepForMicroseconds( + static_cast<int>(delay_ms * 1000)); + } + + std::vector<uint16_t> set_vec(num_sets); + std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); + RandomShuffle(set_vec.begin(), set_vec.end()); + + // For each set of keys with the same prefix, sum all the values + for (uint16_t set_i : set_vec) { + // Five digits (since the largest uint16_t is 65535) plus the NUL + // end char. + char prefix_buf[6]; + assert(set_i + 1 <= 9999); + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); + uint64_t total = 0; + + // Use either point lookup or iterator. Point lookups are slower so we use + // it less often. + const bool use_point_lookup = + num_keys_per_set != 0 && rand && rand->OneIn(10); + if (use_point_lookup) { + ReadOptions read_options; + for (uint64_t k = 0; k < num_keys_per_set; k++) { + std::string dont_care; + uint64_t int_value = 0; + bool unexpected_error = false; + const bool FOR_UPDATE = false; + Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE, + &int_value, &dont_care, &unexpected_error); + assert(s.ok()); + assert(!unexpected_error); + total += int_value; + } + } else { // user iterators + Iterator* iter = db->NewIterator(roptions); + for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + // stop when we reach a different prefix + if (key.ToString().compare(0, 4, prefix_buf) != 0) { + break; + } + Slice value = iter->value(); + uint64_t int_value = std::stoull(value.ToString()); + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Iter returned unexpected value: %s\n", + value.ToString().c_str()); + return Status::Corruption(); + } + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul, + roptions.snapshot + ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_ + : 0ul, + static_cast<int>(key.size()), key.data(), int_value); + total += int_value; + } + iter->status().PermitUncheckedError(); + delete iter; + } + + if (prev_assigned && total != prev_total) { + db->GetDBOptions().info_log->Flush(); + fprintf(stdout, + "RandomTransactionVerify found inconsistent totals using " + "pointlookup? %d " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 + " at snapshot %" PRIu64 "\n", + use_point_lookup, prev_i, prev_total, set_i, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); + fflush(stdout); + return Status::Corruption(); + } else { + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64 + " snap: %" PRIu64, + use_point_lookup, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); + } + prev_total = total; + prev_i = set_i; + prev_assigned = true; + } + if (take_snapshot) { + db->ReleaseSnapshot(roptions.snapshot); + } + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/test_util/transaction_test_util.h b/src/rocksdb/test_util/transaction_test_util.h new file mode 100644 index 000000000..7a38ab626 --- /dev/null +++ b/src/rocksdb/test_util/transaction_test_util.h @@ -0,0 +1,149 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "port/port.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction_db.h" + +namespace ROCKSDB_NAMESPACE { + +class DB; +class Random64; + +// Utility class for stress testing transactions. Can be used to write many +// transactions in parallel and then validate that the data written is logically +// consistent. This class assumes the input DB is initially empty. +// +// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will +// increment the value of a key in #num_sets sets of keys. Regardless of +// whether the transaction succeeds, the total sum of values of keys in each +// set is an invariant that should remain equal. +// +// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many +// times, Verify() can be called to validate that the invariant holds. +// +// To test writing Transaction in parallel, multiple threads can create a +// RandomTransactionInserter with similar arguments using the same DB. +class RandomTransactionInserter { + public: + static bool RollbackDeletionTypeCallback(const Slice& key) { + // These are hard-coded atm. See how RandomTransactionInserter::DoInsert() + // determines whether to use SingleDelete or Delete for a key. + assert(key.size() >= 4); + const char* ptr = key.data(); + assert(ptr); + while (ptr && ptr < key.data() + 4 && *ptr == '0') { + ++ptr; + } + std::string prefix(ptr, 4 - (ptr - key.data())); + unsigned long set_i = std::stoul(prefix); + assert(set_i > 0); + assert(set_i <= 9999); + --set_i; + return ((set_i % 4) != 0); + } + + // num_keys is the number of keys in each set. + // num_sets is the number of sets of keys. + // cmt_delay_ms is the delay between prepare (if there is any) and commit + // first_id is the id of the first transaction + explicit RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options = WriteOptions(), + const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000, + uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0, + const uint64_t first_id = 0); + + ~RandomTransactionInserter(); + + // Increment a key in each set using a Transaction on a TransactionDB. + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool TransactionDBInsert( + TransactionDB* db, + const TransactionOptions& txn_options = TransactionOptions()); + + // Increment a key in each set using a Transaction on an + // OptimisticTransactionDB + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options = + OptimisticTransactionOptions()); + // Increment a key in each set without using a transaction. If this function + // is called in parallel, then Verify() may fail. + // + // Returns true if the write succeeds. + // Error status may be obtained by calling GetLastStatus(). + bool DBInsert(DB* db); + + // Get the ikey'th key from set set_i + static Status DBGet(DB* db, Transaction* txn, ReadOptions& read_options, + uint16_t set_i, uint64_t ikey, bool get_for_update, + uint64_t* int_value, std::string* full_key, + bool* unexpected_error); + + // Returns OK if Invariant is true. + static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0, + bool take_snapshot = false, Random64* rand = nullptr, + uint64_t delay_ms = 0); + + // Returns the status of the previous Insert operation + Status GetLastStatus() { return last_status_; } + + // Returns the number of successfully written calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert + uint64_t GetSuccessCount() { return success_count_; } + + // Returns the number of calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not + // write any data. + uint64_t GetFailureCount() { return failure_count_; } + + // Returns the sum of user keys/values Put() to the DB. + size_t GetBytesInserted() { return bytes_inserted_; } + + private: + // Input options + Random64* rand_; + const WriteOptions write_options_; + ReadOptions read_options_; + const uint64_t num_keys_; + const uint16_t num_sets_; + + // Number of successful insert batches performed + uint64_t success_count_ = 0; + + // Number of failed insert batches attempted + uint64_t failure_count_ = 0; + + size_t bytes_inserted_ = 0; + + // Status returned by most recent insert operation + Status last_status_; + + // optimization: re-use allocated transaction objects. + Transaction* txn_ = nullptr; + Transaction* optimistic_txn_ = nullptr; + + uint64_t txn_id_; + // The delay between ::Prepare and ::Commit + const uint64_t cmt_delay_ms_; + + bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE |