summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db_stress_tool/db_stress_common.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db_stress_tool/db_stress_common.cc')
-rw-r--r--src/rocksdb/db_stress_tool/db_stress_common.cc460
1 files changed, 460 insertions, 0 deletions
diff --git a/src/rocksdb/db_stress_tool/db_stress_common.cc b/src/rocksdb/db_stress_tool/db_stress_common.cc
new file mode 100644
index 000000000..af8db9e2f
--- /dev/null
+++ b/src/rocksdb/db_stress_tool/db_stress_common.cc
@@ -0,0 +1,460 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+
+#ifdef GFLAGS
+#include "db_stress_tool/db_stress_common.h"
+
+#include <cmath>
+
+#include "util/file_checksum_helper.h"
+#include "util/xxhash.h"
+
+ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr;
+ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
+// If non-null, injects read error at a rate specified by the
+// read_fault_one_in or write_fault_one_in flag
+std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
+enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
+ ROCKSDB_NAMESPACE::kSnappyCompression;
+enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
+ ROCKSDB_NAMESPACE::kSnappyCompression;
+enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
+ ROCKSDB_NAMESPACE::kCRC32c;
+enum RepFactory FLAGS_rep_factory = kSkipList;
+std::vector<double> sum_probs(100001);
+constexpr int64_t zipf_sum_size = 100000;
+
+namespace ROCKSDB_NAMESPACE {
+
+// Zipfian distribution is generated based on a pre-calculated array.
+// It should be used before start the stress test.
+// First, the probability distribution function (PDF) of this Zipfian follows
+// power low. P(x) = 1/(x^alpha).
+// So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop
+// and add the PDF value togetger as c. So we get the total probability in c.
+// Next, we calculate inverse CDF of Zipfian and store the value of each in
+// an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for
+// integer k, its Zipfian CDF value is sum_probs[k].
+// Third, when we need to get an integer whose probability follows Zipfian
+// distribution, we use a rand_seed [0,1] which follows uniform distribution
+// as a seed and search it in the sum_probs via binary search. When we find
+// the closest sum_probs[i] of rand_seed, i is the integer that in
+// [0, zipf_sum_size] following Zipfian distribution with parameter alpha.
+// Finally, we can scale i to [0, max_key] scale.
+// In order to avoid that hot keys are close to each other and skew towards 0,
+// we use Rando64 to shuffle it.
+void InitializeHotKeyGenerator(double alpha) {
+ double c = 0;
+ for (int64_t i = 1; i <= zipf_sum_size; i++) {
+ c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
+ }
+ c = 1.0 / c;
+
+ sum_probs[0] = 0;
+ for (int64_t i = 1; i <= zipf_sum_size; i++) {
+ sum_probs[i] =
+ sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
+ }
+}
+
+// Generate one key that follows the Zipfian distribution. The skewness
+// is decided by the parameter alpha. Input is the rand_seed [0,1] and
+// the max of the key to be generated. If we directly return tmp_zipf_seed,
+// the closer to 0, the higher probability will be. To randomly distribute
+// the hot keys in [0, max_key], we use Random64 to shuffle it.
+int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
+ int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
+ while (low <= high) {
+ mid = (low + high) / 2;
+ if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
+ zipf = mid;
+ break;
+ } else if (sum_probs[mid] >= rand_seed) {
+ high = mid - 1;
+ } else {
+ low = mid + 1;
+ }
+ }
+ int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size;
+ Random64 rand_local(tmp_zipf_seed);
+ return rand_local.Next() % max_key;
+}
+
+void PoolSizeChangeThread(void* v) {
+ assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
+ ThreadState* thread = reinterpret_cast<ThreadState*>(v);
+ SharedState* shared = thread->shared;
+
+ while (true) {
+ {
+ MutexLock l(shared->GetMutex());
+ if (shared->ShouldStopBgThread()) {
+ shared->IncBgThreadsFinished();
+ if (shared->BgThreadsFinished()) {
+ shared->GetCondVar()->SignalAll();
+ }
+ return;
+ }
+ }
+
+ auto thread_pool_size_base = FLAGS_max_background_compactions;
+ auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
+ int new_thread_pool_size =
+ thread_pool_size_base - thread_pool_size_var +
+ thread->rand.Next() % (thread_pool_size_var * 2 + 1);
+ if (new_thread_pool_size < 1) {
+ new_thread_pool_size = 1;
+ }
+ db_stress_env->SetBackgroundThreads(new_thread_pool_size,
+ ROCKSDB_NAMESPACE::Env::Priority::LOW);
+ // Sleep up to 3 seconds
+ db_stress_env->SleepForMicroseconds(
+ thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
+ 1000 +
+ 1);
+ }
+}
+
+void DbVerificationThread(void* v) {
+ assert(FLAGS_continuous_verification_interval > 0);
+ auto* thread = reinterpret_cast<ThreadState*>(v);
+ SharedState* shared = thread->shared;
+ StressTest* stress_test = shared->GetStressTest();
+ assert(stress_test != nullptr);
+ while (true) {
+ {
+ MutexLock l(shared->GetMutex());
+ if (shared->ShouldStopBgThread()) {
+ shared->IncBgThreadsFinished();
+ if (shared->BgThreadsFinished()) {
+ shared->GetCondVar()->SignalAll();
+ }
+ return;
+ }
+ }
+ if (!shared->HasVerificationFailedYet()) {
+ stress_test->ContinuouslyVerifyDb(thread);
+ }
+ db_stress_env->SleepForMicroseconds(
+ thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
+ 1);
+ }
+}
+
+void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
+ if (!FLAGS_verbose) {
+ return;
+ }
+ std::string tmp;
+ tmp.reserve(sz * 2 + 16);
+ char buf[4];
+ for (size_t i = 0; i < sz; i++) {
+ snprintf(buf, 4, "%X", value[i]);
+ tmp.append(buf);
+ }
+ auto key_str = Key(key);
+ Slice key_slice = key_str;
+ fprintf(stdout, "[CF %d] %s (%" PRIi64 ") == > (%" ROCKSDB_PRIszt ") %s\n",
+ cf, key_slice.ToString(true).c_str(), key, sz, tmp.c_str());
+}
+
+// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
+// distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does
+// not ensure the order of the keys being generated and the keys does not have
+// the active range which is related to FLAGS_active_width.
+int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
+ const double completed_ratio =
+ static_cast<double>(iteration) / FLAGS_ops_per_thread;
+ const int64_t base_key = static_cast<int64_t>(
+ completed_ratio * (FLAGS_max_key - FLAGS_active_width));
+ int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
+ int64_t cur_key = rand_seed;
+ if (FLAGS_hot_key_alpha != 0) {
+ // If set the Zipfian distribution Alpha to non 0, use Zipfian
+ double float_rand =
+ (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
+ FLAGS_max_key;
+ cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
+ }
+ return cur_key;
+}
+
+// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
+// distribution. Keys being generated are in random order.
+// If user want to generate keys based on uniform distribution, user needs to
+// set hot_key_alpha == 0. It will generate the random keys in increasing
+// order in the key array (ensure key[i] >= key[i+1]) and constrained in a
+// range related to FLAGS_active_width.
+std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
+ uint64_t iteration) {
+ const double completed_ratio =
+ static_cast<double>(iteration) / FLAGS_ops_per_thread;
+ const int64_t base_key = static_cast<int64_t>(
+ completed_ratio * (FLAGS_max_key - FLAGS_active_width));
+ std::vector<int64_t> keys;
+ keys.reserve(num_keys);
+ int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
+ keys.push_back(next_key);
+ for (int i = 1; i < num_keys; ++i) {
+ // Generate the key follows zipfian distribution
+ if (FLAGS_hot_key_alpha != 0) {
+ double float_rand =
+ (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
+ FLAGS_max_key;
+ next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
+ } else {
+ // This may result in some duplicate keys
+ next_key = next_key + thread->rand.Next() %
+ (FLAGS_active_width - (next_key - base_key));
+ }
+ keys.push_back(next_key);
+ }
+ return keys;
+}
+
+size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
+ size_t value_sz =
+ ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
+ assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
+ (void)max_sz;
+ PutUnaligned(reinterpret_cast<uint32_t*>(v), rand);
+ for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
+ v[i] = (char)(rand ^ i);
+ }
+ v[value_sz] = '\0';
+ return value_sz; // the size of the value set.
+}
+
+uint32_t GetValueBase(Slice s) {
+ assert(s.size() >= sizeof(uint32_t));
+ uint32_t res;
+ GetUnaligned(reinterpret_cast<const uint32_t*>(s.data()), &res);
+ return res;
+}
+
+WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) {
+ WideColumns columns;
+
+ constexpr size_t max_columns = 4;
+ const size_t num_columns = (value_base % max_columns) + 1;
+
+ columns.reserve(num_columns);
+
+ assert(slice.size() >= num_columns);
+
+ columns.emplace_back(kDefaultWideColumnName, slice);
+
+ for (size_t i = 1; i < num_columns; ++i) {
+ const Slice name(slice.data(), i);
+ const Slice value(slice.data() + i, slice.size() - i);
+
+ columns.emplace_back(name, value);
+ }
+
+ return columns;
+}
+
+WideColumns GenerateExpectedWideColumns(uint32_t value_base,
+ const Slice& slice) {
+ if (FLAGS_use_put_entity_one_in == 0 ||
+ (value_base % FLAGS_use_put_entity_one_in) != 0) {
+ return WideColumns{{kDefaultWideColumnName, slice}};
+ }
+
+ WideColumns columns = GenerateWideColumns(value_base, slice);
+
+ std::sort(columns.begin(), columns.end(),
+ [](const WideColumn& lhs, const WideColumn& rhs) {
+ return lhs.name().compare(rhs.name()) < 0;
+ });
+
+ return columns;
+}
+
+std::string GetNowNanos() {
+ uint64_t t = db_stress_env->NowNanos();
+ std::string ret;
+ PutFixed64(&ret, t);
+ return ret;
+}
+
+namespace {
+
+class MyXXH64Checksum : public FileChecksumGenerator {
+ public:
+ explicit MyXXH64Checksum(bool big) : big_(big) {
+ state_ = XXH64_createState();
+ XXH64_reset(state_, 0);
+ }
+
+ virtual ~MyXXH64Checksum() override { XXH64_freeState(state_); }
+
+ void Update(const char* data, size_t n) override {
+ XXH64_update(state_, data, n);
+ }
+
+ void Finalize() override {
+ assert(str_.empty());
+ uint64_t digest = XXH64_digest(state_);
+ // Store as little endian raw bytes
+ PutFixed64(&str_, digest);
+ if (big_) {
+ // Throw in some more data for stress testing (448 bits total)
+ PutFixed64(&str_, GetSliceHash64(str_));
+ PutFixed64(&str_, GetSliceHash64(str_));
+ PutFixed64(&str_, GetSliceHash64(str_));
+ PutFixed64(&str_, GetSliceHash64(str_));
+ PutFixed64(&str_, GetSliceHash64(str_));
+ PutFixed64(&str_, GetSliceHash64(str_));
+ }
+ }
+
+ std::string GetChecksum() const override {
+ assert(!str_.empty());
+ return str_;
+ }
+
+ const char* Name() const override {
+ return big_ ? "MyBigChecksum" : "MyXXH64Checksum";
+ }
+
+ private:
+ bool big_;
+ XXH64_state_t* state_;
+ std::string str_;
+};
+
+class DbStressChecksumGenFactory : public FileChecksumGenFactory {
+ std::string default_func_name_;
+
+ std::unique_ptr<FileChecksumGenerator> CreateFromFuncName(
+ const std::string& func_name) {
+ std::unique_ptr<FileChecksumGenerator> rv;
+ if (func_name == "FileChecksumCrc32c") {
+ rv.reset(new FileChecksumGenCrc32c(FileChecksumGenContext()));
+ } else if (func_name == "MyXXH64Checksum") {
+ rv.reset(new MyXXH64Checksum(false /* big */));
+ } else if (func_name == "MyBigChecksum") {
+ rv.reset(new MyXXH64Checksum(true /* big */));
+ } else {
+ // Should be a recognized function when we get here
+ assert(false);
+ }
+ return rv;
+ }
+
+ public:
+ explicit DbStressChecksumGenFactory(const std::string& default_func_name)
+ : default_func_name_(default_func_name) {}
+
+ std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
+ const FileChecksumGenContext& context) override {
+ if (context.requested_checksum_func_name.empty()) {
+ return CreateFromFuncName(default_func_name_);
+ } else {
+ return CreateFromFuncName(context.requested_checksum_func_name);
+ }
+ }
+
+ const char* Name() const override { return "FileChecksumGenCrc32cFactory"; }
+};
+
+} // namespace
+
+std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
+ const std::string& name) {
+ // Translate from friendly names to internal names
+ std::string internal_name;
+ if (name == "crc32c") {
+ internal_name = "FileChecksumCrc32c";
+ } else if (name == "xxh64") {
+ internal_name = "MyXXH64Checksum";
+ } else if (name == "big") {
+ internal_name = "MyBigChecksum";
+ } else {
+ assert(name.empty() || name == "none");
+ return nullptr;
+ }
+ return std::make_shared<DbStressChecksumGenFactory>(internal_name);
+}
+
+Status DeleteFilesInDirectory(const std::string& dirname) {
+ std::vector<std::string> filenames;
+ Status s = Env::Default()->GetChildren(dirname, &filenames);
+ for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
+ s = Env::Default()->DeleteFile(dirname + "/" + filenames[i]);
+ }
+ return s;
+}
+
+Status SaveFilesInDirectory(const std::string& src_dirname,
+ const std::string& dst_dirname) {
+ std::vector<std::string> filenames;
+ Status s = Env::Default()->GetChildren(src_dirname, &filenames);
+ for (size_t i = 0; s.ok() && i < filenames.size(); ++i) {
+ bool is_dir = false;
+ s = Env::Default()->IsDirectory(src_dirname + "/" + filenames[i], &is_dir);
+ if (s.ok()) {
+ if (is_dir) {
+ continue;
+ }
+ s = Env::Default()->LinkFile(src_dirname + "/" + filenames[i],
+ dst_dirname + "/" + filenames[i]);
+ }
+ }
+ return s;
+}
+
+Status InitUnverifiedSubdir(const std::string& dirname) {
+ Status s = Env::Default()->FileExists(dirname);
+ if (s.IsNotFound()) {
+ return Status::OK();
+ }
+
+ const std::string kUnverifiedDirname = dirname + "/unverified";
+ if (s.ok()) {
+ s = Env::Default()->CreateDirIfMissing(kUnverifiedDirname);
+ }
+ if (s.ok()) {
+ // It might already exist with some stale contents. Delete any such
+ // contents.
+ s = DeleteFilesInDirectory(kUnverifiedDirname);
+ }
+ if (s.ok()) {
+ s = SaveFilesInDirectory(dirname, kUnverifiedDirname);
+ }
+ return s;
+}
+
+Status DestroyUnverifiedSubdir(const std::string& dirname) {
+ Status s = Env::Default()->FileExists(dirname);
+ if (s.IsNotFound()) {
+ return Status::OK();
+ }
+
+ const std::string kUnverifiedDirname = dirname + "/unverified";
+ if (s.ok()) {
+ s = Env::Default()->FileExists(kUnverifiedDirname);
+ }
+ if (s.IsNotFound()) {
+ return Status::OK();
+ }
+
+ if (s.ok()) {
+ s = DeleteFilesInDirectory(kUnverifiedDirname);
+ }
+ if (s.ok()) {
+ s = Env::Default()->DeleteDir(kUnverifiedDirname);
+ }
+ return s;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // GFLAGS