summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db_stress_tool/db_stress_shared_state.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db_stress_tool/db_stress_shared_state.h')
-rw-r--r--src/rocksdb/db_stress_tool/db_stress_shared_state.h390
1 files changed, 390 insertions, 0 deletions
diff --git a/src/rocksdb/db_stress_tool/db_stress_shared_state.h b/src/rocksdb/db_stress_tool/db_stress_shared_state.h
new file mode 100644
index 000000000..b68670b58
--- /dev/null
+++ b/src/rocksdb/db_stress_tool/db_stress_shared_state.h
@@ -0,0 +1,390 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors
+
+#ifdef GFLAGS
+#pragma once
+
+#include "db_stress_tool/db_stress_stat.h"
+#include "util/gflags_compat.h"
+
+DECLARE_uint64(seed);
+DECLARE_int64(max_key);
+DECLARE_uint64(log2_keys_per_lock);
+DECLARE_int32(threads);
+DECLARE_int32(column_families);
+DECLARE_int32(nooverwritepercent);
+DECLARE_string(expected_values_path);
+DECLARE_int32(clear_column_family_one_in);
+DECLARE_bool(test_batches_snapshots);
+DECLARE_int32(compaction_thread_pool_adjust_interval);
+DECLARE_int32(continuous_verification_interval);
+
+namespace ROCKSDB_NAMESPACE {
+class StressTest;
+
+// State shared by all concurrent executions of the same benchmark.
+class SharedState {
+ public:
+ // indicates a key may have any value (or not be present) as an operation on
+ // it is incomplete.
+ static const uint32_t UNKNOWN_SENTINEL;
+ // indicates a key should definitely be deleted
+ static const uint32_t DELETION_SENTINEL;
+
+ SharedState(Env* env, StressTest* stress_test)
+ : cv_(&mu_),
+ seed_(static_cast<uint32_t>(FLAGS_seed)),
+ max_key_(FLAGS_max_key),
+ log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
+ num_threads_(FLAGS_threads),
+ num_initialized_(0),
+ num_populated_(0),
+ vote_reopen_(0),
+ num_done_(0),
+ start_(false),
+ start_verify_(false),
+ num_bg_threads_(0),
+ should_stop_bg_thread_(false),
+ bg_thread_finished_(0),
+ stress_test_(stress_test),
+ verification_failure_(false),
+ should_stop_test_(false),
+ no_overwrite_ids_(FLAGS_column_families),
+ values_(nullptr),
+ printing_verification_results_(false) {
+ // Pick random keys in each column family that will not experience
+ // overwrite
+
+ fprintf(stdout, "Choosing random keys with no overwrite\n");
+ Random64 rnd(seed_);
+ // Start with the identity permutation. Subsequent iterations of
+ // for loop below will start with perm of previous for loop
+ int64_t* permutation = new int64_t[max_key_];
+ for (int64_t i = 0; i < max_key_; i++) {
+ permutation[i] = i;
+ }
+ // Now do the Knuth shuffle
+ int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
+ // Only need to figure out first num_no_overwrite_keys of permutation
+ no_overwrite_ids_.reserve(num_no_overwrite_keys);
+ for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
+ int64_t rand_index = i + rnd.Next() % (max_key_ - i);
+ // Swap i and rand_index;
+ int64_t temp = permutation[i];
+ permutation[i] = permutation[rand_index];
+ permutation[rand_index] = temp;
+ // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
+ // permutation
+ no_overwrite_ids_.insert(permutation[i]);
+ }
+ delete[] permutation;
+
+ size_t expected_values_size =
+ sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
+ bool values_init_needed = false;
+ Status status;
+ if (!FLAGS_expected_values_path.empty()) {
+ if (!std::atomic<uint32_t>{}.is_lock_free()) {
+ status = Status::InvalidArgument(
+ "Cannot use --expected_values_path on platforms without lock-free "
+ "std::atomic<uint32_t>");
+ }
+ if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
+ status = Status::InvalidArgument(
+ "Cannot use --expected_values_path on when "
+ "--clear_column_family_one_in is greater than zero.");
+ }
+ uint64_t size = 0;
+ if (status.ok()) {
+ status = env->GetFileSize(FLAGS_expected_values_path, &size);
+ }
+ std::unique_ptr<WritableFile> wfile;
+ if (status.ok() && size == 0) {
+ const EnvOptions soptions;
+ status =
+ env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
+ }
+ if (status.ok() && size == 0) {
+ std::string buf(expected_values_size, '\0');
+ status = wfile->Append(buf);
+ values_init_needed = true;
+ }
+ if (status.ok()) {
+ status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
+ &expected_mmap_buffer_);
+ }
+ if (status.ok()) {
+ assert(expected_mmap_buffer_->GetLen() == expected_values_size);
+ values_ = static_cast<std::atomic<uint32_t>*>(
+ expected_mmap_buffer_->GetBase());
+ assert(values_ != nullptr);
+ } else {
+ fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
+ FLAGS_expected_values_path.c_str(), status.ToString().c_str());
+ assert(values_ == nullptr);
+ }
+ }
+ if (values_ == nullptr) {
+ values_allocation_.reset(
+ new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
+ values_ = &values_allocation_[0];
+ values_init_needed = true;
+ }
+ assert(values_ != nullptr);
+ if (values_init_needed) {
+ for (int i = 0; i < FLAGS_column_families; ++i) {
+ for (int j = 0; j < max_key_; ++j) {
+ Delete(i, j, false /* pending */);
+ }
+ }
+ }
+
+ if (FLAGS_test_batches_snapshots) {
+ fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
+ return;
+ }
+
+ long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
+ if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
+ num_locks++;
+ }
+ fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
+ key_locks_.resize(FLAGS_column_families);
+
+ for (int i = 0; i < FLAGS_column_families; ++i) {
+ key_locks_[i].resize(num_locks);
+ for (auto& ptr : key_locks_[i]) {
+ ptr.reset(new port::Mutex);
+ }
+ }
+ if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
+ ++num_bg_threads_;
+ fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
+ }
+ if (FLAGS_continuous_verification_interval > 0) {
+ ++num_bg_threads_;
+ fprintf(stdout, "Starting continuous_verification_thread\n");
+ }
+ }
+
+ ~SharedState() {}
+
+ port::Mutex* GetMutex() { return &mu_; }
+
+ port::CondVar* GetCondVar() { return &cv_; }
+
+ StressTest* GetStressTest() const { return stress_test_; }
+
+ int64_t GetMaxKey() const { return max_key_; }
+
+ uint32_t GetNumThreads() const { return num_threads_; }
+
+ void IncInitialized() { num_initialized_++; }
+
+ void IncOperated() { num_populated_++; }
+
+ void IncDone() { num_done_++; }
+
+ void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }
+
+ bool AllInitialized() const { return num_initialized_ >= num_threads_; }
+
+ bool AllOperated() const { return num_populated_ >= num_threads_; }
+
+ bool AllDone() const { return num_done_ >= num_threads_; }
+
+ bool AllVotedReopen() { return (vote_reopen_ == 0); }
+
+ void SetStart() { start_ = true; }
+
+ void SetStartVerify() { start_verify_ = true; }
+
+ bool Started() const { return start_; }
+
+ bool VerifyStarted() const { return start_verify_; }
+
+ void SetVerificationFailure() { verification_failure_.store(true); }
+
+ bool HasVerificationFailedYet() const { return verification_failure_.load(); }
+
+ void SetShouldStopTest() { should_stop_test_.store(true); }
+
+ bool ShouldStopTest() const { return should_stop_test_.load(); }
+
+ port::Mutex* GetMutexForKey(int cf, int64_t key) {
+ return key_locks_[cf][key >> log2_keys_per_lock_].get();
+ }
+
+ void LockColumnFamily(int cf) {
+ for (auto& mutex : key_locks_[cf]) {
+ mutex->Lock();
+ }
+ }
+
+ void UnlockColumnFamily(int cf) {
+ for (auto& mutex : key_locks_[cf]) {
+ mutex->Unlock();
+ }
+ }
+
+ std::atomic<uint32_t>& Value(int cf, int64_t key) const {
+ return values_[cf * max_key_ + key];
+ }
+
+ void ClearColumnFamily(int cf) {
+ std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
+ DELETION_SENTINEL);
+ }
+
+ // @param pending True if the update may have started but is not yet
+ // guaranteed finished. This is useful for crash-recovery testing when the
+ // process may crash before updating the expected values array.
+ void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
+ if (!pending) {
+ // prevent expected-value update from reordering before Write
+ std::atomic_thread_fence(std::memory_order_release);
+ }
+ Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
+ std::memory_order_relaxed);
+ if (pending) {
+ // prevent Write from reordering before expected-value update
+ std::atomic_thread_fence(std::memory_order_release);
+ }
+ }
+
+ uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
+
+ // @param pending See comment above Put()
+ // Returns true if the key was not yet deleted.
+ bool Delete(int cf, int64_t key, bool pending) {
+ if (Value(cf, key) == DELETION_SENTINEL) {
+ return false;
+ }
+ Put(cf, key, DELETION_SENTINEL, pending);
+ return true;
+ }
+
+ // @param pending See comment above Put()
+ // Returns true if the key was not yet deleted.
+ bool SingleDelete(int cf, int64_t key, bool pending) {
+ return Delete(cf, key, pending);
+ }
+
+ // @param pending See comment above Put()
+ // Returns number of keys deleted by the call.
+ int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
+ int covered = 0;
+ for (int64_t key = begin_key; key < end_key; ++key) {
+ if (Delete(cf, key, pending)) {
+ ++covered;
+ }
+ }
+ return covered;
+ }
+
+ bool AllowsOverwrite(int64_t key) {
+ return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
+ }
+
+ bool Exists(int cf, int64_t key) {
+ // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
+ // is disallowed can't be accidentally added a second time, in which case
+ // SingleDelete wouldn't be able to properly delete the key. It does allow
+ // the case where a SingleDelete might be added which covers nothing, but
+ // that's not a correctness issue.
+ uint32_t expected_value = Value(cf, key).load();
+ return expected_value != DELETION_SENTINEL;
+ }
+
+ uint32_t GetSeed() const { return seed_; }
+
+ void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
+
+ bool ShouldStopBgThread() { return should_stop_bg_thread_; }
+
+ void IncBgThreadsFinished() { ++bg_thread_finished_; }
+
+ bool BgThreadsFinished() const {
+ return bg_thread_finished_ == num_bg_threads_;
+ }
+
+ bool ShouldVerifyAtBeginning() const {
+ return expected_mmap_buffer_.get() != nullptr;
+ }
+
+ bool PrintingVerificationResults() {
+ bool tmp = false;
+ return !printing_verification_results_.compare_exchange_strong(
+ tmp, true, std::memory_order_relaxed);
+ }
+
+ void FinishPrintingVerificationResults() {
+ printing_verification_results_.store(false, std::memory_order_relaxed);
+ }
+
+ private:
+ port::Mutex mu_;
+ port::CondVar cv_;
+ const uint32_t seed_;
+ const int64_t max_key_;
+ const uint32_t log2_keys_per_lock_;
+ const int num_threads_;
+ long num_initialized_;
+ long num_populated_;
+ long vote_reopen_;
+ long num_done_;
+ bool start_;
+ bool start_verify_;
+ int num_bg_threads_;
+ bool should_stop_bg_thread_;
+ int bg_thread_finished_;
+ StressTest* stress_test_;
+ std::atomic<bool> verification_failure_;
+ std::atomic<bool> should_stop_test_;
+
+ // Keys that should not be overwritten
+ std::unordered_set<size_t> no_overwrite_ids_;
+
+ std::atomic<uint32_t>* values_;
+ std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
+ // Has to make it owned by a smart ptr as port::Mutex is not copyable
+ // and storing it in the container may require copying depending on the impl.
+ std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
+ std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
+ std::atomic<bool> printing_verification_results_;
+};
+
+// Per-thread state for concurrent executions of the same benchmark.
+struct ThreadState {
+ uint32_t tid; // 0..n-1
+ Random rand; // Has different seeds for different threads
+ SharedState* shared;
+ Stats stats;
+ struct SnapshotState {
+ const Snapshot* snapshot;
+ // The cf from which we did a Get at this snapshot
+ int cf_at;
+ // The name of the cf at the time that we did a read
+ std::string cf_at_name;
+ // The key with which we did a Get at this snapshot
+ std::string key;
+ // The status of the Get
+ Status status;
+ // The value of the Get
+ std::string value;
+ // optional state of all keys in the db
+ std::vector<bool>* key_vec;
+ };
+ std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;
+
+ ThreadState(uint32_t index, SharedState* _shared)
+ : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
+};
+} // namespace ROCKSDB_NAMESPACE
+#endif // GFLAGS