From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../utilities/checkpoint/checkpoint_impl.cc | 469 ++++++++++ src/rocksdb/utilities/checkpoint/checkpoint_impl.h | 66 ++ .../utilities/checkpoint/checkpoint_test.cc | 974 +++++++++++++++++++++ 3 files changed, 1509 insertions(+) create mode 100644 src/rocksdb/utilities/checkpoint/checkpoint_impl.cc create mode 100644 src/rocksdb/utilities/checkpoint/checkpoint_impl.h create mode 100644 src/rocksdb/utilities/checkpoint/checkpoint_test.cc (limited to 'src/rocksdb/utilities/checkpoint') diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc new file mode 100644 index 000000000..44ce70b1b --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc @@ -0,0 +1,469 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2012 Facebook. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef ROCKSDB_LITE + +#include "utilities/checkpoint/checkpoint_impl.h" + +#include +#include +#include +#include +#include +#include + +#include "db/wal_manager.h" +#include "file/file_util.h" +#include "file/filename.h" +#include "logging/logging.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/options.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/checkpoint.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/file_checksum_helper.h" + +namespace ROCKSDB_NAMESPACE { + +Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { + *checkpoint_ptr = new CheckpointImpl(db); + return Status::OK(); +} + +Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/, + uint64_t /*log_size_for_flush*/, + uint64_t* /*sequence_number_ptr*/) { + return Status::NotSupported(""); +} + +void CheckpointImpl::CleanStagingDirectory(const std::string& full_private_path, + Logger* info_log) { + std::vector subchildren; + Status s = db_->GetEnv()->FileExists(full_private_path); + if (s.IsNotFound()) { + return; + } + ROCKS_LOG_INFO(info_log, "File exists %s -- %s", full_private_path.c_str(), + s.ToString().c_str()); + s = db_->GetEnv()->GetChildren(full_private_path, &subchildren); + if (s.ok()) { + for (auto& subchild : subchildren) { + std::string subchild_path = full_private_path + "/" + subchild; + s = db_->GetEnv()->DeleteFile(subchild_path); + ROCKS_LOG_INFO(info_log, "Delete file %s -- %s", subchild_path.c_str(), + s.ToString().c_str()); + } + } + // finally delete the private dir + s = db_->GetEnv()->DeleteDir(full_private_path); + ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s", full_private_path.c_str(), + s.ToString().c_str()); +} + +Status Checkpoint::ExportColumnFamily( + ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/, + ExportImportFilesMetaData** /*metadata*/) { + return Status::NotSupported(""); +} + +// Builds an openable snapshot of RocksDB +Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush, + uint64_t* sequence_number_ptr) { + DBOptions db_options = db_->GetDBOptions(); + + Status s = db_->GetEnv()->FileExists(checkpoint_dir); + if (s.ok()) { + return Status::InvalidArgument("Directory exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + ROCKS_LOG_INFO( + db_options.info_log, + "Started the snapshot process -- creating snapshot in directory %s", + checkpoint_dir.c_str()); + + size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/'); + if (final_nonslash_idx == std::string::npos) { + // npos means it's only slashes or empty. Non-empty means it's the root + // directory, but it shouldn't be because we verified above the directory + // doesn't exist. + assert(checkpoint_dir.empty()); + return Status::InvalidArgument("invalid checkpoint directory name"); + } + + std::string full_private_path = + checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + ROCKS_LOG_INFO(db_options.info_log, + "Snapshot process -- using temporary directory %s", + full_private_path.c_str()); + CleanStagingDirectory(full_private_path, db_options.info_log.get()); + // create snapshot directory + s = db_->GetEnv()->CreateDir(full_private_path); + uint64_t sequence_number = 0; + if (s.ok()) { + // enable file deletions + s = db_->DisableFileDeletions(); + const bool disabled_file_deletions = s.ok(); + + if (s.ok() || s.IsNotSupported()) { + s = CreateCustomCheckpoint( + [&](const std::string& src_dirname, const std::string& fname, + FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", + fname.c_str()); + return db_->GetFileSystem()->LinkFile( + src_dirname + "/" + fname, full_private_path + "/" + fname, + IOOptions(), nullptr); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType, + const std::string& /* checksum_func_name */, + const std::string& /* checksum_val */, + const Temperature temperature) { + ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str()); + return CopyFile(db_->GetFileSystem(), src_dirname + "/" + fname, + full_private_path + "/" + fname, size_limit_bytes, + db_options.use_fsync, nullptr, temperature); + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str()); + return CreateFile(db_->GetFileSystem(), + full_private_path + "/" + fname, contents, + db_options.use_fsync); + } /* create_file_cb */, + &sequence_number, log_size_for_flush); + + // we copied all the files, enable file deletions + if (disabled_file_deletions) { + Status ss = db_->EnableFileDeletions(false); + assert(ss.ok()); + ss.PermitUncheckedError(); + } + } + } + + if (s.ok()) { + // move tmp private backup to real snapshot directory + s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir); + } + if (s.ok()) { + std::unique_ptr checkpoint_directory; + s = db_->GetFileSystem()->NewDirectory(checkpoint_dir, IOOptions(), + &checkpoint_directory, nullptr); + if (s.ok() && checkpoint_directory != nullptr) { + s = checkpoint_directory->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed)); + } + } + + if (s.ok()) { + if (sequence_number_ptr != nullptr) { + *sequence_number_ptr = sequence_number; + } + // here we know that we succeeded and installed the new snapshot + ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good"); + ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64, + sequence_number); + } else { + // clean all the files we might have created + ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s", + s.ToString().c_str()); + CleanStagingDirectory(full_private_path, db_options.info_log.get()); + } + return s; +} + +Status CheckpointImpl::CreateCustomCheckpoint( + std::function + link_file_cb, + std::function< + Status(const std::string& src_dirname, const std::string& src_fname, + uint64_t size_limit_bytes, FileType type, + const std::string& checksum_func_name, + const std::string& checksum_val, const Temperature temperature)> + copy_file_cb, + std::function + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush, + bool get_live_table_checksum) { + *sequence_number = db_->GetLatestSequenceNumber(); + + LiveFilesStorageInfoOptions opts; + opts.include_checksum_info = get_live_table_checksum; + opts.wal_size_for_flush = log_size_for_flush; + + std::vector infos; + { + Status s = db_->GetLiveFilesStorageInfo(opts, &infos); + if (!s.ok()) { + return s; + } + } + + // Verify that everything except WAL files are in same directory + // (db_paths / cf_paths not supported) + std::unordered_set dirs; + for (auto& info : infos) { + if (info.file_type != kWalFile) { + dirs.insert(info.directory); + } + } + if (dirs.size() > 1) { + return Status::NotSupported( + "db_paths / cf_paths not supported for Checkpoint nor BackupEngine"); + } + + bool same_fs = true; + + for (auto& info : infos) { + Status s; + if (!info.replacement_contents.empty()) { + // Currently should only be used for CURRENT file. + assert(info.file_type == kCurrentFile); + + if (info.size != info.replacement_contents.size()) { + s = Status::Corruption("Inconsistent size metadata for " + + info.relative_filename); + } else { + s = create_file_cb(info.relative_filename, info.replacement_contents, + info.file_type); + } + } else { + if (same_fs && !info.trim_to_size) { + s = link_file_cb(info.directory, info.relative_filename, + info.file_type); + if (s.IsNotSupported()) { + same_fs = false; + s = Status::OK(); + } + s.MustCheck(); + } + if (!same_fs || info.trim_to_size) { + assert(info.file_checksum_func_name.empty() == + !opts.include_checksum_info); + // no assertion on file_checksum because empty is used for both "not + // set" and "unknown" + if (opts.include_checksum_info) { + s = copy_file_cb(info.directory, info.relative_filename, info.size, + info.file_type, info.file_checksum_func_name, + info.file_checksum, info.temperature); + } else { + s = copy_file_cb(info.directory, info.relative_filename, info.size, + info.file_type, kUnknownFileChecksumFuncName, + kUnknownFileChecksum, info.temperature); + } + } + } + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +// Exports all live SST files of a specified Column Family onto export_dir, +// returning SST files information in metadata. +Status CheckpointImpl::ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) { + auto cfh = static_cast_with_check(handle); + const auto cf_name = cfh->GetName(); + const auto db_options = db_->GetDBOptions(); + + assert(metadata != nullptr); + assert(*metadata == nullptr); + auto s = db_->GetEnv()->FileExists(export_dir); + if (s.ok()) { + return Status::InvalidArgument("Specified export_dir exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + const auto final_nonslash_idx = export_dir.find_last_not_of('/'); + if (final_nonslash_idx == std::string::npos) { + return Status::InvalidArgument("Specified export_dir invalid"); + } + ROCKS_LOG_INFO(db_options.info_log, + "[%s] export column family onto export directory %s", + cf_name.c_str(), export_dir.c_str()); + + // Create a temporary export directory. + const auto tmp_export_dir = + export_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + s = db_->GetEnv()->CreateDir(tmp_export_dir); + + if (s.ok()) { + s = db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), handle); + } + + ColumnFamilyMetaData db_metadata; + if (s.ok()) { + // Export live sst files with file deletions disabled. + s = db_->DisableFileDeletions(); + if (s.ok()) { + db_->GetColumnFamilyMetaData(handle, &db_metadata); + + s = ExportFilesInMetaData( + db_options, db_metadata, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s", + cf_name.c_str(), fname.c_str()); + return db_->GetEnv()->LinkFile(src_dirname + fname, + tmp_export_dir + fname); + } /*link_file_cb*/, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s", + cf_name.c_str(), fname.c_str()); + return CopyFile(db_->GetFileSystem(), src_dirname + fname, + tmp_export_dir + fname, 0, db_options.use_fsync, + nullptr, Temperature::kUnknown); + } /*copy_file_cb*/); + + const auto enable_status = db_->EnableFileDeletions(false /*force*/); + if (s.ok()) { + s = enable_status; + } + } + } + + auto moved_to_user_specified_dir = false; + if (s.ok()) { + // Move temporary export directory to the actual export directory. + s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir); + } + + if (s.ok()) { + // Fsync export directory. + moved_to_user_specified_dir = true; + std::unique_ptr dir_ptr; + s = db_->GetFileSystem()->NewDirectory(export_dir, IOOptions(), &dir_ptr, + nullptr); + if (s.ok()) { + assert(dir_ptr != nullptr); + s = dir_ptr->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed)); + } + } + + if (s.ok()) { + // Export of files succeeded. Fill in the metadata information. + auto result_metadata = new ExportImportFilesMetaData(); + result_metadata->db_comparator_name = handle->GetComparator()->Name(); + for (const auto& level_metadata : db_metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + LiveFileMetaData live_file_metadata; + live_file_metadata.size = file_metadata.size; + live_file_metadata.name = std::move(file_metadata.name); + live_file_metadata.file_number = file_metadata.file_number; + live_file_metadata.db_path = export_dir; + live_file_metadata.smallest_seqno = file_metadata.smallest_seqno; + live_file_metadata.largest_seqno = file_metadata.largest_seqno; + live_file_metadata.smallestkey = std::move(file_metadata.smallestkey); + live_file_metadata.largestkey = std::move(file_metadata.largestkey); + live_file_metadata.oldest_blob_file_number = + file_metadata.oldest_blob_file_number; + live_file_metadata.level = level_metadata.level; + result_metadata->files.push_back(live_file_metadata); + } + *metadata = result_metadata; + } + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.", + cf_name.c_str()); + } else { + // Failure: Clean up all the files/directories created. + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s", + cf_name.c_str(), s.ToString().c_str()); + std::vector subchildren; + const auto cleanup_dir = + moved_to_user_specified_dir ? export_dir : tmp_export_dir; + db_->GetEnv()->GetChildren(cleanup_dir, &subchildren); + for (const auto& subchild : subchildren) { + const auto subchild_path = cleanup_dir + "/" + subchild; + const auto status = db_->GetEnv()->DeleteFile(subchild_path); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s", + subchild_path.c_str(), status.ToString().c_str()); + } + } + const auto status = db_->GetEnv()->DeleteDir(cleanup_dir); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s", + cleanup_dir.c_str(), status.ToString().c_str()); + } + } + return s; +} + +Status CheckpointImpl::ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb) { + Status s; + auto hardlink_file = true; + + // Copy/hard link files in metadata. + size_t num_files = 0; + for (const auto& level_metadata : metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + uint64_t number; + FileType type; + const auto ok = ParseFileName(file_metadata.name, &number, &type); + if (!ok) { + s = Status::Corruption("Could not parse file name"); + break; + } + + // We should only get sst files here. + assert(type == kTableFile); + assert(file_metadata.size > 0 && file_metadata.name[0] == '/'); + const auto src_fname = file_metadata.name; + ++num_files; + + if (hardlink_file) { + s = link_file_cb(db_->GetName(), src_fname); + if (num_files == 1 && s.IsNotSupported()) { + // Fallback to copy if link failed due to cross-device directories. + hardlink_file = false; + s = Status::OK(); + } + } + if (!hardlink_file) { + s = copy_file_cb(db_->GetName(), src_fname); + } + if (!s.ok()) { + break; + } + } + } + ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt, + num_files); + + return s; +} +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.h b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h new file mode 100644 index 000000000..2947330cc --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h @@ -0,0 +1,66 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include + +#include "file/filename.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/checkpoint.h" + +namespace ROCKSDB_NAMESPACE { + +class CheckpointImpl : public Checkpoint { + public: + explicit CheckpointImpl(DB* db) : db_(db) {} + + Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush, + uint64_t* sequence_number_ptr) override; + + Status ExportColumnFamily(ColumnFamilyHandle* handle, + const std::string& export_dir, + ExportImportFilesMetaData** metadata) override; + + // Checkpoint logic can be customized by providing callbacks for link, copy, + // or create. + Status CreateCustomCheckpoint( + std::function + link_file_cb, + std::function + copy_file_cb, + std::function + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush, + bool get_live_table_checksum = false); + + private: + void CleanStagingDirectory(const std::string& path, Logger* info_log); + + // Export logic customization by providing callbacks for link or copy. + Status ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function + link_file_cb, + std::function + copy_file_cb); + + private: + DB* db_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_test.cc b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc new file mode 100644 index 000000000..3da753d5f --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc @@ -0,0 +1,974 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Syncpoint prevents us building and running tests in release +#ifndef ROCKSDB_LITE +#include "rocksdb/utilities/checkpoint.h" + +#ifndef OS_WIN +#include +#endif +#include +#include +#include + +#include "db/db_impl/db_impl.h" +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" + +namespace ROCKSDB_NAMESPACE { +class CheckpointTest : public testing::Test { + protected: + // Sequence of option configurations to try + enum OptionConfig { + kDefault = 0, + }; + int option_config_; + + public: + std::string dbname_; + std::string alternative_wal_dir_; + Env* env_; + DB* db_; + Options last_options_; + std::vector handles_; + std::string snapshot_name_; + std::string export_path_; + ColumnFamilyHandle* cfh_reverse_comp_; + ExportImportFilesMetaData* metadata_; + + CheckpointTest() : env_(Env::Default()) { + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); + dbname_ = test::PerThreadDBPath(env_, "checkpoint_test"); + alternative_wal_dir_ = dbname_ + "/wal"; + auto options = CurrentOptions(); + auto delete_options = options; + delete_options.wal_dir = alternative_wal_dir_; + EXPECT_OK(DestroyDB(dbname_, delete_options)); + // Destroy it for not alternative WAL dir is used. + EXPECT_OK(DestroyDB(dbname_, options)); + db_ = nullptr; + snapshot_name_ = test::PerThreadDBPath(env_, "snapshot"); + std::string snapshot_tmp_name = snapshot_name_ + ".tmp"; + EXPECT_OK(DestroyDB(snapshot_name_, options)); + test::DeleteDir(env_, snapshot_name_); + EXPECT_OK(DestroyDB(snapshot_tmp_name, options)); + test::DeleteDir(env_, snapshot_tmp_name); + Reopen(options); + export_path_ = test::PerThreadDBPath("/export"); + DestroyDir(env_, export_path_).PermitUncheckedError(); + cfh_reverse_comp_ = nullptr; + metadata_ = nullptr; + } + + ~CheckpointTest() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + if (cfh_reverse_comp_) { + EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_)); + cfh_reverse_comp_ = nullptr; + } + if (metadata_) { + delete metadata_; + metadata_ = nullptr; + } + Close(); + Options options; + options.db_paths.emplace_back(dbname_, 0); + options.db_paths.emplace_back(dbname_ + "_2", 0); + options.db_paths.emplace_back(dbname_ + "_3", 0); + options.db_paths.emplace_back(dbname_ + "_4", 0); + EXPECT_OK(DestroyDB(dbname_, options)); + EXPECT_OK(DestroyDB(snapshot_name_, options)); + DestroyDir(env_, export_path_).PermitUncheckedError(); + } + + // Return the current option configuration. + Options CurrentOptions() { + Options options; + options.env = env_; + options.create_if_missing = true; + return options; + } + + void CreateColumnFamilies(const std::vector& cfs, + const Options& options) { + ColumnFamilyOptions cf_opts(options); + size_t cfi = handles_.size(); + handles_.resize(cfi + cfs.size()); + for (auto cf : cfs) { + ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++])); + } + } + + void CreateAndReopenWithCF(const std::vector& cfs, + const Options& options) { + CreateColumnFamilies(cfs, options); + std::vector cfs_plus_default = cfs; + cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); + ReopenWithColumnFamilies(cfs_plus_default, options); + } + + void ReopenWithColumnFamilies(const std::vector& cfs, + const std::vector& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + void ReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + Status TryReopenWithColumnFamilies(const std::vector& cfs, + const std::vector& options) { + Close(); + EXPECT_EQ(cfs.size(), options.size()); + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i])); + } + DBOptions db_opts = DBOptions(options[0]); + return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); + } + + Status TryReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + Close(); + std::vector v_opts(cfs.size(), options); + return TryReopenWithColumnFamilies(cfs, v_opts); + } + + void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); } + + void CompactAll() { + for (auto h : handles_) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr)); + } + } + + void Close() { + for (auto h : handles_) { + delete h; + } + handles_.clear(); + delete db_; + db_ = nullptr; + } + + void DestroyAndReopen(const Options& options) { + // Destroy using last options + Destroy(last_options_); + ASSERT_OK(TryReopen(options)); + } + + void Destroy(const Options& options) { + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + } + + Status ReadOnlyReopen(const Options& options) { + return DB::OpenForReadOnly(options, dbname_, &db_); + } + + Status ReadOnlyReopenWithColumnFamilies(const std::vector& cfs, + const Options& options) { + std::vector column_families; + for (const auto& cf : cfs) { + column_families.emplace_back(cf, options); + } + return DB::OpenForReadOnly(options, dbname_, column_families, &handles_, + &db_); + } + + Status TryReopen(const Options& options) { + Close(); + last_options_ = options; + return DB::Open(options, dbname_, &db_); + } + + Status Flush(int cf = 0) { + if (cf == 0) { + return db_->Flush(FlushOptions()); + } else { + return db_->Flush(FlushOptions(), handles_[cf]); + } + } + + Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) { + return db_->Put(wo, k, v); + } + + Status Put(int cf, const Slice& k, const Slice& v, + WriteOptions wo = WriteOptions()) { + return db_->Put(wo, handles_[cf], k, v); + } + + Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); } + + Status Delete(int cf, const std::string& k) { + return db_->Delete(WriteOptions(), handles_[cf], k); + } + + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + std::string Get(int cf, const std::string& k, + const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, handles_[cf], k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } +}; + +TEST_F(CheckpointTest, GetSnapshotLink) { + for (uint64_t log_size_for_flush : {0, 1000000}) { + Options options; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + Checkpoint* checkpoint; + + options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + + // Create a database + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + std::string key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + // Take a snapshot + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, log_size_for_flush)); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_OK(Flush()); + ASSERT_EQ("v2", Get(key)); + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, key, &result)); + ASSERT_EQ("v1", result); + delete snapshotDB; + snapshotDB = nullptr; + delete db_; + db_ = nullptr; + + // Destroy original DB + ASSERT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents + options.create_if_missing = false; + dbname_ = snapshot_name_; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + ASSERT_EQ("v1", Get(key)); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + delete checkpoint; + + // Restore DB name + dbname_ = test::PerThreadDBPath(env_, "db_test"); + } +} + +TEST_F(CheckpointTest, CheckpointWithBlob) { + // Create a database with a blob file + Options options = CurrentOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + + Reopen(options); + + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + + ASSERT_OK(Put(key, blob)); + ASSERT_OK(Flush()); + + // Create a checkpoint + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + std::unique_ptr checkpoint_guard(checkpoint); + + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + + // Make sure it contains the blob file + std::vector files; + ASSERT_OK(env_->GetChildren(snapshot_name_, &files)); + + bool blob_file_found = false; + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kWalFile; + + if (ParseFileName(file, &number, &type) && type == kBlobFile) { + blob_file_found = true; + break; + } + } + + ASSERT_TRUE(blob_file_found); + + // Make sure the checkpoint can be opened and the blob value read + options.create_if_missing = false; + DB* checkpoint_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &checkpoint_db)); + + std::unique_ptr checkpoint_db_guard(checkpoint_db); + + PinnableSlice value; + ASSERT_OK(checkpoint_db->Get( + ReadOptions(), checkpoint_db->DefaultColumnFamily(), key, &value)); + + ASSERT_EQ(value, blob); +} + +TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) { + // Create a database + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + // Helper to verify the number of files in metadata and export dir + auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata, + int num_files_expected) { + ASSERT_EQ(metadata.files.size(), num_files_expected); + std::vector subchildren; + ASSERT_OK(env_->GetChildren(export_path_, &subchildren)); + ASSERT_EQ(subchildren.size(), num_files_expected); + }; + + // Test DefaultColumnFamily + { + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + ASSERT_OK(DestroyDir(env_, export_path_)); + delete metadata_; + metadata_ = nullptr; + + // Check again after compaction + CompactAll(); + ASSERT_OK(Put(key, "v2")); + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 2); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + ASSERT_OK(DestroyDir(env_, export_path_)); + delete metadata_; + metadata_ = nullptr; + delete checkpoint; + } + + // Test non default column family with non default comparator + { + auto cf_options = CurrentOptions(); + cf_options.comparator = ReverseBytewiseComparator(); + ASSERT_OK(db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_)); + + const auto key = std::string("foo"); + ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_, + &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, + ReverseBytewiseComparator()->Name()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + // Create a database + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory + ASSERT_OK(env_->CreateDirIfMissing(export_path_)); + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir exists")); + ASSERT_OK(DestroyDir(env_, export_path_)); + + // Export with invalid directory specification + export_path_ = ""; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir invalid")); + delete checkpoint; +} + +TEST_F(CheckpointTest, CheckpointCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"}, + {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + ASSERT_OK(Put(2, "two", "two")); + ASSERT_OK(Put(3, "three", "three")); + ASSERT_OK(Put(4, "four", "four")); + ASSERT_OK(Put(5, "five", "five")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector cphandles; + + // Take a snapshot + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1"); + ASSERT_OK(Put(0, "Default", "Default1")); + ASSERT_OK(Put(1, "one", "eleven")); + ASSERT_OK(Put(2, "two", "twelve")); + ASSERT_OK(Put(3, "three", "thirteen")); + ASSERT_OK(Put(4, "four", "fourteen")); + ASSERT_OK(Put(5, "five", "fifteen")); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2"); + t.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_OK(Put(1, "one", "twentyone")); + ASSERT_OK(Put(2, "two", "twentytwo")); + ASSERT_OK(Put(3, "three", "twentythree")); + ASSERT_OK(Put(4, "four", "twentyfour")); + ASSERT_OK(Put(5, "five", "twentyfive")); + ASSERT_OK(Flush()); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default1", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("eleven", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CheckpointCFNoFlush) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(2, "two", "two")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector cphandles; + + // Take a snapshot + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { + // Flush should never trigger. + FAIL(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 1000000)); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + delete checkpoint; + ASSERT_OK(Put(1, "one", "two")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(2, "two", "twentytwo")); + Close(); + EXPECT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("one", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + ASSERT_EQ("two", result); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { + Options options = CurrentOptions(); + options.max_manifest_file_size = 0; // always rollover manifest for file add + Reopen(options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {// Get past the flush in the checkpoint thread before adding any keys to + // the db so the checkpoint thread won't hit the WriteManifest + // syncpoints. + {"CheckpointImpl::CreateCheckpoint:FlushDone", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"}, + // Roll the manifest during checkpointing right after live files are + // snapshotted. + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"); + ASSERT_OK(Put("Default", "Default1")); + ASSERT_OK(Flush()); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + DB* snapshotDB; + // Successful Open() implies that CURRENT pointed to the manifest in the + // checkpoint. + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) { + Close(); + const std::string dbname = test::PerThreadDBPath("transaction_testdb"); + ASSERT_OK(DestroyDB(dbname, CurrentOptions())); + test::DeleteDir(env_, dbname); + + Options options = CurrentOptions(); + options.allow_2pc = true; + // allow_2pc is implicitly set with tx prepare + // options.allow_2pc = true; + TransactionDBOptions txn_db_options; + TransactionDB* txdb; + Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb); + ASSERT_OK(s); + ColumnFamilyHandle* cfa; + ColumnFamilyHandle* cfb; + ColumnFamilyOptions cf_options; + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa)); + + WriteOptions write_options; + // Insert something into CFB so lots of log files will be kept + // before creating the checkpoint. + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb)); + ASSERT_OK(txdb->Put(write_options, cfb, "", "")); + + ReadOptions read_options; + std::string value; + TransactionOptions txn_options; + Transaction* txn = txdb->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + ASSERT_EQ(txdb->GetTransactionByName("xid"), txn); + + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa")); + ASSERT_OK(s); + // Writing prepare into middle of first WAL, then flush WALs many times + for (int i = 1; i <= 100000; i++) { + Transaction* tx = txdb->BeginTransaction(write_options, txn_options); + ASSERT_OK(tx->SetName("x")); + ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val"))); + ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111"))); + ASSERT_OK(tx->Prepare()); + ASSERT_OK(tx->Commit()); + if (i % 10000 == 0) { + ASSERT_OK(txdb->Flush(FlushOptions())); + } + if (i == 88888) { + ASSERT_OK(txn->Prepare()); + } + delete tx; + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"}, + {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(txdb, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"); + ASSERT_OK(txn->Commit()); + delete txn; + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit"); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // No more than two logs files should exist. + std::vector files; + ASSERT_OK(env_->GetChildren(snapshot_name_, &files)); + int num_log_files = 0; + for (auto& file : files) { + uint64_t num; + FileType type; + WalFileType log_type; + if (ParseFileName(file, &num, &type, &log_type) && type == kWalFile) { + num_log_files++; + } + } + // One flush after preapare + one outstanding file before checkpoint + one log + // file generated after checkpoint. + ASSERT_LE(num_log_files, 3); + + TransactionDB* snapshotDB; + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + std::vector cf_handles; + ASSERT_OK(TransactionDB::Open(options, txn_db_options, snapshot_name_, + column_families, &cf_handles, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(read_options, "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value)); + ASSERT_EQ(value, "barcfa"); + + delete cfa; + delete cfb; + delete cf_handles[0]; + delete cf_handles[1]; + delete cf_handles[2]; + delete snapshotDB; + snapshotDB = nullptr; + delete txdb; +} + +TEST_F(CheckpointTest, CheckpointInvalidDirectoryName) { + for (std::string checkpoint_dir : {"", "/", "////"}) { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_TRUE( + checkpoint->CreateCheckpoint(checkpoint_dir).IsInvalidArgument()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, CheckpointWithParallelWrites) { + // When run with TSAN, this exposes the data race fixed in + // https://github.com/facebook/rocksdb/pull/3603 + ASSERT_OK(Put("key1", "val1")); + port::Thread thread([this]() { ASSERT_OK(Put("key2", "val2")); }); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + thread.join(); +} + +TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) { + Options options = CurrentOptions(); + std::unique_ptr env(new FaultInjectionTestEnv(env_)); + options.env = env.get(); + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + ASSERT_OK(env->DropUnsyncedFileData()); + + // make sure it's openable even though whatever data that wasn't synced got + // dropped. + options.env = env_; + DB* snapshot_db; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result)); + ASSERT_EQ("val1", get_result); + delete snapshot_db; + delete db_; + db_ = nullptr; +} + +TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) { + // Regression test for a bug where checkpoint failed on a DB where persisting + // OPTIONS file failed and the DB was opened with + // `fail_if_options_file_error == false`. + Options options = CurrentOptions(); + options.fail_if_options_file_error = false; + auto fault_fs = std::make_shared(FileSystem::Default()); + + // Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one + // operation when inside the OPTIONS file persisting code. + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + fault_fs->SetRandomMetadataWriteError(1 /* one_in */); + SyncPoint::GetInstance()->SetCallBack( + "PersistRocksDBOptions:start", [fault_fs](void* /* arg */) { + fault_fs->EnableMetadataWriteErrorInjection(); + }); + SyncPoint::GetInstance()->SetCallBack( + "FaultInjectionTestFS::InjectMetadataWriteError:Injected", + [fault_fs](void* /* arg */) { + fault_fs->DisableMetadataWriteErrorInjection(); + }); + options.env = fault_fs_env.get(); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + + // Make sure it's usable. + options.env = env_; + DB* snapshot_db; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result)); + ASSERT_EQ("val1", get_result); + delete snapshot_db; + delete db_; + db_ = nullptr; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDB) { + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Flush()); + Close(); + Options options = CurrentOptions(); + ASSERT_OK(ReadOnlyReopen(options)); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + DB* snapshot_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result)); + ASSERT_EQ("foo_value", get_result); + delete snapshot_db; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + for (int i = 0; i != 3; ++i) { + ASSERT_OK(Put(i, "foo", "foo_value")); + ASSERT_OK(Flush(i)); + } + Close(); + Status s = ReadOnlyReopenWithColumnFamilies( + {kDefaultColumnFamilyName, "pikachu", "eevee"}, options); + ASSERT_OK(s); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + + std::vector column_families{ + {kDefaultColumnFamilyName, options}, + {"pikachu", options}, + {"eevee", options}}; + DB* snapshot_db = nullptr; + std::vector snapshot_handles; + s = DB::Open(options, snapshot_name_, column_families, &snapshot_handles, + &snapshot_db); + ASSERT_OK(s); + ReadOptions read_opts; + for (int i = 0; i != 3; ++i) { + std::string get_result; + s = snapshot_db->Get(read_opts, snapshot_handles[i], "foo", &get_result); + ASSERT_OK(s); + ASSERT_EQ("foo_value", get_result); + } + + for (auto snapshot_h : snapshot_handles) { + delete snapshot_h; + } + snapshot_handles.clear(); + delete snapshot_db; +} + +TEST_F(CheckpointTest, CheckpointWithDbPath) { + Options options = CurrentOptions(); + options.db_paths.emplace_back(dbname_ + "_2", 0); + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Flush(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + // Currently not supported + ASSERT_TRUE(checkpoint->CreateCheckpoint(snapshot_name_).IsNotSupported()); + delete checkpoint; +} + +TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) { + // Repro for a race condition where a user write comes in after the checkpoint + // syncs WAL for `track_and_verify_wals_in_manifest` but before the + // corresponding MANIFEST update. With the bug, that scenario resulted in an + // unopenable DB with error "Corruption: Size mismatch: WAL ...". + Options options = CurrentOptions(); + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + options.env = fault_env.get(); + options.track_and_verify_wals_in_manifest = true; + Reopen(options); + + ASSERT_OK(Put("key1", "val1")); + + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", + [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::unique_ptr checkpoint; + { + Checkpoint* checkpoint_ptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr)); + checkpoint.reset(checkpoint_ptr); + } + + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + + // Ensure callback ran. + ASSERT_EQ("val2", Get("key2")); + + Close(); + + // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the + // DB WAL. + fault_env->DropUnsyncedFileData(); + + // Before the bug fix, reopening the DB would fail because the MANIFEST's + // AddWal entry indicated the WAL should be synced through "key2" -> "val2". + Reopen(options); + + // Need to close before `fault_env` goes out of scope. + Close(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE -- cgit v1.2.3