From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rocksdb/db/db_impl/db_secondary_test.cc | 869 ++++++++++++++++++++++++++++ 1 file changed, 869 insertions(+) create mode 100644 src/rocksdb/db/db_impl/db_secondary_test.cc (limited to 'src/rocksdb/db/db_impl/db_secondary_test.cc') diff --git a/src/rocksdb/db/db_impl/db_secondary_test.cc b/src/rocksdb/db/db_impl/db_secondary_test.cc new file mode 100644 index 000000000..0b34181de --- /dev/null +++ b/src/rocksdb/db/db_impl/db_secondary_test.cc @@ -0,0 +1,869 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl/db_impl_secondary.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/fault_injection_test_env.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +class DBSecondaryTest : public DBTestBase { + public: + DBSecondaryTest() + : DBTestBase("/db_secondary_test"), + secondary_path_(), + handles_secondary_(), + db_secondary_(nullptr) { + secondary_path_ = + test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); + } + + ~DBSecondaryTest() override { + CloseSecondary(); + if (getenv("KEEP_DB") != nullptr) { + fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(secondary_path_, options)); + } + } + + protected: + Status ReopenAsSecondary(const Options& options) { + return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_); + } + + void OpenSecondary(const Options& options); + + void OpenSecondaryWithColumnFamilies( + const std::vector& column_families, const Options& options); + + void CloseSecondary() { + for (auto h : handles_secondary_) { + db_secondary_->DestroyColumnFamilyHandle(h); + } + handles_secondary_.clear(); + delete db_secondary_; + db_secondary_ = nullptr; + } + + DBImplSecondary* db_secondary_full() { + return static_cast(db_secondary_); + } + + void CheckFileTypeCounts(const std::string& dir, int expected_log, + int expected_sst, int expected_manifest) const; + + std::string secondary_path_; + std::vector handles_secondary_; + DB* db_secondary_; +}; + +void DBSecondaryTest::OpenSecondary(const Options& options) { + Status s = + DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); + ASSERT_OK(s); +} + +void DBSecondaryTest::OpenSecondaryWithColumnFamilies( + const std::vector& column_families, const Options& options) { + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + for (const auto& cf_name : column_families) { + cf_descs.emplace_back(cf_name, options); + } + Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_OK(s); +} + +void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir, + int expected_log, int expected_sst, + int expected_manifest) const { + std::vector filenames; + env_->GetChildren(dir, &filenames); + + int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; + for (auto file : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(file, &number, &type)) { + log_cnt += (type == kLogFile); + sst_cnt += (type == kTableFile); + manifest_cnt += (type == kDescriptorFile); + } + } + ASSERT_EQ(expected_log, log_cnt); + ASSERT_EQ(expected_sst, sst_cnt); + ASSERT_EQ(expected_manifest, manifest_cnt); +} + +TEST_F(DBSecondaryTest, ReopenAsSecondary) { + Options options; + options.env = env_; + Reopen(options); + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Put("bar", "bar_value")); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + Close(); + + ASSERT_OK(ReopenAsSecondary(options)); + ASSERT_EQ("foo_value", Get("foo")); + ASSERT_EQ("bar_value", Get("bar")); + ReadOptions ropts; + ropts.verify_checksums = true; + auto db1 = static_cast(db_); + ASSERT_NE(nullptr, db1); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (0 == count) { + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value", iter->value().ToString()); + } else if (1 == count) { + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value", iter->value().ToString()); + } + ++count; + } + delete iter; + ASSERT_EQ(2, count); +} + +TEST_F(DBSecondaryTest, OpenAsSecondary) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); +} + +namespace { +class TraceFileEnv : public EnvWrapper { + public: + explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {} + Status NewRandomAccessFile(const std::string& f, + std::unique_ptr* r, + const EnvOptions& env_options) override { + class TracedRandomAccessFile : public RandomAccessFile { + public: + TracedRandomAccessFile(std::unique_ptr&& target, + std::atomic& counter) + : target_(std::move(target)), files_closed_(counter) {} + ~TracedRandomAccessFile() override { + files_closed_.fetch_add(1, std::memory_order_relaxed); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return target_->Read(offset, n, result, scratch); + } + + private: + std::unique_ptr target_; + std::atomic& files_closed_; + }; + Status s = target()->NewRandomAccessFile(f, r, env_options); + if (s.ok()) { + r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_)); + } + return s; + } + + int files_closed() const { + return files_closed_.load(std::memory_order_relaxed); + } + + private: + std::atomic files_closed_{0}; +}; +} // namespace + +TEST_F(DBSecondaryTest, SecondaryCloseFiles) { + Options options; + options.env = env_; + options.max_open_files = 1; + options.disable_auto_compactions = true; + Reopen(options); + Options options1; + std::unique_ptr traced_env(new TraceFileEnv(env_)); + options1.env = traced_env.get(); + OpenSecondary(options1); + + static const auto verify_db = [&]() { + std::unique_ptr iter1(dbfull()->NewIterator(ReadOptions())); + std::unique_ptr iter2(db_secondary_->NewIterator(ReadOptions())); + for (iter1->SeekToFirst(), iter2->SeekToFirst(); + iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { + ASSERT_EQ(iter1->key(), iter2->key()); + ASSERT_EQ(iter1->value(), iter2->value()); + } + ASSERT_FALSE(iter1->Valid()); + ASSERT_FALSE(iter2->Valid()); + }; + + ASSERT_OK(Put("a", "value")); + ASSERT_OK(Put("c", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(Put("b", "value")); + ASSERT_OK(Put("d", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(2, static_cast(traced_env.get())->files_closed()); + + Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}}); + ASSERT_TRUE(s.IsNotSupported()); + CloseSecondary(); +} + +TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + } + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); + + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "new_foo_value_1")); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value_1", "new_bar_value"); +} + +TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options1); + cf_descs.emplace_back("pikachu", options1); + cf_descs.emplace_back("eevee", options1); + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_NOK(s); +} + +TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_EQ(0, handles_secondary_.size()); + ASSERT_NE(nullptr, db_secondary_); + + ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Flush(0 /*cf*/)); + ASSERT_OK(Flush(1 /*cf*/)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value", value); +} + +TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { + Options options; + options.env = env_; + Reopen(options); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency( + {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", + "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, + {"VersionSet::ProcessManifestWrites:AfterNewManifest", + "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:" + "1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // Make sure db calls RecoverLogFiles so as to trigger a manifest write, + // which causes the db to switch to a new MANIFEST upon start. + port::Thread ro_db_thread([&]() { + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + CloseSecondary(); + }); + Reopen(options); + ro_db_thread.join(); +} + +TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, MissingTableFile) { + int table_files_not_exist = 0; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers", + [&](void* arg) { + Status s = *reinterpret_cast(arg); + if (s.IsPathNotFound()) { + ++table_files_not_exist; + } else if (!s.ok()) { + assert(false); // Should not reach here + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_NE(nullptr, db_secondary_full()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist); + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) { + Options options; + options.env = env_; + const std::string kCfName1 = "pikachu"; + CreateAndReopenWithCF({kCfName1}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kCfName1}, options1); + ASSERT_EQ(2, handles_secondary_.size()); + + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1")); + ASSERT_OK(Flush(1 /*cf*/)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); + + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + Close(); + CheckFileTypeCounts(dbname_, 1, 0, 1); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + value.clear(); + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); +} + +TEST_F(DBSecondaryTest, SwitchManifest) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const int kNumFiles = options.level0_file_num_compaction_trigger - 1; + // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1, + // ..., 9. + const int kNumKeys = 10; + // Create two sst + for (int i = 0; i != kNumFiles; ++i) { + for (int j = 0; j != kNumKeys; ++j) { + ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + const auto& range_scan_db = [&]() { + ReadOptions tmp_ropts; + tmp_ropts.total_order_seek = true; + tmp_ropts.verify_checksums = true; + std::unique_ptr iter(db_secondary_->NewIterator(tmp_ropts)); + int cnt = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) { + ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString()); + ASSERT_EQ("value_" + std::to_string(kNumFiles - 1), + iter->value().ToString()); + } + }; + + range_scan_db(); + + // While secondary instance still keeps old MANIFEST open, we close primary, + // restart primary, performs full compaction, close again, restart again so + // that next time secondary tries to catch up with primary, the secondary + // will skip the MANIFEST in middle. + Reopen(options); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Reopen(options); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + range_scan_db(); +} + +// Here, "Snapshot" refers to the version edits written by +// VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after +// switching from the old one. +TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + ASSERT_OK(Put("0", "value0")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + std::string value; + ReadOptions ropts; + ropts.verify_checksums = true; + ASSERT_OK(db_secondary_->Get(ropts, "0", &value)); + ASSERT_EQ("value0", value); + + Reopen(options); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); +} + +TEST_F(DBSecondaryTest, SwitchWAL) { + const int kNumKeysPerMemtable = 1; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const auto& verify_db = [](DB* db1, DB* db2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + std::unique_ptr it1(db1->NewIterator(read_opts)); + std::unique_ptr it2(db2->NewIterator(read_opts)); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + }; + for (int k = 0; k != 16; ++k) { + ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k))); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), db_secondary_); + } +} + +TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { + const int kNumKeysPerMemtable = 1; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCallFlush:ContextCleanedUp", + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}}); + SyncPoint::GetInstance()->EnableProcessing(); + const std::string kCFName1 = "pikachu"; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + CreateAndReopenWithCF({kCFName1}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kCFName1}, options1); + ASSERT_EQ(2, handles_secondary_.size()); + + const auto& verify_db = [](DB* db1, + const std::vector& handles1, + DB* db2, + const std::vector& handles2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + ASSERT_EQ(handles1.size(), handles2.size()); + for (size_t i = 0; i != handles1.size(); ++i) { + std::unique_ptr it1(db1->NewIterator(read_opts, handles1[i])); + std::unique_ptr it2(db2->NewIterator(read_opts, handles2[i])); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + } + }; + for (int k = 0; k != 8; ++k) { + ASSERT_OK( + Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); + ASSERT_OK( + Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); + TEST_SYNC_POINT( + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), handles_, db_secondary_, handles_secondary_); + SyncPoint::GetInstance()->ClearTrace(); + } +} + +TEST_F(DBSecondaryTest, CatchUpAfterFlush) { + const int kNumKeysPerMemtable = 16; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + WriteOptions write_opts; + WriteBatch wb; + wb.Put("key0", "value0"); + wb.Put("key1", "value1"); + ASSERT_OK(dbfull()->Write(write_opts, &wb)); + ReadOptions read_opts; + std::unique_ptr iter1(db_secondary_->NewIterator(read_opts)); + iter1->Seek("key0"); + ASSERT_FALSE(iter1->Valid()); + iter1->Seek("key1"); + ASSERT_FALSE(iter1->Valid()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + iter1->Seek("key0"); + ASSERT_FALSE(iter1->Valid()); + iter1->Seek("key1"); + ASSERT_FALSE(iter1->Valid()); + std::unique_ptr iter2(db_secondary_->NewIterator(read_opts)); + iter2->Seek("key0"); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ("value0", iter2->value()); + iter2->Seek("key1"); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ("value1", iter2->value()); + + { + WriteBatch wb1; + wb1.Put("key0", "value01"); + wb1.Put("key1", "value11"); + ASSERT_OK(dbfull()->Write(write_opts, &wb1)); + } + + { + WriteBatch wb2; + wb2.Put("key0", "new_value0"); + wb2.Delete("key1"); + ASSERT_OK(dbfull()->Write(write_opts, &wb2)); + } + + ASSERT_OK(Flush()); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + std::unique_ptr iter3(db_secondary_->NewIterator(read_opts)); + // iter3 should not see value01 and value11 at all. + iter3->Seek("key0"); + ASSERT_TRUE(iter3->Valid()); + ASSERT_EQ("new_value0", iter3->value()); + iter3->Seek("key1"); + ASSERT_FALSE(iter3->Valid()); +} + +TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) { + bool called = false; + Options options; + options.env = env_; + options.disable_auto_compactions = true; + Reopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) { + ASSERT_NE(nullptr, arg); + called = true; + auto* s = reinterpret_cast(arg); + ASSERT_NOK(*s); + }); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "DBImpl::CheckConsistency:BeforeGetFileSize"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("a", "value0")); + ASSERT_OK(Put("c", "value0")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("b", "value1")); + ASSERT_OK(Put("d", "value1")); + ASSERT_OK(Flush()); + port::Thread thread([this]() { + Options opts; + opts.env = env_; + opts.max_open_files = -1; + OpenSecondary(opts); + }); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + thread.join(); + ASSERT_TRUE(called); +} +#endif //! ROCKSDB_LITE + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3