diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/utilities/checkpoint | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/checkpoint/checkpoint_impl.cc | 516 | ||||
-rw-r--r-- | src/rocksdb/utilities/checkpoint/checkpoint_impl.h | 79 | ||||
-rw-r--r-- | src/rocksdb/utilities/checkpoint/checkpoint_test.cc | 829 |
3 files changed, 1424 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc new file mode 100644 index 000000000..98e609609 --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc @@ -0,0 +1,516 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2012 Facebook. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef ROCKSDB_LITE + +#include "utilities/checkpoint/checkpoint_impl.h" + +#include <algorithm> +#include <cinttypes> +#include <string> +#include <vector> + +#include "db/wal_manager.h" +#include "file/file_util.h" +#include "file/filename.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/utilities/checkpoint.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { + *checkpoint_ptr = new CheckpointImpl(db); + return Status::OK(); +} + +Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/, + uint64_t /*log_size_for_flush*/) { + return Status::NotSupported(""); +} + +void CheckpointImpl::CleanStagingDirectory( + const std::string& full_private_path, Logger* info_log) { + std::vector<std::string> subchildren; + Status s = db_->GetEnv()->FileExists(full_private_path); + if (s.IsNotFound()) { + return; + } + ROCKS_LOG_INFO(info_log, "File exists %s -- %s", + full_private_path.c_str(), s.ToString().c_str()); + db_->GetEnv()->GetChildren(full_private_path, &subchildren); + for (auto& subchild : subchildren) { + std::string subchild_path = full_private_path + "/" + subchild; + s = db_->GetEnv()->DeleteFile(subchild_path); + ROCKS_LOG_INFO(info_log, "Delete file %s -- %s", + subchild_path.c_str(), s.ToString().c_str()); + } + // finally delete the private dir + s = db_->GetEnv()->DeleteDir(full_private_path); + ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s", + full_private_path.c_str(), s.ToString().c_str()); +} + +Status Checkpoint::ExportColumnFamily( + ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/, + ExportImportFilesMetaData** /*metadata*/) { + return Status::NotSupported(""); +} + +// Builds an openable snapshot of RocksDB +Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) { + DBOptions db_options = db_->GetDBOptions(); + + Status s = db_->GetEnv()->FileExists(checkpoint_dir); + if (s.ok()) { + return Status::InvalidArgument("Directory exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + ROCKS_LOG_INFO( + db_options.info_log, + "Started the snapshot process -- creating snapshot in directory %s", + checkpoint_dir.c_str()); + + size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/'); + if (final_nonslash_idx == std::string::npos) { + // npos means it's only slashes or empty. Non-empty means it's the root + // directory, but it shouldn't be because we verified above the directory + // doesn't exist. + assert(checkpoint_dir.empty()); + return Status::InvalidArgument("invalid checkpoint directory name"); + } + + std::string full_private_path = + checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + ROCKS_LOG_INFO( + db_options.info_log, + "Snapshot process -- using temporary directory %s", + full_private_path.c_str()); + CleanStagingDirectory(full_private_path, db_options.info_log.get()); + // create snapshot directory + s = db_->GetEnv()->CreateDir(full_private_path); + uint64_t sequence_number = 0; + if (s.ok()) { + db_->DisableFileDeletions(); + s = CreateCustomCheckpoint( + db_options, + [&](const std::string& src_dirname, const std::string& fname, + FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str()); + return db_->GetFileSystem()->LinkFile(src_dirname + fname, + full_private_path + fname, + IOOptions(), nullptr); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str()); + return CopyFile(db_->GetFileSystem(), src_dirname + fname, + full_private_path + fname, size_limit_bytes, + db_options.use_fsync); + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str()); + return CreateFile(db_->GetFileSystem(), full_private_path + fname, + contents, db_options.use_fsync); + } /* create_file_cb */, + &sequence_number, log_size_for_flush); + // we copied all the files, enable file deletions + db_->EnableFileDeletions(false); + } + + if (s.ok()) { + // move tmp private backup to real snapshot directory + s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir); + } + if (s.ok()) { + std::unique_ptr<Directory> checkpoint_directory; + db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory); + if (checkpoint_directory != nullptr) { + s = checkpoint_directory->Fsync(); + } + } + + if (s.ok()) { + // here we know that we succeeded and installed the new snapshot + ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good"); + ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64, + sequence_number); + } else { + // clean all the files we might have created + ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s", + s.ToString().c_str()); + CleanStagingDirectory(full_private_path, db_options.info_log.get()); + } + return s; +} + +Status CheckpointImpl::CreateCustomCheckpoint( + const DBOptions& db_options, + std::function<Status(const std::string& src_dirname, + const std::string& src_fname, FileType type)> + link_file_cb, + std::function<Status(const std::string& src_dirname, + const std::string& src_fname, + uint64_t size_limit_bytes, FileType type)> + copy_file_cb, + std::function<Status(const std::string& fname, const std::string& contents, + FileType type)> + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush) { + Status s; + std::vector<std::string> live_files; + uint64_t manifest_file_size = 0; + uint64_t min_log_num = port::kMaxUint64; + *sequence_number = db_->GetLatestSequenceNumber(); + bool same_fs = true; + VectorLogPtr live_wal_files; + + bool flush_memtable = true; + if (s.ok()) { + if (!db_options.allow_2pc) { + if (log_size_for_flush == port::kMaxUint64) { + flush_memtable = false; + } else if (log_size_for_flush > 0) { + // If out standing log files are small, we skip the flush. + s = db_->GetSortedWalFiles(live_wal_files); + + if (!s.ok()) { + return s; + } + + // Don't flush column families if total log size is smaller than + // log_size_for_flush. We copy the log files instead. + // We may be able to cover 2PC case too. + uint64_t total_wal_size = 0; + for (auto& wal : live_wal_files) { + total_wal_size += wal->SizeFileBytes(); + } + if (total_wal_size < log_size_for_flush) { + flush_memtable = false; + } + live_wal_files.clear(); + } + } + + // this will return live_files prefixed with "/" + s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); + + if (s.ok() && db_options.allow_2pc) { + // If 2PC is enabled, we need to get minimum log number after the flush. + // Need to refetch the live files to recapture the snapshot. + if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep, + &min_log_num)) { + return Status::InvalidArgument( + "2PC enabled but cannot fine the min log number to keep."); + } + // We need to refetch live files with flush to handle this case: + // A previous 000001.log contains the prepare record of transaction tnx1. + // The current log file is 000002.log, and sequence_number points to this + // file. + // After calling GetLiveFiles(), 000003.log is created. + // Then tnx1 is committed. The commit record is written to 000003.log. + // Now we fetch min_log_num, which will be 3. + // Then only 000002.log and 000003.log will be copied, and 000001.log will + // be skipped. 000003.log contains commit message of tnx1, but we don't + // have respective prepare record for it. + // In order to avoid this situation, we need to force flush to make sure + // all transactions committed before getting min_log_num will be flushed + // to SST files. + // We cannot get min_log_num before calling the GetLiveFiles() for the + // first time, because if we do that, all the logs files will be included, + // far more than needed. + s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); + } + + TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); + TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"); + db_->FlushWAL(false /* sync */); + } + // if we have more than one column family, we need to also get WAL files + if (s.ok()) { + s = db_->GetSortedWalFiles(live_wal_files); + } + if (!s.ok()) { + return s; + } + + size_t wal_size = live_wal_files.size(); + + // copy/hard link live_files + std::string manifest_fname, current_fname; + for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { + uint64_t number; + FileType type; + bool ok = ParseFileName(live_files[i], &number, &type); + if (!ok) { + s = Status::Corruption("Can't parse file name. This is very bad"); + break; + } + // we should only get sst, options, manifest and current files here + assert(type == kTableFile || type == kDescriptorFile || + type == kCurrentFile || type == kOptionsFile); + assert(live_files[i].size() > 0 && live_files[i][0] == '/'); + if (type == kCurrentFile) { + // We will craft the current file manually to ensure it's consistent with + // the manifest number. This is necessary because current's file contents + // can change during checkpoint creation. + current_fname = live_files[i]; + continue; + } else if (type == kDescriptorFile) { + manifest_fname = live_files[i]; + } + std::string src_fname = live_files[i]; + + // rules: + // * if it's kTableFile, then it's shared + // * if it's kDescriptorFile, limit the size to manifest_file_size + // * always copy if cross-device link + if ((type == kTableFile) && same_fs) { + s = link_file_cb(db_->GetName(), src_fname, type); + if (s.IsNotSupported()) { + same_fs = false; + s = Status::OK(); + } + } + if ((type != kTableFile) || (!same_fs)) { + s = copy_file_cb(db_->GetName(), src_fname, + (type == kDescriptorFile) ? manifest_file_size : 0, + type); + } + } + if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { + create_file_cb(current_fname, manifest_fname.substr(1) + "\n", + kCurrentFile); + } + ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt, + live_wal_files.size()); + + // Link WAL files. Copy exact size of last one because it is the only one + // that has changes after the last flush. + for (size_t i = 0; s.ok() && i < wal_size; ++i) { + if ((live_wal_files[i]->Type() == kAliveLogFile) && + (!flush_memtable || + live_wal_files[i]->StartSequence() >= *sequence_number || + live_wal_files[i]->LogNumber() >= min_log_num)) { + if (i + 1 == wal_size) { + s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), + live_wal_files[i]->SizeFileBytes(), kLogFile); + break; + } + if (same_fs) { + // we only care about live log files + s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), + kLogFile); + if (s.IsNotSupported()) { + same_fs = false; + s = Status::OK(); + } + } + if (!same_fs) { + s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0, + kLogFile); + } + } + } + + return s; +} + +// Exports all live SST files of a specified Column Family onto export_dir, +// returning SST files information in metadata. +Status CheckpointImpl::ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle); + const auto cf_name = cfh->GetName(); + const auto db_options = db_->GetDBOptions(); + + assert(metadata != nullptr); + assert(*metadata == nullptr); + auto s = db_->GetEnv()->FileExists(export_dir); + if (s.ok()) { + return Status::InvalidArgument("Specified export_dir exists"); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + + const auto final_nonslash_idx = export_dir.find_last_not_of('/'); + if (final_nonslash_idx == std::string::npos) { + return Status::InvalidArgument("Specified export_dir invalid"); + } + ROCKS_LOG_INFO(db_options.info_log, + "[%s] export column family onto export directory %s", + cf_name.c_str(), export_dir.c_str()); + + // Create a temporary export directory. + const auto tmp_export_dir = + export_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; + s = db_->GetEnv()->CreateDir(tmp_export_dir); + + if (s.ok()) { + s = db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), handle); + } + + ColumnFamilyMetaData db_metadata; + if (s.ok()) { + // Export live sst files with file deletions disabled. + s = db_->DisableFileDeletions(); + if (s.ok()) { + db_->GetColumnFamilyMetaData(handle, &db_metadata); + + s = ExportFilesInMetaData( + db_options, db_metadata, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s", + cf_name.c_str(), fname.c_str()); + return db_->GetEnv()->LinkFile(src_dirname + fname, + tmp_export_dir + fname); + } /*link_file_cb*/, + [&](const std::string& src_dirname, const std::string& fname) { + ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s", + cf_name.c_str(), fname.c_str()); + return CopyFile(db_->GetFileSystem(), src_dirname + fname, + tmp_export_dir + fname, 0, db_options.use_fsync); + } /*copy_file_cb*/); + + const auto enable_status = db_->EnableFileDeletions(false /*force*/); + if (s.ok()) { + s = enable_status; + } + } + } + + auto moved_to_user_specified_dir = false; + if (s.ok()) { + // Move temporary export directory to the actual export directory. + s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir); + } + + if (s.ok()) { + // Fsync export directory. + moved_to_user_specified_dir = true; + std::unique_ptr<Directory> dir_ptr; + s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr); + if (s.ok()) { + assert(dir_ptr != nullptr); + s = dir_ptr->Fsync(); + } + } + + if (s.ok()) { + // Export of files succeeded. Fill in the metadata information. + auto result_metadata = new ExportImportFilesMetaData(); + result_metadata->db_comparator_name = handle->GetComparator()->Name(); + for (const auto& level_metadata : db_metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + LiveFileMetaData live_file_metadata; + live_file_metadata.size = file_metadata.size; + live_file_metadata.name = std::move(file_metadata.name); + live_file_metadata.file_number = file_metadata.file_number; + live_file_metadata.db_path = export_dir; + live_file_metadata.smallest_seqno = file_metadata.smallest_seqno; + live_file_metadata.largest_seqno = file_metadata.largest_seqno; + live_file_metadata.smallestkey = std::move(file_metadata.smallestkey); + live_file_metadata.largestkey = std::move(file_metadata.largestkey); + live_file_metadata.oldest_blob_file_number = + file_metadata.oldest_blob_file_number; + live_file_metadata.level = level_metadata.level; + result_metadata->files.push_back(live_file_metadata); + } + *metadata = result_metadata; + } + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.", + cf_name.c_str()); + } else { + // Failure: Clean up all the files/directories created. + ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s", + cf_name.c_str(), s.ToString().c_str()); + std::vector<std::string> subchildren; + const auto cleanup_dir = + moved_to_user_specified_dir ? export_dir : tmp_export_dir; + db_->GetEnv()->GetChildren(cleanup_dir, &subchildren); + for (const auto& subchild : subchildren) { + const auto subchild_path = cleanup_dir + "/" + subchild; + const auto status = db_->GetEnv()->DeleteFile(subchild_path); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s", + subchild_path.c_str(), status.ToString().c_str()); + } + } + const auto status = db_->GetEnv()->DeleteDir(cleanup_dir); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s", + cleanup_dir.c_str(), status.ToString().c_str()); + } + } + return s; +} + +Status CheckpointImpl::ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function<Status(const std::string& src_dirname, + const std::string& src_fname)> + link_file_cb, + std::function<Status(const std::string& src_dirname, + const std::string& src_fname)> + copy_file_cb) { + Status s; + auto hardlink_file = true; + + // Copy/hard link files in metadata. + size_t num_files = 0; + for (const auto& level_metadata : metadata.levels) { + for (const auto& file_metadata : level_metadata.files) { + uint64_t number; + FileType type; + const auto ok = ParseFileName(file_metadata.name, &number, &type); + if (!ok) { + s = Status::Corruption("Could not parse file name"); + break; + } + + // We should only get sst files here. + assert(type == kTableFile); + assert(file_metadata.size > 0 && file_metadata.name[0] == '/'); + const auto src_fname = file_metadata.name; + ++num_files; + + if (hardlink_file) { + s = link_file_cb(db_->GetName(), src_fname); + if (num_files == 1 && s.IsNotSupported()) { + // Fallback to copy if link failed due to cross-device directories. + hardlink_file = false; + s = Status::OK(); + } + } + if (!hardlink_file) { + s = copy_file_cb(db_->GetName(), src_fname); + } + if (!s.ok()) { + break; + } + } + } + ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt, + num_files); + + return s; +} +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.h b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h new file mode 100644 index 000000000..81ee8320b --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h @@ -0,0 +1,79 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/checkpoint.h" + +#include <string> +#include "file/filename.h" +#include "rocksdb/db.h" + +namespace ROCKSDB_NAMESPACE { + +class CheckpointImpl : public Checkpoint { + public: + // Creates a Checkpoint object to be used for creating openable snapshots + explicit CheckpointImpl(DB* db) : db_(db) {} + + // Builds an openable snapshot of RocksDB on the same disk, which + // accepts an output directory on the same disk, and under the directory + // (1) hard-linked SST files pointing to existing live SST files + // SST files will be copied if output directory is on a different filesystem + // (2) a copied manifest files and other files + // The directory should not already exist and will be created by this API. + // The directory will be an absolute path + using Checkpoint::CreateCheckpoint; + virtual Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) override; + + // Exports all live SST files of a specified Column Family onto export_dir + // and returning SST files information in metadata. + // - SST files will be created as hard links when the directory specified + // is in the same partition as the db directory, copied otherwise. + // - export_dir should not already exist and will be created by this API. + // - Always triggers a flush. + using Checkpoint::ExportColumnFamily; + virtual Status ExportColumnFamily( + ColumnFamilyHandle* handle, const std::string& export_dir, + ExportImportFilesMetaData** metadata) override; + + // Checkpoint logic can be customized by providing callbacks for link, copy, + // or create. + Status CreateCustomCheckpoint( + const DBOptions& db_options, + std::function<Status(const std::string& src_dirname, + const std::string& fname, FileType type)> + link_file_cb, + std::function<Status(const std::string& src_dirname, + const std::string& fname, uint64_t size_limit_bytes, + FileType type)> + copy_file_cb, + std::function<Status(const std::string& fname, + const std::string& contents, FileType type)> + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush); + + private: + void CleanStagingDirectory(const std::string& path, Logger* info_log); + + // Export logic customization by providing callbacks for link or copy. + Status ExportFilesInMetaData( + const DBOptions& db_options, const ColumnFamilyMetaData& metadata, + std::function<Status(const std::string& src_dirname, + const std::string& fname)> + link_file_cb, + std::function<Status(const std::string& src_dirname, + const std::string& fname)> + copy_file_cb); + + private: + DB* db_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_test.cc b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc new file mode 100644 index 000000000..1a31c40ff --- /dev/null +++ b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc @@ -0,0 +1,829 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Syncpoint prevents us building and running tests in release +#ifndef ROCKSDB_LITE + +#ifndef OS_WIN +#include <unistd.h> +#endif +#include <iostream> +#include <thread> +#include <utility> +#include "db/db_impl/db_impl.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/utilities/checkpoint.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/fault_injection_test_env.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { +class CheckpointTest : public testing::Test { + protected: + // Sequence of option configurations to try + enum OptionConfig { + kDefault = 0, + }; + int option_config_; + + public: + std::string dbname_; + std::string alternative_wal_dir_; + Env* env_; + DB* db_; + Options last_options_; + std::vector<ColumnFamilyHandle*> handles_; + std::string snapshot_name_; + std::string export_path_; + ColumnFamilyHandle* cfh_reverse_comp_; + ExportImportFilesMetaData* metadata_; + + CheckpointTest() : env_(Env::Default()) { + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); + dbname_ = test::PerThreadDBPath(env_, "checkpoint_test"); + alternative_wal_dir_ = dbname_ + "/wal"; + auto options = CurrentOptions(); + auto delete_options = options; + delete_options.wal_dir = alternative_wal_dir_; + EXPECT_OK(DestroyDB(dbname_, delete_options)); + // Destroy it for not alternative WAL dir is used. + EXPECT_OK(DestroyDB(dbname_, options)); + db_ = nullptr; + snapshot_name_ = test::PerThreadDBPath(env_, "snapshot"); + std::string snapshot_tmp_name = snapshot_name_ + ".tmp"; + EXPECT_OK(DestroyDB(snapshot_name_, options)); + env_->DeleteDir(snapshot_name_); + EXPECT_OK(DestroyDB(snapshot_tmp_name, options)); + env_->DeleteDir(snapshot_tmp_name); + Reopen(options); + export_path_ = test::TmpDir(env_) + "/export"; + test::DestroyDir(env_, export_path_); + cfh_reverse_comp_ = nullptr; + metadata_ = nullptr; + } + + ~CheckpointTest() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + if (cfh_reverse_comp_) { + EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_)); + cfh_reverse_comp_ = nullptr; + } + if (metadata_) { + delete metadata_; + metadata_ = nullptr; + } + Close(); + Options options; + options.db_paths.emplace_back(dbname_, 0); + options.db_paths.emplace_back(dbname_ + "_2", 0); + options.db_paths.emplace_back(dbname_ + "_3", 0); + options.db_paths.emplace_back(dbname_ + "_4", 0); + EXPECT_OK(DestroyDB(dbname_, options)); + EXPECT_OK(DestroyDB(snapshot_name_, options)); + test::DestroyDir(env_, export_path_); + } + + // Return the current option configuration. + Options CurrentOptions() { + Options options; + options.env = env_; + options.create_if_missing = true; + return options; + } + + void CreateColumnFamilies(const std::vector<std::string>& cfs, + const Options& options) { + ColumnFamilyOptions cf_opts(options); + size_t cfi = handles_.size(); + handles_.resize(cfi + cfs.size()); + for (auto cf : cfs) { + ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++])); + } + } + + void CreateAndReopenWithCF(const std::vector<std::string>& cfs, + const Options& options) { + CreateColumnFamilies(cfs, options); + std::vector<std::string> cfs_plus_default = cfs; + cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); + ReopenWithColumnFamilies(cfs_plus_default, options); + } + + void ReopenWithColumnFamilies(const std::vector<std::string>& cfs, + const std::vector<Options>& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + void ReopenWithColumnFamilies(const std::vector<std::string>& cfs, + const Options& options) { + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + } + + Status TryReopenWithColumnFamilies( + const std::vector<std::string>& cfs, + const std::vector<Options>& options) { + Close(); + EXPECT_EQ(cfs.size(), options.size()); + std::vector<ColumnFamilyDescriptor> column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i])); + } + DBOptions db_opts = DBOptions(options[0]); + return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); + } + + Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs, + const Options& options) { + Close(); + std::vector<Options> v_opts(cfs.size(), options); + return TryReopenWithColumnFamilies(cfs, v_opts); + } + + void Reopen(const Options& options) { + ASSERT_OK(TryReopen(options)); + } + + void CompactAll() { + for (auto h : handles_) { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr)); + } + } + + void Close() { + for (auto h : handles_) { + delete h; + } + handles_.clear(); + delete db_; + db_ = nullptr; + } + + void DestroyAndReopen(const Options& options) { + // Destroy using last options + Destroy(last_options_); + ASSERT_OK(TryReopen(options)); + } + + void Destroy(const Options& options) { + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + } + + Status ReadOnlyReopen(const Options& options) { + return DB::OpenForReadOnly(options, dbname_, &db_); + } + + Status ReadOnlyReopenWithColumnFamilies(const std::vector<std::string>& cfs, + const Options& options) { + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf : cfs) { + column_families.emplace_back(cf, options); + } + return DB::OpenForReadOnly(options, dbname_, column_families, &handles_, + &db_); + } + + Status TryReopen(const Options& options) { + Close(); + last_options_ = options; + return DB::Open(options, dbname_, &db_); + } + + Status Flush(int cf = 0) { + if (cf == 0) { + return db_->Flush(FlushOptions()); + } else { + return db_->Flush(FlushOptions(), handles_[cf]); + } + } + + Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) { + return db_->Put(wo, k, v); + } + + Status Put(int cf, const Slice& k, const Slice& v, + WriteOptions wo = WriteOptions()) { + return db_->Put(wo, handles_[cf], k, v); + } + + Status Delete(const std::string& k) { + return db_->Delete(WriteOptions(), k); + } + + Status Delete(int cf, const std::string& k) { + return db_->Delete(WriteOptions(), handles_[cf], k); + } + + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + std::string Get(int cf, const std::string& k, + const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, handles_[cf], k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } +}; + +TEST_F(CheckpointTest, GetSnapshotLink) { + for (uint64_t log_size_for_flush : {0, 1000000}) { + Options options; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + Checkpoint* checkpoint; + + options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + + // Create a database + Status s; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + std::string key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + // Take a snapshot + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, log_size_for_flush)); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_OK(Flush()); + ASSERT_EQ("v2", Get(key)); + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, key, &result)); + ASSERT_EQ("v1", result); + delete snapshotDB; + snapshotDB = nullptr; + delete db_; + db_ = nullptr; + + // Destroy original DB + ASSERT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents + options.create_if_missing = false; + dbname_ = snapshot_name_; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + ASSERT_EQ("v1", Get(key)); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + delete checkpoint; + + // Restore DB name + dbname_ = test::PerThreadDBPath(env_, "db_test"); + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) { + // Create a database + Status s; + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + // Helper to verify the number of files in metadata and export dir + auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata, + int num_files_expected) { + ASSERT_EQ(metadata.files.size(), num_files_expected); + std::vector<std::string> subchildren; + env_->GetChildren(export_path_, &subchildren); + int num_children = 0; + for (const auto& child : subchildren) { + if (child != "." && child != "..") { + ++num_children; + } + } + ASSERT_EQ(num_children, num_files_expected); + }; + + // Test DefaultColumnFamily + { + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + test::DestroyDir(env_, export_path_); + delete metadata_; + metadata_ = nullptr; + + // Check again after compaction + CompactAll(); + ASSERT_OK(Put(key, "v2")); + ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_)); + verify_files_exported(*metadata_, 2); + ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name()); + test::DestroyDir(env_, export_path_); + delete metadata_; + metadata_ = nullptr; + delete checkpoint; + } + + // Test non default column family with non default comparator + { + auto cf_options = CurrentOptions(); + cf_options.comparator = ReverseBytewiseComparator(); + ASSERT_OK(db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_)); + + const auto key = std::string("foo"); + ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export the Tables and verify + ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_, + &metadata_)); + verify_files_exported(*metadata_, 1); + ASSERT_EQ(metadata_->db_comparator_name, + ReverseBytewiseComparator()->Name()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) { + // Create a database + Status s; + auto options = CurrentOptions(); + options.create_if_missing = true; + CreateAndReopenWithCF({}, options); + + const auto key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + + // Export onto existing directory + env_->CreateDirIfMissing(export_path_); + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir exists")); + test::DestroyDir(env_, export_path_); + + // Export with invalid directory specification + export_path_ = ""; + ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(), + export_path_, &metadata_), + Status::InvalidArgument("Specified export_dir invalid")); + delete checkpoint; +} + +TEST_F(CheckpointTest, CheckpointCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"}, + {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + ASSERT_OK(Put(2, "two", "two")); + ASSERT_OK(Put(3, "three", "three")); + ASSERT_OK(Put(4, "four", "four")); + ASSERT_OK(Put(5, "five", "five")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector<ColumnFamilyHandle*> cphandles; + + Status s; + // Take a snapshot + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1"); + ASSERT_OK(Put(0, "Default", "Default1")); + ASSERT_OK(Put(1, "one", "eleven")); + ASSERT_OK(Put(2, "two", "twelve")); + ASSERT_OK(Put(3, "three", "thirteen")); + ASSERT_OK(Put(4, "four", "fourteen")); + ASSERT_OK(Put(5, "five", "fifteen")); + TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2"); + t.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_OK(Put(1, "one", "twentyone")); + ASSERT_OK(Put(2, "two", "twentytwo")); + ASSERT_OK(Put(3, "three", "twentythree")); + ASSERT_OK(Put(4, "four", "twentyfour")); + ASSERT_OK(Put(5, "five", "twentyfive")); + ASSERT_OK(Flush()); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector<std::string> cfs; + cfs= {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector<ColumnFamilyDescriptor> column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, + column_families, &cphandles, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default1", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("eleven", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CheckpointCFNoFlush) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + Flush(); + ASSERT_OK(Put(2, "two", "two")); + + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector<ColumnFamilyHandle*> cphandles; + + Status s; + // Take a snapshot + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { + // Flush should never trigger. + FAIL(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 1000000)); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + delete checkpoint; + ASSERT_OK(Put(1, "one", "two")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(2, "two", "twentytwo")); + Close(); + EXPECT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector<std::string> cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector<ColumnFamilyDescriptor> column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("one", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + ASSERT_EQ("two", result); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { + Options options = CurrentOptions(); + options.max_manifest_file_size = 0; // always rollover manifest for file add + Reopen(options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {// Get past the flush in the checkpoint thread before adding any keys to + // the db so the checkpoint thread won't hit the WriteManifest + // syncpoints. + {"DBImpl::GetLiveFiles:1", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"}, + // Roll the manifest during checkpointing right after live files are + // snapshotted. + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"); + ASSERT_OK(Put("Default", "Default1")); + ASSERT_OK(Flush()); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + DB* snapshotDB; + // Successful Open() implies that CURRENT pointed to the manifest in the + // checkpoint. + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB)); + delete snapshotDB; + snapshotDB = nullptr; +} + +TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) { + Close(); + const std::string dbname = test::PerThreadDBPath("transaction_testdb"); + ASSERT_OK(DestroyDB(dbname, CurrentOptions())); + env_->DeleteDir(dbname); + + Options options = CurrentOptions(); + options.allow_2pc = true; + // allow_2pc is implicitly set with tx prepare + // options.allow_2pc = true; + TransactionDBOptions txn_db_options; + TransactionDB* txdb; + Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb); + assert(s.ok()); + ColumnFamilyHandle* cfa; + ColumnFamilyHandle* cfb; + ColumnFamilyOptions cf_options; + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa)); + + WriteOptions write_options; + // Insert something into CFB so lots of log files will be kept + // before creating the checkpoint. + ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb)); + ASSERT_OK(txdb->Put(write_options, cfb, "", "")); + + ReadOptions read_options; + std::string value; + TransactionOptions txn_options; + Transaction* txn = txdb->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + ASSERT_EQ(txdb->GetTransactionByName("xid"), txn); + + s = txn->Put(Slice("foo"), Slice("bar")); + s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa")); + ASSERT_OK(s); + // Writing prepare into middle of first WAL, then flush WALs many times + for (int i = 1; i <= 100000; i++) { + Transaction* tx = txdb->BeginTransaction(write_options, txn_options); + ASSERT_OK(tx->SetName("x")); + ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val"))); + ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111"))); + ASSERT_OK(tx->Prepare()); + ASSERT_OK(tx->Commit()); + if (i % 10000 == 0) { + txdb->Flush(FlushOptions()); + } + if (i == 88888) { + ASSERT_OK(txn->Prepare()); + } + delete tx; + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"}, + {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(txdb, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + }); + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"); + ASSERT_OK(txn->Commit()); + delete txn; + TEST_SYNC_POINT( + "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit"); + t.join(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // No more than two logs files should exist. + std::vector<std::string> files; + env_->GetChildren(snapshot_name_, &files); + int num_log_files = 0; + for (auto& file : files) { + uint64_t num; + FileType type; + WalFileType log_type; + if (ParseFileName(file, &num, &type, &log_type) && type == kLogFile) { + num_log_files++; + } + } + // One flush after preapare + one outstanding file before checkpoint + one log + // file generated after checkpoint. + ASSERT_LE(num_log_files, 3); + + TransactionDB* snapshotDB; + std::vector<ColumnFamilyDescriptor> column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*> cf_handles; + ASSERT_OK(TransactionDB::Open(options, txn_db_options, snapshot_name_, + column_families, &cf_handles, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(read_options, "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value)); + ASSERT_EQ(value, "barcfa"); + + delete cfa; + delete cfb; + delete cf_handles[0]; + delete cf_handles[1]; + delete cf_handles[2]; + delete snapshotDB; + snapshotDB = nullptr; + delete txdb; +} + +TEST_F(CheckpointTest, CheckpointInvalidDirectoryName) { + for (std::string checkpoint_dir : {"", "/", "////"}) { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_TRUE(checkpoint->CreateCheckpoint("").IsInvalidArgument()); + delete checkpoint; + } +} + +TEST_F(CheckpointTest, CheckpointWithParallelWrites) { + // When run with TSAN, this exposes the data race fixed in + // https://github.com/facebook/rocksdb/pull/3603 + ASSERT_OK(Put("key1", "val1")); + port::Thread thread([this]() { ASSERT_OK(Put("key2", "val2")); }); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + thread.join(); +} + +TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) { + Options options = CurrentOptions(); + std::unique_ptr<FaultInjectionTestEnv> env(new FaultInjectionTestEnv(env_)); + options.env = env.get(); + Reopen(options); + ASSERT_OK(Put("key1", "val1")); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + env->DropUnsyncedFileData(); + + // make sure it's openable even though whatever data that wasn't synced got + // dropped. + options.env = env_; + DB* snapshot_db; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result)); + ASSERT_EQ("val1", get_result); + delete snapshot_db; + delete db_; + db_ = nullptr; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDB) { + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Flush()); + Close(); + Options options = CurrentOptions(); + ASSERT_OK(ReadOnlyReopen(options)); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + DB* snapshot_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result)); + ASSERT_EQ("foo_value", get_result); + delete snapshot_db; +} + +TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + for (int i = 0; i != 3; ++i) { + ASSERT_OK(Put(i, "foo", "foo_value")); + ASSERT_OK(Flush(i)); + } + Close(); + Status s = ReadOnlyReopenWithColumnFamilies( + {kDefaultColumnFamilyName, "pikachu", "eevee"}, options); + ASSERT_OK(s); + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + Close(); + + std::vector<ColumnFamilyDescriptor> column_families{ + {kDefaultColumnFamilyName, options}, + {"pikachu", options}, + {"eevee", options}}; + DB* snapshot_db = nullptr; + std::vector<ColumnFamilyHandle*> snapshot_handles; + s = DB::Open(options, snapshot_name_, column_families, &snapshot_handles, + &snapshot_db); + ASSERT_OK(s); + ReadOptions read_opts; + for (int i = 0; i != 3; ++i) { + std::string get_result; + s = snapshot_db->Get(read_opts, snapshot_handles[i], "foo", &get_result); + ASSERT_OK(s); + ASSERT_EQ("foo_value", get_result); + } + + for (auto snapshot_h : snapshot_handles) { + delete snapshot_h; + } + snapshot_handles.clear(); + delete snapshot_db; +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE |