From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../utilities/checkpoint/checkpoint_test.cc | 974 +++++++++++++++++++++ 1 file changed, 974 insertions(+) create mode 100644 src/rocksdb/utilities/checkpoint/checkpoint_test.cc (limited to 'src/rocksdb/utilities/checkpoint/checkpoint_test.cc') diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_test.cc b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc new file mode 100644 index 000000000..3da753d5f --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc @@ -0,0 +1,974 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Syncpoint prevents us building and running tests in release +#ifndef ROCKSDB_LITE +#include "rocksdb/utilities/checkpoint.h" + +#ifndef OS_WIN +#include +#endif +#include +#include +#include + +#include "db/db_impl/db_impl.h" +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" + +namespace ROCKSDB_NAMESPACE { +class CheckpointTest : public testing::Test { + protected: + // Sequence of option configurations to try + enum OptionConfig { + kDefault = 0, + }; + int option_config_; + + public: + std::string dbname_; + std::string alternative_wal_dir_; + Env* env_; + DB* db_; + Options last_options_; + std::vector handles_; + std::string snapshot_name_; + std::string export_path_; + ColumnFamilyHandle* cfh_reverse_comp_; + ExportImportFilesMetaData* metadata_; + + CheckpointTest() : env_(Env::Default()) { + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); + dbname_ = test::PerThreadDBPath(env_, "checkpoint_test"); + alternative_wal_dir_ = dbname_ + "/wal"; + auto options = CurrentOptions(); + auto delete_options = options; + delete_options.wal_dir = alternative_wal_dir_; + EXPECT_OK(DestroyDB(dbname_, delete_options)); + // Destroy it for not alternative WAL dir is used. + EXPECT_OK(DestroyDB(dbname_, options)); + db_ = nullptr; + snapshot_name_ = test::PerThreadDBPath(env_, "snapshot"); + std::string snapshot_tmp_name = snapshot_name_ + ".tmp"; + EXPECT_OK(DestroyDB(snapshot_name_, options)); + test::DeleteDir(env_, snapshot_name_); + EXPECT_OK(DestroyDB(snapshot_tmp_name, options)); + test::DeleteDir(env_, snapshot_tmp_name); + Reopen(options); + export_path_ = test::PerThreadDBPath("/export"); + DestroyDir(env_, export_path_).PermitUncheckedError(); + cfh_reverse_comp_ = nullptr; + metadata_ = nullptr; + } + + ~CheckpointTest() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + if (cfh_reverse_comp_) { + EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_)); + cfh_reverse_comp_ = nullptr; + } + if (metadata_) { + delete metadata_; + metadata_ = nullptr; + } + Close(); + Options options; + options.db_paths.emplace_back(dbname_, 0); + options.db_paths.emplace_back(dbname_ + "_2", 0); + options.db_paths.emplace_back(dbname_ + "_3", 0); + options.db_paths.emplace_back(dbname_ + "_4", 0); + EXPECT_OK(DestroyDB(dbname_, options)); + EXPECT_OK(DestroyDB(snapshot_name_, options)); + DestroyDir(env_, export_path_).PermitUncheckedError(); + } + + // Return the current option configuration. + Options CurrentOptions() { + Options options; + options.env = env_; + options.create_if_missing = true; + return options; + } + + void CreateColumnFamilies(const std::vector& cfs, + const Options& options) { + ColumnFamilyOptions cf_opts(options); + size_t cfi = handles_.size(); + handles_.resize(cfi + cfs.size()); + for (auto cf : cfs) { + ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++])); + } + } + + void CreateAndReopenWithCF(const std::vector& cfs, + const Options& options) { + CreateColumnFamilies(cfs, options); + std::vector cfs_plus_default = cfs; + cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); + ReopenWithColumnFamilies(cfs_plus_default, options); + } + + void ReopenWithColumnFamilies(const std::vector& cfs, + const std::vector& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + void ReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + Status TryReopenWithColumnFamilies(const std::vector& cfs, + const std::vector& options) { + Close(); + EXPECT_EQ(cfs.size(), options.size()); + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i])); + } + DBOptions db_opts = DBOptions(options[0]); + return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); + } + + Status TryReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + Close(); + std::vector v_opts(cfs.size(), options); + return TryReopenWithColumnFamilies(cfs, v_opts); + } + + void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); } + + void CompactAll() { + for (auto h : handles_) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr)); + } + } + + void Close() { + for (auto h : handles_) { + delete h; + } + handles_.clear(); + delete db_; + db_ = nullptr; + } + + void DestroyAndReopen(const Options& options) { + // Destroy using last options + Destroy(last_options_); + ASSERT_OK(TryReopen(options)); + } + + void Destroy(const Options& options) { + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + } + + Status ReadOnlyReopen(const Options& options) { + return DB::OpenForReadOnly(options, dbname_, &db_); + } + + Status ReadOnlyReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + std::vector column_families; + for (const auto& cf : cfs) { + column_families.emplace_back(cf, options); + } + return DB::OpenForReadOnly(options, dbname_, column_families, &handles_, + &db_); + } + + Status TryReopen(const Options& options) { + Close(); + last_options_ = options; + return DB::Open(options, dbname_, &db_); + } + + Status Flush(int cf = 0) { + if (cf == 0) { + return db_->Flush(FlushOptions()); + } else { + return db_->Flush(FlushOptions(), handles_[cf]); + } + } + + Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) { + return db_->Put(wo, k, v); + } + + Status Put(int cf, const Slice& k, const Slice& v, + WriteOptions wo = WriteOptions()) { + return db_->Put(wo, handles_[cf], k, v); + } + + Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); } + + Status Delete(int cf, const std::string& k) { + return db_->Delete(WriteOptions(), handles_[cf], k); + } + + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + std::string Get(int cf, const std::string& k, + const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, handles_[cf], k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } +}; + +TEST_F(CheckpointTest, GetSnapshotLink) { + for (uint64_t log_size_for_flush : {0, 1000000}) { + Options options; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + Checkpoint* checkpoint; + + options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + + // Create a database + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + std::string key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + // Take a snapshot + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, log_size_for_flush)); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_OK(Flush()); + ASSERT_EQ("v2", Get(key)); + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, key, &result)); + ASSERT_EQ("v1", result); + delete snapshotDB; + snapshotDB = nullptr; + delete db_; + db_ = nullptr; + + // Destroy original DB + ASSERT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents + options.create_if_missing = false; + dbname_ = snapshot_name_; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + ASSERT_EQ("v1", Get(key)); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + delete checkpoint; + + // Restore DB name + dbname_ = test::PerThreadDBPath(env_, "db_test"); + } +} + +TEST_F(CheckpointTest, CheckpointWithBlob) { + // Create a database with a blob file + Options options = CurrentOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + + Reopen(options); + + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + ASSERT_OK(Put(key, blob)); + ASSERT_OK(Flush()); + + // Create a checkpoint + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + std::unique_ptr checkpoint_guard(checkpoint); + + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + + // Make sure it contains the blob file + std::vector files; + ASSERT_OK(env_->GetChildren(snapshot_name_, &files)); + + bool blob_file_found = false; + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kWalFile; + + if (ParseFileName(file, &number, &type) && type == kBlobFile) { + blob_file_found = true; + break; + } + } + + ASSERT_TRUE(blob_file_found); + + // Make sure the checkpoint can be opened and the blob value read + options.create_if_missing = false; + DB* checkpoint_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &checkpoint_db)); + + std::unique_ptr checkpoint_db_guard(checkpoint_db); + + PinnableSlice value; + ASSERT_OK(checkpoint_db->Get( + ReadOptions(), checkpoint_db->DefaultColumnFamily(), key, &value)); + + ASSERT_EQ(value, blob); +} + +TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) { + // Create a database + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + // Helper to verify the number of files in metadata and export dir + auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata, + int num_files_expected) { + ASSERT_EQ(metadata.files.size(), num_files_expected); + std::vector subchildren; + ASSERT_OK(env_->GetChildren(export_path_, &subchildren)); + ASSERT_EQ(subchildren.size(), num_files_expected); + }; + + // Test DefaultColumnFamily + { + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + ASSERT_OK(DestroyDir(env_, export_path_)); + delete metadata_; + metadata_ = nullptr; + + // Check again after compaction + CompactAll(); + ASSERT_OK(Put(key, "v2")); + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 2); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + ASSERT_OK(DestroyDir(env_, export_path_)); + delete metadata_; + metadata_ = nullptr; + delete checkpoint; + } + + // Test non default column family with non default comparator + { + auto cf_options = CurrentOptions(); + cf_options.comparator = ReverseBytewiseComparator(); + ASSERT_OK(db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_)); + + const auto key = std::string("foo"); + ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_, + &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, + ReverseBytewiseComparator()->Name()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + // Create a database + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory + ASSERT_OK(env_->CreateDirIfMissing(export_path_)); + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir exists")); + ASSERT_OK(DestroyDir(env_, export_path_)); + + // Export with invalid directory specification + export_path_ = ""; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir invalid")); + delete checkpoint; +} + +TEST_F(CheckpointTest, CheckpointCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"}, + {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + ASSERT_OK(Put(2, "two", "two")); + ASSERT_OK(Put(3, "three", "three")); + ASSERT_OK(Put(4, "four", "four")); + ASSERT_OK(Put(5, "five", "five")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector cphandles; + + // Take a snapshot + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1"); + ASSERT_OK(Put(0, "Default", "Default1")); + ASSERT_OK(Put(1, "one", "eleven")); + ASSERT_OK(Put(2, "two", "twelve")); + ASSERT_OK(Put(3, "three", "thirteen")); + ASSERT_OK(Put(4, "four", "fourteen")); + ASSERT_OK(Put(5, "five", "fifteen")); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2"); + t.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_OK(Put(1, "one", "twentyone")); + ASSERT_OK(Put(2, "two", "twentytwo")); + ASSERT_OK(Put(3, "three", "twentythree")); + ASSERT_OK(Put(4, "four", "twentyfour")); + ASSERT_OK(Put(5, "five", "twentyfive")); + ASSERT_OK(Flush()); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default1", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("eleven", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CheckpointCFNoFlush) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(2, "two", "two")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector cphandles; + + // Take a snapshot + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { + // Flush should never trigger. + FAIL(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 1000000)); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + delete checkpoint; + ASSERT_OK(Put(1, "one", "two")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(2, "two", "twentytwo")); + Close(); + EXPECT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("one", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + ASSERT_EQ("two", result); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { + Options options = CurrentOptions(); + options.max_manifest_file_size = 0; // always rollover manifest for file add + Reopen(options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {// Get past the flush in the checkpoint thread before adding any keys to + // the db so the checkpoint thread won't hit the WriteManifest + // syncpoints. + {"CheckpointImpl::CreateCheckpoint:FlushDone", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"}, + // Roll the manifest during checkpointing right after live files are + // snapshotted. + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"); + ASSERT_OK(Put("Default", "Default1")); + ASSERT_OK(Flush()); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + DB* snapshotDB; + // Successful Open() implies that CURRENT pointed to the manifest in the + // checkpoint. + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) { + Close(); + const std::string dbname = test::PerThreadDBPath("transaction_testdb"); + ASSERT_OK(DestroyDB(dbname, CurrentOptions())); + test::DeleteDir(env_, dbname); + + Options options = CurrentOptions(); + options.allow_2pc = true; + // allow_2pc is implicitly set with tx prepare + // options.allow_2pc = true; + TransactionDBOptions txn_db_options; + TransactionDB* txdb; + Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb); + ASSERT_OK(s); + ColumnFamilyHandle* cfa; + ColumnFamilyHandle* cfb; + ColumnFamilyOptions cf_options; + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa)); + + WriteOptions write_options; + // Insert something into CFB so lots of log files will be kept + // before creating the checkpoint. + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb)); + ASSERT_OK(txdb->Put(write_options, cfb, "", "")); + + ReadOptions read_options; + std::string value; + TransactionOptions txn_options; + Transaction* txn = txdb->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + ASSERT_EQ(txdb->GetTransactionByName("xid"), txn); + + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa")); + ASSERT_OK(s); + // Writing prepare into middle of first WAL, then flush WALs many times + for (int i = 1; i <= 100000; i++) { + Transaction* tx = txdb->BeginTransaction(write_options, txn_options); + ASSERT_OK(tx->SetName("x")); + ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val"))); + ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111"))); + ASSERT_OK(tx->Prepare()); + ASSERT_OK(tx->Commit()); + if (i % 10000 == 0) { + ASSERT_OK(txdb->Flush(FlushOptions())); + } + if (i == 88888) { + ASSERT_OK(txn->Prepare()); + } + delete tx; + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"}, + {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(txdb, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"); + ASSERT_OK(txn->Commit()); + delete txn; + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit"); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // No more than two logs files should exist. + std::vector files; + ASSERT_OK(env_->GetChildren(snapshot_name_, &files)); + int num_log_files = 0; + for (auto& file : files) { + uint64_t num; + FileType type; + WalFileType log_type; + if (ParseFileName(file, &num, &type, &log_type) && type == kWalFile) { + num_log_files++; + } + } + // One flush after preapare + one outstanding file before checkpoint + one log + // file generated after checkpoint. + ASSERT_LE(num_log_files, 3); + + TransactionDB* snapshotDB; + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + std::vector cf_handles; + ASSERT_OK(TransactionDB::Open(options, txn_db_options, snapshot_name_, + column_families, &cf_handles, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(read_options, "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value)); + ASSERT_EQ(value, "barcfa"); + + delete cfa; + delete cfb; + delete cf_handles[0]; + delete cf_handles[1]; + delete cf_handles[2]; + delete snapshotDB; + snapshotDB = nullptr; + delete txdb; +} + +TEST_F(CheckpointTest, CheckpointInvalidDirectoryName) { + for (std::string checkpoint_dir : {"", "/", "////"}) { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_TRUE( + checkpoint->CreateCheckpoint(checkpoint_dir).IsInvalidArgument()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, CheckpointWithParallelWrites) { + // When run with TSAN, this exposes the data race fixed in + // https://github.com/facebook/rocksdb/pull/3603 + ASSERT_OK(Put("key1", "val1")); + port::Thread thread([this]() { ASSERT_OK(Put("key2", "val2")); }); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + thread.join(); +} + +TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) { + Options options = CurrentOptions(); + std::unique_ptr env(new FaultInjectionTestEnv(env_)); + options.env = env.get(); + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + ASSERT_OK(env->DropUnsyncedFileData()); + + // make sure it's openable even though whatever data that wasn't synced got + // dropped. + options.env = env_; + DB* snapshot_db; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result)); + ASSERT_EQ("val1", get_result); + delete snapshot_db; + delete db_; + db_ = nullptr; +} + +TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) { + // Regression test for a bug where checkpoint failed on a DB where persisting + // OPTIONS file failed and the DB was opened with + // `fail_if_options_file_error == false`. + Options options = CurrentOptions(); + options.fail_if_options_file_error = false; + auto fault_fs = std::make_shared(FileSystem::Default()); + + // Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one + // operation when inside the OPTIONS file persisting code. + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + fault_fs->SetRandomMetadataWriteError(1 /* one_in */); + SyncPoint::GetInstance()->SetCallBack( + "PersistRocksDBOptions:start", [fault_fs](void* /* arg */) { + fault_fs->EnableMetadataWriteErrorInjection(); + }); + SyncPoint::GetInstance()->SetCallBack( + "FaultInjectionTestFS::InjectMetadataWriteError:Injected", + [fault_fs](void* /* arg */) { + fault_fs->DisableMetadataWriteErrorInjection(); + }); + options.env = fault_fs_env.get(); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + + // Make sure it's usable. + options.env = env_; + DB* snapshot_db; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result)); + ASSERT_EQ("val1", get_result); + delete snapshot_db; + delete db_; + db_ = nullptr; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDB) { + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Flush()); + Close(); + Options options = CurrentOptions(); + ASSERT_OK(ReadOnlyReopen(options)); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + DB* snapshot_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result)); + ASSERT_EQ("foo_value", get_result); + delete snapshot_db; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + for (int i = 0; i != 3; ++i) { + ASSERT_OK(Put(i, "foo", "foo_value")); + ASSERT_OK(Flush(i)); + } + Close(); + Status s = ReadOnlyReopenWithColumnFamilies( + {kDefaultColumnFamilyName, "pikachu", "eevee"}, options); + ASSERT_OK(s); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + + std::vector column_families{ + {kDefaultColumnFamilyName, options}, + {"pikachu", options}, + {"eevee", options}}; + DB* snapshot_db = nullptr; + std::vector snapshot_handles; + s = DB::Open(options, snapshot_name_, column_families, &snapshot_handles, + &snapshot_db); + ASSERT_OK(s); + ReadOptions read_opts; + for (int i = 0; i != 3; ++i) { + std::string get_result; + s = snapshot_db->Get(read_opts, snapshot_handles[i], "foo", &get_result); + ASSERT_OK(s); + ASSERT_EQ("foo_value", get_result); + } + + for (auto snapshot_h : snapshot_handles) { + delete snapshot_h; + } + snapshot_handles.clear(); + delete snapshot_db; +} + +TEST_F(CheckpointTest, CheckpointWithDbPath) { + Options options = CurrentOptions(); + options.db_paths.emplace_back(dbname_ + "_2", 0); + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Flush(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + // Currently not supported + ASSERT_TRUE(checkpoint->CreateCheckpoint(snapshot_name_).IsNotSupported()); + delete checkpoint; +} + +TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) { + // Repro for a race condition where a user write comes in after the checkpoint + // syncs WAL for `track_and_verify_wals_in_manifest` but before the + // corresponding MANIFEST update. With the bug, that scenario resulted in an + // unopenable DB with error "Corruption: Size mismatch: WAL ...". + Options options = CurrentOptions(); + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + options.env = fault_env.get(); + options.track_and_verify_wals_in_manifest = true; + Reopen(options); + + ASSERT_OK(Put("key1", "val1")); + + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", + [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::unique_ptr checkpoint; + { + Checkpoint* checkpoint_ptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr)); + checkpoint.reset(checkpoint_ptr); + } + + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + + // Ensure callback ran. + ASSERT_EQ("val2", Get("key2")); + + Close(); + + // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the + // DB WAL. + fault_env->DropUnsyncedFileData(); + + // Before the bug fix, reopening the DB would fail because the MANIFEST's + // AddWal entry indicated the WAL should be synced through "key2" -> "val2". + Reopen(options); + + // Need to close before `fault_env` goes out of scope. + Close(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE -- cgit v1.2.3