diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_secondary_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/db_secondary_test.cc')
-rw-r--r-- | src/rocksdb/db/db_secondary_test.cc | 1693 |
1 files changed, 1693 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_secondary_test.cc b/src/rocksdb/db/db_secondary_test.cc new file mode 100644 index 000000000..20d7534e0 --- /dev/null +++ b/src/rocksdb/db/db_secondary_test.cc @@ -0,0 +1,1693 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl/db_impl_secondary.h" +#include "db/db_test_util.h" +#include "db/db_with_timestamp_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/sync_point.h" +#include "test_util/testutil.h" +#include "utilities/fault_injection_env.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +class DBSecondaryTestBase : public DBBasicTestWithTimestampBase { + public: + explicit DBSecondaryTestBase(const std::string& dbname) + : DBBasicTestWithTimestampBase(dbname), + secondary_path_(), + handles_secondary_(), + db_secondary_(nullptr) { + secondary_path_ = + test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); + } + + ~DBSecondaryTestBase() override { + CloseSecondary(); + if (getenv("KEEP_DB") != nullptr) { + fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(secondary_path_, options)); + } + } + + protected: + Status ReopenAsSecondary(const Options& options) { + return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_); + } + + void OpenSecondary(const Options& options); + + Status TryOpenSecondary(const Options& options); + + void OpenSecondaryWithColumnFamilies( + const std::vector<std::string>& column_families, const Options& options); + + void CloseSecondary() { + for (auto h : handles_secondary_) { + ASSERT_OK(db_secondary_->DestroyColumnFamilyHandle(h)); + } + handles_secondary_.clear(); + delete db_secondary_; + db_secondary_ = nullptr; + } + + DBImplSecondary* db_secondary_full() { + return static_cast<DBImplSecondary*>(db_secondary_); + } + + void CheckFileTypeCounts(const std::string& dir, int expected_log, + int expected_sst, int expected_manifest) const; + + std::string secondary_path_; + std::vector<ColumnFamilyHandle*> handles_secondary_; + DB* db_secondary_; +}; + +void DBSecondaryTestBase::OpenSecondary(const Options& options) { + ASSERT_OK(TryOpenSecondary(options)); +} + +Status DBSecondaryTestBase::TryOpenSecondary(const Options& options) { + Status s = + DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); + return s; +} + +void DBSecondaryTestBase::OpenSecondaryWithColumnFamilies( + const std::vector<std::string>& column_families, const Options& options) { + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + for (const auto& cf_name : column_families) { + cf_descs.emplace_back(cf_name, options); + } + Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_OK(s); +} + +void DBSecondaryTestBase::CheckFileTypeCounts(const std::string& dir, + int expected_log, + int expected_sst, + int expected_manifest) const { + std::vector<std::string> filenames; + ASSERT_OK(env_->GetChildren(dir, &filenames)); + + int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; + for (auto file : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(file, &number, &type)) { + log_cnt += (type == kWalFile); + sst_cnt += (type == kTableFile); + manifest_cnt += (type == kDescriptorFile); + } + } + ASSERT_EQ(expected_log, log_cnt); + ASSERT_EQ(expected_sst, sst_cnt); + ASSERT_EQ(expected_manifest, manifest_cnt); +} + +class DBSecondaryTest : public DBSecondaryTestBase { + public: + explicit DBSecondaryTest() : DBSecondaryTestBase("db_secondary_test") {} +}; + +TEST_F(DBSecondaryTest, FailOpenIfLoggerCreationFail) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + Reopen(options); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "rocksdb::CreateLoggerFromOptions:AfterGetPath", [&](void* arg) { + auto* s = reinterpret_cast<Status*>(arg); + assert(s); + *s = Status::IOError("Injected"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + options.max_open_files = -1; + Status s = TryOpenSecondary(options); + ASSERT_EQ(nullptr, options.info_log); + ASSERT_TRUE(s.IsIOError()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBSecondaryTest, NonExistingDb) { + Destroy(last_options_); + + Options options = GetDefaultOptions(); + options.env = env_; + options.max_open_files = -1; + const std::string dbname = "/doesnt/exist"; + Status s = + DB::OpenAsSecondary(options, dbname, secondary_path_, &db_secondary_); + ASSERT_TRUE(s.IsIOError()); +} + +TEST_F(DBSecondaryTest, ReopenAsSecondary) { + Options options; + options.env = env_; + Reopen(options); + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Put("bar", "bar_value")); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + Close(); + + ASSERT_OK(ReopenAsSecondary(options)); + ASSERT_EQ("foo_value", Get("foo")); + ASSERT_EQ("bar_value", Get("bar")); + ReadOptions ropts; + ropts.verify_checksums = true; + auto db1 = static_cast<DBImplSecondary*>(db_); + ASSERT_NE(nullptr, db1); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (0 == count) { + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value", iter->value().ToString()); + } else if (1 == count) { + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value", iter->value().ToString()); + } + ++count; + } + delete iter; + ASSERT_EQ(2, count); +} + +TEST_F(DBSecondaryTest, SimpleInternalCompaction) { + Options options; + options.env = env_; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + ASSERT_OK(db_->GetDbIdentity(input.db_id)); + Close(); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result)); + + ASSERT_EQ(result.output_files.size(), 1); + InternalKey smallest, largest; + smallest.DecodeFrom(result.output_files[0].smallest_internal_key); + largest.DecodeFrom(result.output_files[0].largest_internal_key); + ASSERT_EQ(smallest.user_key().ToString(), "bar"); + ASSERT_EQ(largest.user_key().ToString(), "foo"); + ASSERT_EQ(result.output_level, 1); + ASSERT_EQ(result.output_path, this->secondary_path_); + ASSERT_EQ(result.num_output_records, 2); + ASSERT_GT(result.bytes_written, 0); + ASSERT_OK(result.status); +} + +TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + Reopen(options); + const int kRangeL2 = 10; + const int kRangeL1 = 30; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i * kRangeL2), "value" + std::to_string(i))); + ASSERT_OK(Put(Key((i + 1) * kRangeL2 - 1), "value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + for (int i = 0; i < 5; i++) { + ASSERT_OK(Put(Key(i * kRangeL1), "value" + std::to_string(i))); + ASSERT_OK(Put(Key((i + 1) * kRangeL1 - 1), "value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(1); + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put(Key(i * 30), "value" + std::to_string(i))); + ASSERT_OK(Put(Key(i * 30 + 50), "value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + + // pick 2 files on level 0 for compaction, which has 3 overlap files on L1 + CompactionServiceInput input1; + input1.input_files.push_back(meta.levels[0].files[2].name); + input1.input_files.push_back(meta.levels[0].files[3].name); + input1.input_files.push_back(meta.levels[1].files[0].name); + input1.input_files.push_back(meta.levels[1].files[1].name); + input1.input_files.push_back(meta.levels[1].files[2].name); + + input1.output_level = 1; + ASSERT_OK(db_->GetDbIdentity(input1.db_id)); + + options.max_open_files = -1; + Close(); + + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + CompactionServiceResult result; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input1, &result)); + ASSERT_OK(result.status); + + // pick 2 files on level 1 for compaction, which has 6 overlap files on L2 + CompactionServiceInput input2; + input2.input_files.push_back(meta.levels[1].files[1].name); + input2.input_files.push_back(meta.levels[1].files[2].name); + for (int i = 3; i < 9; i++) { + input2.input_files.push_back(meta.levels[2].files[i].name); + } + + input2.output_level = 2; + input2.db_id = input1.db_id; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input2, &result)); + ASSERT_OK(result.status); + + CloseSecondary(); + + // delete all l2 files, without update manifest + for (auto& file : meta.levels[2].files) { + ASSERT_OK(env_->DeleteFile(dbname_ + file.name)); + } + OpenSecondary(options); + cfh = db_secondary_->DefaultColumnFamily(); + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input2, &result); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); + + // TODO: L0 -> L1 compaction should success, currently version is not built + // if files is missing. + // ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(OpenAndCompactOptions(), + // cfh, input1, &result)); +} + +TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + ASSERT_OK(db_->GetDbIdentity(input.db_id)); + + // trigger compaction to delete the files for secondary instance compaction + ASSERT_OK(Put("foo", "foo_value" + std::to_string(3))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(3))); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Close(); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); +} + +TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + ASSERT_OK(db_->GetDbIdentity(input.db_id)); + + Close(); + + ASSERT_OK(env_->DeleteFile(dbname_ + input.input_files[0])); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_OK(result.status); + + input.input_files.erase(input.input_files.begin()); + + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result)); + ASSERT_OK(result.status); +} + +TEST_F(DBSecondaryTest, OpenAsSecondary) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); +} + +namespace { +class TraceFileEnv : public EnvWrapper { + public: + explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {} + static const char* kClassName() { return "TraceFileEnv"; } + const char* Name() const override { return kClassName(); } + + Status NewRandomAccessFile(const std::string& f, + std::unique_ptr<RandomAccessFile>* r, + const EnvOptions& env_options) override { + class TracedRandomAccessFile : public RandomAccessFile { + public: + TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target, + std::atomic<int>& counter) + : target_(std::move(target)), files_closed_(counter) {} + ~TracedRandomAccessFile() override { + files_closed_.fetch_add(1, std::memory_order_relaxed); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return target_->Read(offset, n, result, scratch); + } + + private: + std::unique_ptr<RandomAccessFile> target_; + std::atomic<int>& files_closed_; + }; + Status s = target()->NewRandomAccessFile(f, r, env_options); + if (s.ok()) { + r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_)); + } + return s; + } + + int files_closed() const { + return files_closed_.load(std::memory_order_relaxed); + } + + private: + std::atomic<int> files_closed_{0}; +}; +} // anonymous namespace + +TEST_F(DBSecondaryTest, SecondaryCloseFiles) { + Options options; + options.env = env_; + options.max_open_files = 1; + options.disable_auto_compactions = true; + Reopen(options); + Options options1; + std::unique_ptr<Env> traced_env(new TraceFileEnv(env_)); + options1.env = traced_env.get(); + OpenSecondary(options1); + + static const auto verify_db = [&]() { + std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions())); + std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions())); + for (iter1->SeekToFirst(), iter2->SeekToFirst(); + iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { + ASSERT_EQ(iter1->key(), iter2->key()); + ASSERT_EQ(iter1->value(), iter2->value()); + } + ASSERT_FALSE(iter1->Valid()); + ASSERT_FALSE(iter2->Valid()); + }; + + ASSERT_OK(Put("a", "value")); + ASSERT_OK(Put("c", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(Put("b", "value")); + ASSERT_OK(Put("d", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed()); + + Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}}); + ASSERT_TRUE(s.IsNotSupported()); + CloseSecondary(); +} + +TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + } + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); + + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "new_foo_value_1")); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value_1", "new_bar_value"); +} + +TEST_F(DBSecondaryTest, SecondaryTailingBug_ISSUE_8467) { + Options options; + options.env = env_; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + } + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const auto verify_db = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ReadOptions ropts; + Status s = db_secondary_->Get(ropts, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(foo_val, value); + + s = db_secondary_->Get(ropts, "bar", &value); + ASSERT_OK(s); + ASSERT_EQ(bar_val, value); + }; + + for (int i = 0; i < 2; ++i) { + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db("foo_value2", "bar_value2"); + } +} + +TEST_F(DBSecondaryTest, RefreshIterator) { + Options options; + options.env = env_; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + std::unique_ptr<Iterator> it(db_secondary_->NewIterator(ReadOptions())); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + if (0 == i) { + it->Seek("foo"); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + + ASSERT_OK(it->Refresh()); + + it->Seek("foo"); + ASSERT_OK(it->status()); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value0", it->value()); + } else { + it->Seek("foo"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value" + std::to_string(i - 1), it->value()); + ASSERT_OK(it->status()); + + ASSERT_OK(it->Refresh()); + + it->Seek("foo"); + ASSERT_OK(it->status()); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key()); + ASSERT_EQ("foo_value" + std::to_string(i), it->value()); + } + } +} + +TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options1); + cf_descs.emplace_back("pikachu", options1); + cf_descs.emplace_back("eevee", options1); + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs, + &handles_secondary_, &db_secondary_); + ASSERT_NOK(s); +} + +TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) { + Options options; + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ASSERT_EQ(0, handles_secondary_.size()); + ASSERT_NE(nullptr, db_secondary_); + + ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value")); + ASSERT_OK(Flush(0 /*cf*/)); + ASSERT_OK(Flush(1 /*cf*/)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value", value); +} + +TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { + Options options; + options.env = env_; + Reopen(options); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency( + {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", + "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, + {"DBImpl::Open:AfterDeleteFiles", + "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:" + "1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread ro_db_thread([&]() { + Options options1; + options1.env = env_; + options1.max_open_files = -1; + Status s = TryOpenSecondary(options1); + ASSERT_TRUE(s.IsTryAgain()); + + // Try again + OpenSecondary(options1); + CloseSecondary(); + }); + Reopen(options); + ro_db_thread.join(); +} + +TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, MissingTableFile) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_NE(nullptr, db_secondary_full()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db_secondary_->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; +} + +TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) { + Options options; + options.env = env_; + const std::string kCfName1 = "pikachu"; + CreateAndReopenWithCF({kCfName1}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kCfName1}, options1); + ASSERT_EQ(2, handles_secondary_.size()); + + ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1")); + ASSERT_OK(Flush(1 /*cf*/)); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); + + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + Close(); + CheckFileTypeCounts(dbname_, 1, 0, 1); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + value.clear(); + ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); + ASSERT_EQ("foo_val_1", value); +} + +TEST_F(DBSecondaryTest, SwitchManifest) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + const std::string cf1_name("test_cf"); + CreateAndReopenWithCF({cf1_name}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, + options1); + + const int kNumFiles = options.level0_file_num_compaction_trigger - 1; + // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1, + // ..., 9. + const int kNumKeys = 10; + // Create two sst + for (int i = 0; i != kNumFiles; ++i) { + for (int j = 0; j != kNumKeys; ++j) { + ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + const auto& range_scan_db = [&]() { + ReadOptions tmp_ropts; + tmp_ropts.total_order_seek = true; + tmp_ropts.verify_checksums = true; + std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts)); + int cnt = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) { + ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString()); + ASSERT_EQ("value_" + std::to_string(kNumFiles - 1), + iter->value().ToString()); + } + }; + + range_scan_db(); + + // While secondary instance still keeps old MANIFEST open, we close primary, + // restart primary, performs full compaction, close again, restart again so + // that next time secondary tries to catch up with primary, the secondary + // will skip the MANIFEST in middle. + ReopenWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, options); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, options); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + range_scan_db(); +} + +TEST_F(DBSecondaryTest, SwitchManifestTwice) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + const std::string cf1_name("test_cf"); + CreateAndReopenWithCF({cf1_name}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, + options1); + + ASSERT_OK(Put("0", "value0")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + std::string value; + ReadOptions ropts; + ropts.verify_checksums = true; + ASSERT_OK(db_secondary_->Get(ropts, "0", &value)); + ASSERT_EQ("value0", value); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, options); + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, cf1_name}, options); + ASSERT_OK(Put("0", "value1")); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + + ASSERT_OK(db_secondary_->Get(ropts, "0", &value)); + ASSERT_EQ("value1", value); +} + +TEST_F(DBSecondaryTest, DISABLED_SwitchWAL) { + const int kNumKeysPerMemtable = 1; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const auto& verify_db = [](DB* db1, DB* db2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts)); + std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts)); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + }; + for (int k = 0; k != 16; ++k) { + ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k))); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), db_secondary_); + } +} + +TEST_F(DBSecondaryTest, DISABLED_SwitchWALMultiColumnFamilies) { + const int kNumKeysPerMemtable = 1; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCallFlush:ContextCleanedUp", + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}}); + SyncPoint::GetInstance()->EnableProcessing(); + const std::string kCFName1 = "pikachu"; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerMemtable)); + CreateAndReopenWithCF({kCFName1}, options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondaryWithColumnFamilies({kCFName1}, options1); + ASSERT_EQ(2, handles_secondary_.size()); + + const auto& verify_db = [](DB* db1, + const std::vector<ColumnFamilyHandle*>& handles1, + DB* db2, + const std::vector<ColumnFamilyHandle*>& handles2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + ASSERT_EQ(handles1.size(), handles2.size()); + for (size_t i = 0; i != handles1.size(); ++i) { + std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i])); + std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i])); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + } + }; + for (int k = 0; k != 8; ++k) { + for (int j = 0; j < 2; ++j) { + ASSERT_OK(Put(0 /*cf*/, "key" + std::to_string(k), + "value" + std::to_string(k))); + ASSERT_OK(Put(1 /*cf*/, "key" + std::to_string(k), + "value" + std::to_string(k))); + } + TEST_SYNC_POINT( + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), handles_, db_secondary_, handles_secondary_); + SyncPoint::GetInstance()->ClearTrace(); + } +} + +TEST_F(DBSecondaryTest, CatchUpAfterFlush) { + const int kNumKeysPerMemtable = 16; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + WriteOptions write_opts; + WriteBatch wb; + ASSERT_OK(wb.Put("key0", "value0")); + ASSERT_OK(wb.Put("key1", "value1")); + ASSERT_OK(dbfull()->Write(write_opts, &wb)); + ReadOptions read_opts; + std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts)); + iter1->Seek("key0"); + ASSERT_FALSE(iter1->Valid()); + iter1->Seek("key1"); + ASSERT_FALSE(iter1->Valid()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + iter1->Seek("key0"); + ASSERT_FALSE(iter1->Valid()); + iter1->Seek("key1"); + ASSERT_FALSE(iter1->Valid()); + ASSERT_OK(iter1->status()); + std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts)); + iter2->Seek("key0"); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ("value0", iter2->value()); + iter2->Seek("key1"); + ASSERT_TRUE(iter2->Valid()); + ASSERT_OK(iter2->status()); + ASSERT_EQ("value1", iter2->value()); + + { + WriteBatch wb1; + ASSERT_OK(wb1.Put("key0", "value01")); + ASSERT_OK(wb1.Put("key1", "value11")); + ASSERT_OK(dbfull()->Write(write_opts, &wb1)); + } + + { + WriteBatch wb2; + ASSERT_OK(wb2.Put("key0", "new_value0")); + ASSERT_OK(wb2.Delete("key1")); + ASSERT_OK(dbfull()->Write(write_opts, &wb2)); + } + + ASSERT_OK(Flush()); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts)); + // iter3 should not see value01 and value11 at all. + iter3->Seek("key0"); + ASSERT_TRUE(iter3->Valid()); + ASSERT_EQ("new_value0", iter3->value()); + iter3->Seek("key1"); + ASSERT_FALSE(iter3->Valid()); + ASSERT_OK(iter3->status()); +} + +TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) { + bool called = false; + Options options; + options.env = env_; + options.disable_auto_compactions = true; + Reopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) { + ASSERT_NE(nullptr, arg); + called = true; + auto* s = reinterpret_cast<Status*>(arg); + ASSERT_NOK(*s); + }); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "DBImpl::CheckConsistency:BeforeGetFileSize"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("a", "value0")); + ASSERT_OK(Put("c", "value0")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("b", "value1")); + ASSERT_OK(Put("d", "value1")); + ASSERT_OK(Flush()); + port::Thread thread([this]() { + Options opts; + opts.env = env_; + opts.max_open_files = -1; + OpenSecondary(opts); + }); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + thread.join(); + ASSERT_TRUE(called); +} + +TEST_F(DBSecondaryTest, StartFromInconsistent) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { + ASSERT_NE(nullptr, arg); + *(reinterpret_cast<Status*>(arg)) = + Status::Corruption("Inject corruption"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + Options options1; + options1.env = env_; + Status s = TryOpenSecondary(options1); + ASSERT_TRUE(s.IsCorruption()); +} + +TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Flush()); + + Options options1; + options1.env = env_; + OpenSecondary(options1); + + { + std::string value; + ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ("value", value); + } + + ASSERT_OK(Put("bar", "value1")); + ASSERT_OK(Flush()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { + ASSERT_NE(nullptr, arg); + *(reinterpret_cast<Status*>(arg)) = + Status::Corruption("Inject corruption"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = db_secondary_->TryCatchUpWithPrimary(); + ASSERT_TRUE(s.IsCorruption()); +} + +TEST_F(DBSecondaryTest, OpenWithTransactionDB) { + Options options = CurrentOptions(); + options.create_if_missing = true; + + // Destroy the DB to recreate as a TransactionDB. + Close(); + Destroy(options, true); + + // Create a TransactionDB. + TransactionDB* txn_db = nullptr; + TransactionDBOptions txn_db_opts; + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db)); + ASSERT_NE(txn_db, nullptr); + db_ = txn_db; + + std::vector<std::string> cfs = {"new_CF"}; + CreateColumnFamilies(cfs, options); + ASSERT_EQ(handles_.size(), 1); + + WriteOptions wopts; + TransactionOptions txn_opts; + Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr); + ASSERT_NE(txn1, nullptr); + ASSERT_OK(txn1->Put(handles_[0], "k1", "v1")); + ASSERT_OK(txn1->Commit()); + delete txn1; + + options = CurrentOptions(); + options.max_open_files = -1; + ASSERT_OK(TryOpenSecondary(options)); +} + +class DBSecondaryTestWithTimestamp : public DBSecondaryTestBase { + public: + explicit DBSecondaryTestWithTimestamp() + : DBSecondaryTestBase("db_secondary_test_with_timestamp") {} +}; +TEST_F(DBSecondaryTestWithTimestamp, IteratorAndGetReadTimestampSizeMismatch) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::string write_timestamp = Timestamp(1, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamp, + "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + std::string different_size_read_timestamp; + PutFixed32(&different_size_read_timestamp, 2); + Slice different_size_read_ts = different_size_read_timestamp; + read_opts.timestamp = &different_size_read_ts; + { + std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts)); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + for (uint64_t key = 0; key <= kMaxKey; ++key) { + std::string value_from_get; + std::string timestamp; + ASSERT_TRUE(db_->Get(read_opts, Key1(key), &value_from_get, ×tamp) + .IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, + IteratorAndGetReadTimestampSpecifiedWithoutWriteTimestamp) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + const std::string read_timestamp = Timestamp(2, 0); + Slice read_ts = read_timestamp; + read_opts.timestamp = &read_ts; + { + std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts)); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + for (uint64_t key = 0; key <= kMaxKey; ++key) { + std::string value_from_get; + std::string timestamp; + ASSERT_TRUE(db_->Get(read_opts, Key1(key), &value_from_get, ×tamp) + .IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, + IteratorAndGetWriteWithTimestampReadWithoutTimestamp) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::string write_timestamp = Timestamp(1, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamp, + "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + { + std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts)); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + for (uint64_t key = 0; key <= kMaxKey; ++key) { + std::string value_from_get; + ASSERT_TRUE( + db_->Get(read_opts, Key1(key), &value_from_get).IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, IteratorAndGet) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::vector<uint64_t> start_keys = {1, 0}; + const std::vector<std::string> write_timestamps = {Timestamp(1, 0), + Timestamp(3, 0)}; + const std::vector<std::string> read_timestamps = {Timestamp(2, 0), + Timestamp(4, 0)}; + for (size_t i = 0; i < write_timestamps.size(); ++i) { + WriteOptions write_opts; + for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], + "value" + std::to_string(i)); + ASSERT_OK(s); + } + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + auto get_value_and_check = [](DB* db, ReadOptions read_opts, Slice key, + Slice expected_value, std::string expected_ts) { + std::string value_from_get; + std::string timestamp; + ASSERT_OK(db->Get(read_opts, key.ToString(), &value_from_get, ×tamp)); + ASSERT_EQ(expected_value, value_from_get); + ASSERT_EQ(expected_ts, timestamp); + }; + for (size_t i = 0; i < read_timestamps.size(); ++i) { + ReadOptions read_opts; + Slice read_ts = read_timestamps[i]; + read_opts.timestamp = &read_ts; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + int count = 0; + uint64_t key = 0; + // Forward iterate. + for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid(); + it->Next(), ++count, ++key) { + CheckIterUserEntry(it.get(), Key1(key), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + get_value_and_check(db_, read_opts, it->key(), it->value(), + write_timestamps[i]); + } + size_t expected_count = kMaxKey - start_keys[i] + 1; + ASSERT_EQ(expected_count, count); + + // Backward iterate. + count = 0; + for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid(); + it->Prev(), ++count, --key) { + CheckIterUserEntry(it.get(), Key1(key), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + get_value_and_check(db_, read_opts, it->key(), it->value(), + write_timestamps[i]); + } + ASSERT_EQ(static_cast<size_t>(kMaxKey) - start_keys[i] + 1, count); + + // SeekToFirst()/SeekToLast() with lower/upper bounds. + // Then iter with lower and upper bounds. + uint64_t l = 0; + uint64_t r = kMaxKey + 1; + while (l < r) { + std::string lb_str = Key1(l); + Slice lb = lb_str; + std::string ub_str = Key1(r); + Slice ub = ub_str; + read_opts.iterate_lower_bound = &lb; + read_opts.iterate_upper_bound = &ub; + it.reset(db_->NewIterator(read_opts)); + for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0; + it->Valid(); it->Next(), ++key, ++count) { + CheckIterUserEntry(it.get(), Key1(key), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + get_value_and_check(db_, read_opts, it->key(), it->value(), + write_timestamps[i]); + } + ASSERT_EQ(r - std::max(l, start_keys[i]), count); + + for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0; + it->Valid(); it->Prev(), --key, ++count) { + CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + get_value_and_check(db_, read_opts, it->key(), it->value(), + write_timestamps[i]); + } + l += (kMaxKey / 100); + r -= (kMaxKey / 100); + } + } + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, IteratorsReadTimestampSizeMismatch) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::string write_timestamp = Timestamp(1, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamp, + "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + std::string different_size_read_timestamp; + PutFixed32(&different_size_read_timestamp, 2); + Slice different_size_read_ts = different_size_read_timestamp; + read_opts.timestamp = &different_size_read_ts; + { + std::vector<Iterator*> iters; + ASSERT_TRUE( + db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters) + .IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, + IteratorsReadTimestampSpecifiedWithoutWriteTimestamp) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + const std::string read_timestamp = Timestamp(2, 0); + Slice read_ts = read_timestamp; + read_opts.timestamp = &read_ts; + { + std::vector<Iterator*> iters; + ASSERT_TRUE( + db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters) + .IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, + IteratorsWriteWithTimestampReadWithoutTimestamp) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::string write_timestamp = Timestamp(1, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamp, + "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + { + std::vector<Iterator*> iters; + ASSERT_TRUE( + db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters) + .IsInvalidArgument()); + } + + Close(); +} + +TEST_F(DBSecondaryTestWithTimestamp, Iterators) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::string write_timestamp = Timestamp(1, 0); + const std::string read_timestamp = Timestamp(2, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamp, + "value" + std::to_string(key)); + ASSERT_OK(s); + } + + // Reopen the database as secondary instance to test its timestamp support. + Close(); + options.max_open_files = -1; + ASSERT_OK(ReopenAsSecondary(options)); + + ReadOptions read_opts; + Slice read_ts = read_timestamp; + read_opts.timestamp = &read_ts; + std::vector<Iterator*> iters; + ASSERT_OK(db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters)); + ASSERT_EQ(static_cast<uint64_t>(1), iters.size()); + + int count = 0; + uint64_t key = 0; + // Forward iterate. + for (iters[0]->Seek(Key1(0)), key = 0; iters[0]->Valid(); + iters[0]->Next(), ++count, ++key) { + CheckIterUserEntry(iters[0], Key1(key), kTypeValue, + "value" + std::to_string(key), write_timestamp); + } + + size_t expected_count = kMaxKey - 0 + 1; + ASSERT_EQ(expected_count, count); + delete iters[0]; + + Close(); +} +#endif //! ROCKSDB_LITE + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |