summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/checkpoint
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/checkpoint
parentInitial commit. (diff)
downloadceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz
ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/checkpoint')
-rw-r--r--src/rocksdb/utilities/checkpoint/checkpoint_impl.cc469
-rw-r--r--src/rocksdb/utilities/checkpoint/checkpoint_impl.h66
-rw-r--r--src/rocksdb/utilities/checkpoint/checkpoint_test.cc974
3 files changed, 1509 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc
new file mode 100644
index 000000000..44ce70b1b
--- /dev/null
+++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.cc
@@ -0,0 +1,469 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2012 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/checkpoint/checkpoint_impl.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <string>
+#include <tuple>
+#include <unordered_set>
+#include <vector>
+
+#include "db/wal_manager.h"
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "logging/logging.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/metadata.h"
+#include "rocksdb/options.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/types.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
+#include "util/file_checksum_helper.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
+ *checkpoint_ptr = new CheckpointImpl(db);
+ return Status::OK();
+}
+
+Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
+ uint64_t /*log_size_for_flush*/,
+ uint64_t* /*sequence_number_ptr*/) {
+ return Status::NotSupported("");
+}
+
+void CheckpointImpl::CleanStagingDirectory(const std::string& full_private_path,
+ Logger* info_log) {
+ std::vector<std::string> subchildren;
+ Status s = db_->GetEnv()->FileExists(full_private_path);
+ if (s.IsNotFound()) {
+ return;
+ }
+ ROCKS_LOG_INFO(info_log, "File exists %s -- %s", full_private_path.c_str(),
+ s.ToString().c_str());
+ s = db_->GetEnv()->GetChildren(full_private_path, &subchildren);
+ if (s.ok()) {
+ for (auto& subchild : subchildren) {
+ std::string subchild_path = full_private_path + "/" + subchild;
+ s = db_->GetEnv()->DeleteFile(subchild_path);
+ ROCKS_LOG_INFO(info_log, "Delete file %s -- %s", subchild_path.c_str(),
+ s.ToString().c_str());
+ }
+ }
+ // finally delete the private dir
+ s = db_->GetEnv()->DeleteDir(full_private_path);
+ ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s", full_private_path.c_str(),
+ s.ToString().c_str());
+}
+
+Status Checkpoint::ExportColumnFamily(
+ ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/,
+ ExportImportFilesMetaData** /*metadata*/) {
+ return Status::NotSupported("");
+}
+
+// Builds an openable snapshot of RocksDB
+Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
+ uint64_t log_size_for_flush,
+ uint64_t* sequence_number_ptr) {
+ DBOptions db_options = db_->GetDBOptions();
+
+ Status s = db_->GetEnv()->FileExists(checkpoint_dir);
+ if (s.ok()) {
+ return Status::InvalidArgument("Directory exists");
+ } else if (!s.IsNotFound()) {
+ assert(s.IsIOError());
+ return s;
+ }
+
+ ROCKS_LOG_INFO(
+ db_options.info_log,
+ "Started the snapshot process -- creating snapshot in directory %s",
+ checkpoint_dir.c_str());
+
+ size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/');
+ if (final_nonslash_idx == std::string::npos) {
+ // npos means it's only slashes or empty. Non-empty means it's the root
+ // directory, but it shouldn't be because we verified above the directory
+ // doesn't exist.
+ assert(checkpoint_dir.empty());
+ return Status::InvalidArgument("invalid checkpoint directory name");
+ }
+
+ std::string full_private_path =
+ checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
+ ROCKS_LOG_INFO(db_options.info_log,
+ "Snapshot process -- using temporary directory %s",
+ full_private_path.c_str());
+ CleanStagingDirectory(full_private_path, db_options.info_log.get());
+ // create snapshot directory
+ s = db_->GetEnv()->CreateDir(full_private_path);
+ uint64_t sequence_number = 0;
+ if (s.ok()) {
+ // enable file deletions
+ s = db_->DisableFileDeletions();
+ const bool disabled_file_deletions = s.ok();
+
+ if (s.ok() || s.IsNotSupported()) {
+ s = CreateCustomCheckpoint(
+ [&](const std::string& src_dirname, const std::string& fname,
+ FileType) {
+ ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
+ fname.c_str());
+ return db_->GetFileSystem()->LinkFile(
+ src_dirname + "/" + fname, full_private_path + "/" + fname,
+ IOOptions(), nullptr);
+ } /* link_file_cb */,
+ [&](const std::string& src_dirname, const std::string& fname,
+ uint64_t size_limit_bytes, FileType,
+ const std::string& /* checksum_func_name */,
+ const std::string& /* checksum_val */,
+ const Temperature temperature) {
+ ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
+ return CopyFile(db_->GetFileSystem(), src_dirname + "/" + fname,
+ full_private_path + "/" + fname, size_limit_bytes,
+ db_options.use_fsync, nullptr, temperature);
+ } /* copy_file_cb */,
+ [&](const std::string& fname, const std::string& contents, FileType) {
+ ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
+ return CreateFile(db_->GetFileSystem(),
+ full_private_path + "/" + fname, contents,
+ db_options.use_fsync);
+ } /* create_file_cb */,
+ &sequence_number, log_size_for_flush);
+
+ // we copied all the files, enable file deletions
+ if (disabled_file_deletions) {
+ Status ss = db_->EnableFileDeletions(false);
+ assert(ss.ok());
+ ss.PermitUncheckedError();
+ }
+ }
+ }
+
+ if (s.ok()) {
+ // move tmp private backup to real snapshot directory
+ s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
+ }
+ if (s.ok()) {
+ std::unique_ptr<FSDirectory> checkpoint_directory;
+ s = db_->GetFileSystem()->NewDirectory(checkpoint_dir, IOOptions(),
+ &checkpoint_directory, nullptr);
+ if (s.ok() && checkpoint_directory != nullptr) {
+ s = checkpoint_directory->FsyncWithDirOptions(
+ IOOptions(), nullptr,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed));
+ }
+ }
+
+ if (s.ok()) {
+ if (sequence_number_ptr != nullptr) {
+ *sequence_number_ptr = sequence_number;
+ }
+ // here we know that we succeeded and installed the new snapshot
+ ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
+ ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
+ sequence_number);
+ } else {
+ // clean all the files we might have created
+ ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
+ s.ToString().c_str());
+ CleanStagingDirectory(full_private_path, db_options.info_log.get());
+ }
+ return s;
+}
+
+Status CheckpointImpl::CreateCustomCheckpoint(
+ std::function<Status(const std::string& src_dirname,
+ const std::string& src_fname, FileType type)>
+ link_file_cb,
+ std::function<
+ Status(const std::string& src_dirname, const std::string& src_fname,
+ uint64_t size_limit_bytes, FileType type,
+ const std::string& checksum_func_name,
+ const std::string& checksum_val, const Temperature temperature)>
+ copy_file_cb,
+ std::function<Status(const std::string& fname, const std::string& contents,
+ FileType type)>
+ create_file_cb,
+ uint64_t* sequence_number, uint64_t log_size_for_flush,
+ bool get_live_table_checksum) {
+ *sequence_number = db_->GetLatestSequenceNumber();
+
+ LiveFilesStorageInfoOptions opts;
+ opts.include_checksum_info = get_live_table_checksum;
+ opts.wal_size_for_flush = log_size_for_flush;
+
+ std::vector<LiveFileStorageInfo> infos;
+ {
+ Status s = db_->GetLiveFilesStorageInfo(opts, &infos);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Verify that everything except WAL files are in same directory
+ // (db_paths / cf_paths not supported)
+ std::unordered_set<std::string> dirs;
+ for (auto& info : infos) {
+ if (info.file_type != kWalFile) {
+ dirs.insert(info.directory);
+ }
+ }
+ if (dirs.size() > 1) {
+ return Status::NotSupported(
+ "db_paths / cf_paths not supported for Checkpoint nor BackupEngine");
+ }
+
+ bool same_fs = true;
+
+ for (auto& info : infos) {
+ Status s;
+ if (!info.replacement_contents.empty()) {
+ // Currently should only be used for CURRENT file.
+ assert(info.file_type == kCurrentFile);
+
+ if (info.size != info.replacement_contents.size()) {
+ s = Status::Corruption("Inconsistent size metadata for " +
+ info.relative_filename);
+ } else {
+ s = create_file_cb(info.relative_filename, info.replacement_contents,
+ info.file_type);
+ }
+ } else {
+ if (same_fs && !info.trim_to_size) {
+ s = link_file_cb(info.directory, info.relative_filename,
+ info.file_type);
+ if (s.IsNotSupported()) {
+ same_fs = false;
+ s = Status::OK();
+ }
+ s.MustCheck();
+ }
+ if (!same_fs || info.trim_to_size) {
+ assert(info.file_checksum_func_name.empty() ==
+ !opts.include_checksum_info);
+ // no assertion on file_checksum because empty is used for both "not
+ // set" and "unknown"
+ if (opts.include_checksum_info) {
+ s = copy_file_cb(info.directory, info.relative_filename, info.size,
+ info.file_type, info.file_checksum_func_name,
+ info.file_checksum, info.temperature);
+ } else {
+ s = copy_file_cb(info.directory, info.relative_filename, info.size,
+ info.file_type, kUnknownFileChecksumFuncName,
+ kUnknownFileChecksum, info.temperature);
+ }
+ }
+ }
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ return Status::OK();
+}
+
+// Exports all live SST files of a specified Column Family onto export_dir,
+// returning SST files information in metadata.
+Status CheckpointImpl::ExportColumnFamily(
+ ColumnFamilyHandle* handle, const std::string& export_dir,
+ ExportImportFilesMetaData** metadata) {
+ auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
+ const auto cf_name = cfh->GetName();
+ const auto db_options = db_->GetDBOptions();
+
+ assert(metadata != nullptr);
+ assert(*metadata == nullptr);
+ auto s = db_->GetEnv()->FileExists(export_dir);
+ if (s.ok()) {
+ return Status::InvalidArgument("Specified export_dir exists");
+ } else if (!s.IsNotFound()) {
+ assert(s.IsIOError());
+ return s;
+ }
+
+ const auto final_nonslash_idx = export_dir.find_last_not_of('/');
+ if (final_nonslash_idx == std::string::npos) {
+ return Status::InvalidArgument("Specified export_dir invalid");
+ }
+ ROCKS_LOG_INFO(db_options.info_log,
+ "[%s] export column family onto export directory %s",
+ cf_name.c_str(), export_dir.c_str());
+
+ // Create a temporary export directory.
+ const auto tmp_export_dir =
+ export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
+ s = db_->GetEnv()->CreateDir(tmp_export_dir);
+
+ if (s.ok()) {
+ s = db_->Flush(ROCKSDB_NAMESPACE::FlushOptions(), handle);
+ }
+
+ ColumnFamilyMetaData db_metadata;
+ if (s.ok()) {
+ // Export live sst files with file deletions disabled.
+ s = db_->DisableFileDeletions();
+ if (s.ok()) {
+ db_->GetColumnFamilyMetaData(handle, &db_metadata);
+
+ s = ExportFilesInMetaData(
+ db_options, db_metadata,
+ [&](const std::string& src_dirname, const std::string& fname) {
+ ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
+ cf_name.c_str(), fname.c_str());
+ return db_->GetEnv()->LinkFile(src_dirname + fname,
+ tmp_export_dir + fname);
+ } /*link_file_cb*/,
+ [&](const std::string& src_dirname, const std::string& fname) {
+ ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
+ cf_name.c_str(), fname.c_str());
+ return CopyFile(db_->GetFileSystem(), src_dirname + fname,
+ tmp_export_dir + fname, 0, db_options.use_fsync,
+ nullptr, Temperature::kUnknown);
+ } /*copy_file_cb*/);
+
+ const auto enable_status = db_->EnableFileDeletions(false /*force*/);
+ if (s.ok()) {
+ s = enable_status;
+ }
+ }
+ }
+
+ auto moved_to_user_specified_dir = false;
+ if (s.ok()) {
+ // Move temporary export directory to the actual export directory.
+ s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
+ }
+
+ if (s.ok()) {
+ // Fsync export directory.
+ moved_to_user_specified_dir = true;
+ std::unique_ptr<FSDirectory> dir_ptr;
+ s = db_->GetFileSystem()->NewDirectory(export_dir, IOOptions(), &dir_ptr,
+ nullptr);
+ if (s.ok()) {
+ assert(dir_ptr != nullptr);
+ s = dir_ptr->FsyncWithDirOptions(
+ IOOptions(), nullptr,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed));
+ }
+ }
+
+ if (s.ok()) {
+ // Export of files succeeded. Fill in the metadata information.
+ auto result_metadata = new ExportImportFilesMetaData();
+ result_metadata->db_comparator_name = handle->GetComparator()->Name();
+ for (const auto& level_metadata : db_metadata.levels) {
+ for (const auto& file_metadata : level_metadata.files) {
+ LiveFileMetaData live_file_metadata;
+ live_file_metadata.size = file_metadata.size;
+ live_file_metadata.name = std::move(file_metadata.name);
+ live_file_metadata.file_number = file_metadata.file_number;
+ live_file_metadata.db_path = export_dir;
+ live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
+ live_file_metadata.largest_seqno = file_metadata.largest_seqno;
+ live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
+ live_file_metadata.largestkey = std::move(file_metadata.largestkey);
+ live_file_metadata.oldest_blob_file_number =
+ file_metadata.oldest_blob_file_number;
+ live_file_metadata.level = level_metadata.level;
+ result_metadata->files.push_back(live_file_metadata);
+ }
+ *metadata = result_metadata;
+ }
+ ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
+ cf_name.c_str());
+ } else {
+ // Failure: Clean up all the files/directories created.
+ ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
+ cf_name.c_str(), s.ToString().c_str());
+ std::vector<std::string> subchildren;
+ const auto cleanup_dir =
+ moved_to_user_specified_dir ? export_dir : tmp_export_dir;
+ db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
+ for (const auto& subchild : subchildren) {
+ const auto subchild_path = cleanup_dir + "/" + subchild;
+ const auto status = db_->GetEnv()->DeleteFile(subchild_path);
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
+ subchild_path.c_str(), status.ToString().c_str());
+ }
+ }
+ const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
+ cleanup_dir.c_str(), status.ToString().c_str());
+ }
+ }
+ return s;
+}
+
+Status CheckpointImpl::ExportFilesInMetaData(
+ const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+ std::function<Status(const std::string& src_dirname,
+ const std::string& src_fname)>
+ link_file_cb,
+ std::function<Status(const std::string& src_dirname,
+ const std::string& src_fname)>
+ copy_file_cb) {
+ Status s;
+ auto hardlink_file = true;
+
+ // Copy/hard link files in metadata.
+ size_t num_files = 0;
+ for (const auto& level_metadata : metadata.levels) {
+ for (const auto& file_metadata : level_metadata.files) {
+ uint64_t number;
+ FileType type;
+ const auto ok = ParseFileName(file_metadata.name, &number, &type);
+ if (!ok) {
+ s = Status::Corruption("Could not parse file name");
+ break;
+ }
+
+ // We should only get sst files here.
+ assert(type == kTableFile);
+ assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
+ const auto src_fname = file_metadata.name;
+ ++num_files;
+
+ if (hardlink_file) {
+ s = link_file_cb(db_->GetName(), src_fname);
+ if (num_files == 1 && s.IsNotSupported()) {
+ // Fallback to copy if link failed due to cross-device directories.
+ hardlink_file = false;
+ s = Status::OK();
+ }
+ }
+ if (!hardlink_file) {
+ s = copy_file_cb(db_->GetName(), src_fname);
+ }
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+ ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt,
+ num_files);
+
+ return s;
+}
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_impl.h b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h
new file mode 100644
index 000000000..2947330cc
--- /dev/null
+++ b/src/rocksdb/utilities/checkpoint/checkpoint_impl.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <string>
+
+#include "file/filename.h"
+#include "rocksdb/db.h"
+#include "rocksdb/utilities/checkpoint.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class CheckpointImpl : public Checkpoint {
+ public:
+ explicit CheckpointImpl(DB* db) : db_(db) {}
+
+ Status CreateCheckpoint(const std::string& checkpoint_dir,
+ uint64_t log_size_for_flush,
+ uint64_t* sequence_number_ptr) override;
+
+ Status ExportColumnFamily(ColumnFamilyHandle* handle,
+ const std::string& export_dir,
+ ExportImportFilesMetaData** metadata) override;
+
+ // Checkpoint logic can be customized by providing callbacks for link, copy,
+ // or create.
+ Status CreateCustomCheckpoint(
+ std::function<Status(const std::string& src_dirname,
+ const std::string& fname, FileType type)>
+ link_file_cb,
+ std::function<Status(const std::string& src_dirname,
+ const std::string& fname, uint64_t size_limit_bytes,
+ FileType type, const std::string& checksum_func_name,
+ const std::string& checksum_val,
+ const Temperature src_temperature)>
+ copy_file_cb,
+ std::function<Status(const std::string& fname,
+ const std::string& contents, FileType type)>
+ create_file_cb,
+ uint64_t* sequence_number, uint64_t log_size_for_flush,
+ bool get_live_table_checksum = false);
+
+ private:
+ void CleanStagingDirectory(const std::string& path, Logger* info_log);
+
+ // Export logic customization by providing callbacks for link or copy.
+ Status ExportFilesInMetaData(
+ const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
+ std::function<Status(const std::string& src_dirname,
+ const std::string& fname)>
+ link_file_cb,
+ std::function<Status(const std::string& src_dirname,
+ const std::string& fname)>
+ copy_file_cb);
+
+ private:
+ DB* db_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/checkpoint/checkpoint_test.cc b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc
new file mode 100644
index 000000000..3da753d5f
--- /dev/null
+++ b/src/rocksdb/utilities/checkpoint/checkpoint_test.cc
@@ -0,0 +1,974 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+// Syncpoint prevents us building and running tests in release
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/checkpoint.h"
+
+#ifndef OS_WIN
+#include <unistd.h>
+#endif
+#include <iostream>
+#include <thread>
+#include <utility>
+
+#include "db/db_impl/db_impl.h"
+#include "file/file_util.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "utilities/fault_injection_env.h"
+#include "utilities/fault_injection_fs.h"
+
+namespace ROCKSDB_NAMESPACE {
+class CheckpointTest : public testing::Test {
+ protected:
+ // Sequence of option configurations to try
+ enum OptionConfig {
+ kDefault = 0,
+ };
+ int option_config_;
+
+ public:
+ std::string dbname_;
+ std::string alternative_wal_dir_;
+ Env* env_;
+ DB* db_;
+ Options last_options_;
+ std::vector<ColumnFamilyHandle*> handles_;
+ std::string snapshot_name_;
+ std::string export_path_;
+ ColumnFamilyHandle* cfh_reverse_comp_;
+ ExportImportFilesMetaData* metadata_;
+
+ CheckpointTest() : env_(Env::Default()) {
+ env_->SetBackgroundThreads(1, Env::LOW);
+ env_->SetBackgroundThreads(1, Env::HIGH);
+ dbname_ = test::PerThreadDBPath(env_, "checkpoint_test");
+ alternative_wal_dir_ = dbname_ + "/wal";
+ auto options = CurrentOptions();
+ auto delete_options = options;
+ delete_options.wal_dir = alternative_wal_dir_;
+ EXPECT_OK(DestroyDB(dbname_, delete_options));
+ // Destroy it for not alternative WAL dir is used.
+ EXPECT_OK(DestroyDB(dbname_, options));
+ db_ = nullptr;
+ snapshot_name_ = test::PerThreadDBPath(env_, "snapshot");
+ std::string snapshot_tmp_name = snapshot_name_ + ".tmp";
+ EXPECT_OK(DestroyDB(snapshot_name_, options));
+ test::DeleteDir(env_, snapshot_name_);
+ EXPECT_OK(DestroyDB(snapshot_tmp_name, options));
+ test::DeleteDir(env_, snapshot_tmp_name);
+ Reopen(options);
+ export_path_ = test::PerThreadDBPath("/export");
+ DestroyDir(env_, export_path_).PermitUncheckedError();
+ cfh_reverse_comp_ = nullptr;
+ metadata_ = nullptr;
+ }
+
+ ~CheckpointTest() override {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ if (cfh_reverse_comp_) {
+ EXPECT_OK(db_->DestroyColumnFamilyHandle(cfh_reverse_comp_));
+ cfh_reverse_comp_ = nullptr;
+ }
+ if (metadata_) {
+ delete metadata_;
+ metadata_ = nullptr;
+ }
+ Close();
+ Options options;
+ options.db_paths.emplace_back(dbname_, 0);
+ options.db_paths.emplace_back(dbname_ + "_2", 0);
+ options.db_paths.emplace_back(dbname_ + "_3", 0);
+ options.db_paths.emplace_back(dbname_ + "_4", 0);
+ EXPECT_OK(DestroyDB(dbname_, options));
+ EXPECT_OK(DestroyDB(snapshot_name_, options));
+ DestroyDir(env_, export_path_).PermitUncheckedError();
+ }
+
+ // Return the current option configuration.
+ Options CurrentOptions() {
+ Options options;
+ options.env = env_;
+ options.create_if_missing = true;
+ return options;
+ }
+
+ void CreateColumnFamilies(const std::vector<std::string>& cfs,
+ const Options& options) {
+ ColumnFamilyOptions cf_opts(options);
+ size_t cfi = handles_.size();
+ handles_.resize(cfi + cfs.size());
+ for (auto cf : cfs) {
+ ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
+ }
+ }
+
+ void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
+ const Options& options) {
+ CreateColumnFamilies(cfs, options);
+ std::vector<std::string> cfs_plus_default = cfs;
+ cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+ ReopenWithColumnFamilies(cfs_plus_default, options);
+ }
+
+ void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const std::vector<Options>& options) {
+ ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+ }
+
+ void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const Options& options) {
+ ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+ }
+
+ Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const std::vector<Options>& options) {
+ Close();
+ EXPECT_EQ(cfs.size(), options.size());
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (size_t i = 0; i < cfs.size(); ++i) {
+ column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
+ }
+ DBOptions db_opts = DBOptions(options[0]);
+ return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
+ }
+
+ Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const Options& options) {
+ Close();
+ std::vector<Options> v_opts(cfs.size(), options);
+ return TryReopenWithColumnFamilies(cfs, v_opts);
+ }
+
+ void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); }
+
+ void CompactAll() {
+ for (auto h : handles_) {
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), h, nullptr, nullptr));
+ }
+ }
+
+ void Close() {
+ for (auto h : handles_) {
+ delete h;
+ }
+ handles_.clear();
+ delete db_;
+ db_ = nullptr;
+ }
+
+ void DestroyAndReopen(const Options& options) {
+ // Destroy using last options
+ Destroy(last_options_);
+ ASSERT_OK(TryReopen(options));
+ }
+
+ void Destroy(const Options& options) {
+ Close();
+ ASSERT_OK(DestroyDB(dbname_, options));
+ }
+
+ Status ReadOnlyReopen(const Options& options) {
+ return DB::OpenForReadOnly(options, dbname_, &db_);
+ }
+
+ Status ReadOnlyReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const Options& options) {
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (const auto& cf : cfs) {
+ column_families.emplace_back(cf, options);
+ }
+ return DB::OpenForReadOnly(options, dbname_, column_families, &handles_,
+ &db_);
+ }
+
+ Status TryReopen(const Options& options) {
+ Close();
+ last_options_ = options;
+ return DB::Open(options, dbname_, &db_);
+ }
+
+ Status Flush(int cf = 0) {
+ if (cf == 0) {
+ return db_->Flush(FlushOptions());
+ } else {
+ return db_->Flush(FlushOptions(), handles_[cf]);
+ }
+ }
+
+ Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
+ return db_->Put(wo, k, v);
+ }
+
+ Status Put(int cf, const Slice& k, const Slice& v,
+ WriteOptions wo = WriteOptions()) {
+ return db_->Put(wo, handles_[cf], k, v);
+ }
+
+ Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); }
+
+ Status Delete(int cf, const std::string& k) {
+ return db_->Delete(WriteOptions(), handles_[cf], k);
+ }
+
+ std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
+ ReadOptions options;
+ options.verify_checksums = true;
+ options.snapshot = snapshot;
+ std::string result;
+ Status s = db_->Get(options, k, &result);
+ if (s.IsNotFound()) {
+ result = "NOT_FOUND";
+ } else if (!s.ok()) {
+ result = s.ToString();
+ }
+ return result;
+ }
+
+ std::string Get(int cf, const std::string& k,
+ const Snapshot* snapshot = nullptr) {
+ ReadOptions options;
+ options.verify_checksums = true;
+ options.snapshot = snapshot;
+ std::string result;
+ Status s = db_->Get(options, handles_[cf], k, &result);
+ if (s.IsNotFound()) {
+ result = "NOT_FOUND";
+ } else if (!s.ok()) {
+ result = s.ToString();
+ }
+ return result;
+ }
+};
+
+TEST_F(CheckpointTest, GetSnapshotLink) {
+ for (uint64_t log_size_for_flush : {0, 1000000}) {
+ Options options;
+ DB* snapshotDB;
+ ReadOptions roptions;
+ std::string result;
+ Checkpoint* checkpoint;
+
+ options = CurrentOptions();
+ delete db_;
+ db_ = nullptr;
+ ASSERT_OK(DestroyDB(dbname_, options));
+
+ // Create a database
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname_, &db_));
+ std::string key = std::string("foo");
+ ASSERT_OK(Put(key, "v1"));
+ // Take a snapshot
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, log_size_for_flush));
+ ASSERT_OK(Put(key, "v2"));
+ ASSERT_EQ("v2", Get(key));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("v2", Get(key));
+ // Open snapshot and verify contents while DB is running
+ options.create_if_missing = false;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB));
+ ASSERT_OK(snapshotDB->Get(roptions, key, &result));
+ ASSERT_EQ("v1", result);
+ delete snapshotDB;
+ snapshotDB = nullptr;
+ delete db_;
+ db_ = nullptr;
+
+ // Destroy original DB
+ ASSERT_OK(DestroyDB(dbname_, options));
+
+ // Open snapshot and verify contents
+ options.create_if_missing = false;
+ dbname_ = snapshot_name_;
+ ASSERT_OK(DB::Open(options, dbname_, &db_));
+ ASSERT_EQ("v1", Get(key));
+ delete db_;
+ db_ = nullptr;
+ ASSERT_OK(DestroyDB(dbname_, options));
+ delete checkpoint;
+
+ // Restore DB name
+ dbname_ = test::PerThreadDBPath(env_, "db_test");
+ }
+}
+
+TEST_F(CheckpointTest, CheckpointWithBlob) {
+ // Create a database with a blob file
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.enable_blob_files = true;
+ options.min_blob_size = 0;
+
+ Reopen(options);
+
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ ASSERT_OK(Put(key, blob));
+ ASSERT_OK(Flush());
+
+ // Create a checkpoint
+ Checkpoint* checkpoint = nullptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+ std::unique_ptr<Checkpoint> checkpoint_guard(checkpoint);
+
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+ // Make sure it contains the blob file
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(snapshot_name_, &files));
+
+ bool blob_file_found = false;
+ for (const auto& file : files) {
+ uint64_t number = 0;
+ FileType type = kWalFile;
+
+ if (ParseFileName(file, &number, &type) && type == kBlobFile) {
+ blob_file_found = true;
+ break;
+ }
+ }
+
+ ASSERT_TRUE(blob_file_found);
+
+ // Make sure the checkpoint can be opened and the blob value read
+ options.create_if_missing = false;
+ DB* checkpoint_db = nullptr;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &checkpoint_db));
+
+ std::unique_ptr<DB> checkpoint_db_guard(checkpoint_db);
+
+ PinnableSlice value;
+ ASSERT_OK(checkpoint_db->Get(
+ ReadOptions(), checkpoint_db->DefaultColumnFamily(), key, &value));
+
+ ASSERT_EQ(value, blob);
+}
+
+TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
+ // Create a database
+ auto options = CurrentOptions();
+ options.create_if_missing = true;
+ CreateAndReopenWithCF({}, options);
+
+ // Helper to verify the number of files in metadata and export dir
+ auto verify_files_exported = [&](const ExportImportFilesMetaData& metadata,
+ int num_files_expected) {
+ ASSERT_EQ(metadata.files.size(), num_files_expected);
+ std::vector<std::string> subchildren;
+ ASSERT_OK(env_->GetChildren(export_path_, &subchildren));
+ ASSERT_EQ(subchildren.size(), num_files_expected);
+ };
+
+ // Test DefaultColumnFamily
+ {
+ const auto key = std::string("foo");
+ ASSERT_OK(Put(key, "v1"));
+
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+ // Export the Tables and verify
+ ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+ export_path_, &metadata_));
+ verify_files_exported(*metadata_, 1);
+ ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+ ASSERT_OK(DestroyDir(env_, export_path_));
+ delete metadata_;
+ metadata_ = nullptr;
+
+ // Check again after compaction
+ CompactAll();
+ ASSERT_OK(Put(key, "v2"));
+ ASSERT_OK(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+ export_path_, &metadata_));
+ verify_files_exported(*metadata_, 2);
+ ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
+ ASSERT_OK(DestroyDir(env_, export_path_));
+ delete metadata_;
+ metadata_ = nullptr;
+ delete checkpoint;
+ }
+
+ // Test non default column family with non default comparator
+ {
+ auto cf_options = CurrentOptions();
+ cf_options.comparator = ReverseBytewiseComparator();
+ ASSERT_OK(db_->CreateColumnFamily(cf_options, "yoyo", &cfh_reverse_comp_));
+
+ const auto key = std::string("foo");
+ ASSERT_OK(db_->Put(WriteOptions(), cfh_reverse_comp_, key, "v1"));
+
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+ // Export the Tables and verify
+ ASSERT_OK(checkpoint->ExportColumnFamily(cfh_reverse_comp_, export_path_,
+ &metadata_));
+ verify_files_exported(*metadata_, 1);
+ ASSERT_EQ(metadata_->db_comparator_name,
+ ReverseBytewiseComparator()->Name());
+ delete checkpoint;
+ }
+}
+
+TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) {
+ // Create a database
+ auto options = CurrentOptions();
+ options.create_if_missing = true;
+ CreateAndReopenWithCF({}, options);
+
+ const auto key = std::string("foo");
+ ASSERT_OK(Put(key, "v1"));
+
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+ // Export onto existing directory
+ ASSERT_OK(env_->CreateDirIfMissing(export_path_));
+ ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+ export_path_, &metadata_),
+ Status::InvalidArgument("Specified export_dir exists"));
+ ASSERT_OK(DestroyDir(env_, export_path_));
+
+ // Export with invalid directory specification
+ export_path_ = "";
+ ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
+ export_path_, &metadata_),
+ Status::InvalidArgument("Specified export_dir invalid"));
+ delete checkpoint;
+}
+
+TEST_F(CheckpointTest, CheckpointCF) {
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
+ {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}});
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(Put(0, "Default", "Default"));
+ ASSERT_OK(Put(1, "one", "one"));
+ ASSERT_OK(Put(2, "two", "two"));
+ ASSERT_OK(Put(3, "three", "three"));
+ ASSERT_OK(Put(4, "four", "four"));
+ ASSERT_OK(Put(5, "five", "five"));
+
+ DB* snapshotDB;
+ ReadOptions roptions;
+ std::string result;
+ std::vector<ColumnFamilyHandle*> cphandles;
+
+ // Take a snapshot
+ ROCKSDB_NAMESPACE::port::Thread t([&]() {
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ });
+ TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1");
+ ASSERT_OK(Put(0, "Default", "Default1"));
+ ASSERT_OK(Put(1, "one", "eleven"));
+ ASSERT_OK(Put(2, "two", "twelve"));
+ ASSERT_OK(Put(3, "three", "thirteen"));
+ ASSERT_OK(Put(4, "four", "fourteen"));
+ ASSERT_OK(Put(5, "five", "fifteen"));
+ TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2");
+ t.join();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ASSERT_OK(Put(1, "one", "twentyone"));
+ ASSERT_OK(Put(2, "two", "twentytwo"));
+ ASSERT_OK(Put(3, "three", "twentythree"));
+ ASSERT_OK(Put(4, "four", "twentyfour"));
+ ASSERT_OK(Put(5, "five", "twentyfive"));
+ ASSERT_OK(Flush());
+
+ // Open snapshot and verify contents while DB is running
+ options.create_if_missing = false;
+ std::vector<std::string> cfs;
+ cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (size_t i = 0; i < cfs.size(); ++i) {
+ column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
+ }
+ ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles,
+ &snapshotDB));
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
+ ASSERT_EQ("Default1", result);
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
+ ASSERT_EQ("eleven", result);
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
+ for (auto h : cphandles) {
+ delete h;
+ }
+ cphandles.clear();
+ delete snapshotDB;
+ snapshotDB = nullptr;
+}
+
+TEST_F(CheckpointTest, CheckpointCFNoFlush) {
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(Put(0, "Default", "Default"));
+ ASSERT_OK(Put(1, "one", "one"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put(2, "two", "two"));
+
+ DB* snapshotDB;
+ ReadOptions roptions;
+ std::string result;
+ std::vector<ColumnFamilyHandle*> cphandles;
+
+ // Take a snapshot
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
+ // Flush should never trigger.
+ FAIL();
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 1000000));
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ delete checkpoint;
+ ASSERT_OK(Put(1, "one", "two"));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Put(2, "two", "twentytwo"));
+ Close();
+ EXPECT_OK(DestroyDB(dbname_, options));
+
+ // Open snapshot and verify contents while DB is running
+ options.create_if_missing = false;
+ std::vector<std::string> cfs;
+ cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
+ std::vector<ColumnFamilyDescriptor> column_families;
+ for (size_t i = 0; i < cfs.size(); ++i) {
+ column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
+ }
+ ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles,
+ &snapshotDB));
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
+ ASSERT_EQ("Default", result);
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
+ ASSERT_EQ("one", result);
+ ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
+ ASSERT_EQ("two", result);
+ for (auto h : cphandles) {
+ delete h;
+ }
+ cphandles.clear();
+ delete snapshotDB;
+ snapshotDB = nullptr;
+}
+
+TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) {
+ Options options = CurrentOptions();
+ options.max_manifest_file_size = 0; // always rollover manifest for file add
+ Reopen(options);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {// Get past the flush in the checkpoint thread before adding any keys to
+ // the db so the checkpoint thread won't hit the WriteManifest
+ // syncpoints.
+ {"CheckpointImpl::CreateCheckpoint:FlushDone",
+ "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
+ // Roll the manifest during checkpointing right after live files are
+ // snapshotted.
+ {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
+ "VersionSet::LogAndApply:WriteManifest"},
+ {"VersionSet::LogAndApply:WriteManifestDone",
+ "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ROCKSDB_NAMESPACE::port::Thread t([&]() {
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ });
+ TEST_SYNC_POINT(
+ "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut");
+ ASSERT_OK(Put("Default", "Default1"));
+ ASSERT_OK(Flush());
+ t.join();
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ DB* snapshotDB;
+ // Successful Open() implies that CURRENT pointed to the manifest in the
+ // checkpoint.
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB));
+ delete snapshotDB;
+ snapshotDB = nullptr;
+}
+
+TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
+ Close();
+ const std::string dbname = test::PerThreadDBPath("transaction_testdb");
+ ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
+ test::DeleteDir(env_, dbname);
+
+ Options options = CurrentOptions();
+ options.allow_2pc = true;
+ // allow_2pc is implicitly set with tx prepare
+ // options.allow_2pc = true;
+ TransactionDBOptions txn_db_options;
+ TransactionDB* txdb;
+ Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
+ ASSERT_OK(s);
+ ColumnFamilyHandle* cfa;
+ ColumnFamilyHandle* cfb;
+ ColumnFamilyOptions cf_options;
+ ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa));
+
+ WriteOptions write_options;
+ // Insert something into CFB so lots of log files will be kept
+ // before creating the checkpoint.
+ ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb));
+ ASSERT_OK(txdb->Put(write_options, cfb, "", ""));
+
+ ReadOptions read_options;
+ std::string value;
+ TransactionOptions txn_options;
+ Transaction* txn = txdb->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid");
+ ASSERT_OK(s);
+ ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
+
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+ s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
+ ASSERT_OK(s);
+ // Writing prepare into middle of first WAL, then flush WALs many times
+ for (int i = 1; i <= 100000; i++) {
+ Transaction* tx = txdb->BeginTransaction(write_options, txn_options);
+ ASSERT_OK(tx->SetName("x"));
+ ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val")));
+ ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111")));
+ ASSERT_OK(tx->Prepare());
+ ASSERT_OK(tx->Commit());
+ if (i % 10000 == 0) {
+ ASSERT_OK(txdb->Flush(FlushOptions()));
+ }
+ if (i == 88888) {
+ ASSERT_OK(txn->Prepare());
+ }
+ delete tx;
+ }
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
+ "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
+ {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
+ "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::port::Thread t([&]() {
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ });
+ TEST_SYNC_POINT(
+ "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ TEST_SYNC_POINT(
+ "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
+ t.join();
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ // No more than two logs files should exist.
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(snapshot_name_, &files));
+ int num_log_files = 0;
+ for (auto& file : files) {
+ uint64_t num;
+ FileType type;
+ WalFileType log_type;
+ if (ParseFileName(file, &num, &type, &log_type) && type == kWalFile) {
+ num_log_files++;
+ }
+ }
+ // One flush after preapare + one outstanding file before checkpoint + one log
+ // file generated after checkpoint.
+ ASSERT_LE(num_log_files, 3);
+
+ TransactionDB* snapshotDB;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
+ column_families.push_back(
+ ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
+ column_families.push_back(
+ ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
+ std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*> cf_handles;
+ ASSERT_OK(TransactionDB::Open(options, txn_db_options, snapshot_name_,
+ column_families, &cf_handles, &snapshotDB));
+ ASSERT_OK(snapshotDB->Get(read_options, "foo", &value));
+ ASSERT_EQ(value, "bar");
+ ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value));
+ ASSERT_EQ(value, "barcfa");
+
+ delete cfa;
+ delete cfb;
+ delete cf_handles[0];
+ delete cf_handles[1];
+ delete cf_handles[2];
+ delete snapshotDB;
+ snapshotDB = nullptr;
+ delete txdb;
+}
+
+TEST_F(CheckpointTest, CheckpointInvalidDirectoryName) {
+ for (std::string checkpoint_dir : {"", "/", "////"}) {
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_TRUE(
+ checkpoint->CreateCheckpoint(checkpoint_dir).IsInvalidArgument());
+ delete checkpoint;
+ }
+}
+
+TEST_F(CheckpointTest, CheckpointWithParallelWrites) {
+ // When run with TSAN, this exposes the data race fixed in
+ // https://github.com/facebook/rocksdb/pull/3603
+ ASSERT_OK(Put("key1", "val1"));
+ port::Thread thread([this]() { ASSERT_OK(Put("key2", "val2")); });
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ thread.join();
+}
+
+TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) {
+ Options options = CurrentOptions();
+ std::unique_ptr<FaultInjectionTestEnv> env(new FaultInjectionTestEnv(env_));
+ options.env = env.get();
+ Reopen(options);
+ ASSERT_OK(Put("key1", "val1"));
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ ASSERT_OK(env->DropUnsyncedFileData());
+
+ // make sure it's openable even though whatever data that wasn't synced got
+ // dropped.
+ options.env = env_;
+ DB* snapshot_db;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+ ReadOptions read_opts;
+ std::string get_result;
+ ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
+ ASSERT_EQ("val1", get_result);
+ delete snapshot_db;
+ delete db_;
+ db_ = nullptr;
+}
+
+TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) {
+ // Regression test for a bug where checkpoint failed on a DB where persisting
+ // OPTIONS file failed and the DB was opened with
+ // `fail_if_options_file_error == false`.
+ Options options = CurrentOptions();
+ options.fail_if_options_file_error = false;
+ auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
+
+ // Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one
+ // operation when inside the OPTIONS file persisting code.
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+ fault_fs->SetRandomMetadataWriteError(1 /* one_in */);
+ SyncPoint::GetInstance()->SetCallBack(
+ "PersistRocksDBOptions:start", [fault_fs](void* /* arg */) {
+ fault_fs->EnableMetadataWriteErrorInjection();
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "FaultInjectionTestFS::InjectMetadataWriteError:Injected",
+ [fault_fs](void* /* arg */) {
+ fault_fs->DisableMetadataWriteErrorInjection();
+ });
+ options.env = fault_fs_env.get();
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Reopen(options);
+ ASSERT_OK(Put("key1", "val1"));
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+
+ // Make sure it's usable.
+ options.env = env_;
+ DB* snapshot_db;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+ ReadOptions read_opts;
+ std::string get_result;
+ ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
+ ASSERT_EQ("val1", get_result);
+ delete snapshot_db;
+ delete db_;
+ db_ = nullptr;
+}
+
+TEST_F(CheckpointTest, CheckpointReadOnlyDB) {
+ ASSERT_OK(Put("foo", "foo_value"));
+ ASSERT_OK(Flush());
+ Close();
+ Options options = CurrentOptions();
+ ASSERT_OK(ReadOnlyReopen(options));
+ Checkpoint* checkpoint = nullptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ checkpoint = nullptr;
+ Close();
+ DB* snapshot_db = nullptr;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+ ReadOptions read_opts;
+ std::string get_result;
+ ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result));
+ ASSERT_EQ("foo_value", get_result);
+ delete snapshot_db;
+}
+
+TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) {
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu", "eevee"}, options);
+ for (int i = 0; i != 3; ++i) {
+ ASSERT_OK(Put(i, "foo", "foo_value"));
+ ASSERT_OK(Flush(i));
+ }
+ Close();
+ Status s = ReadOnlyReopenWithColumnFamilies(
+ {kDefaultColumnFamilyName, "pikachu", "eevee"}, options);
+ ASSERT_OK(s);
+ Checkpoint* checkpoint = nullptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+ checkpoint = nullptr;
+ Close();
+
+ std::vector<ColumnFamilyDescriptor> column_families{
+ {kDefaultColumnFamilyName, options},
+ {"pikachu", options},
+ {"eevee", options}};
+ DB* snapshot_db = nullptr;
+ std::vector<ColumnFamilyHandle*> snapshot_handles;
+ s = DB::Open(options, snapshot_name_, column_families, &snapshot_handles,
+ &snapshot_db);
+ ASSERT_OK(s);
+ ReadOptions read_opts;
+ for (int i = 0; i != 3; ++i) {
+ std::string get_result;
+ s = snapshot_db->Get(read_opts, snapshot_handles[i], "foo", &get_result);
+ ASSERT_OK(s);
+ ASSERT_EQ("foo_value", get_result);
+ }
+
+ for (auto snapshot_h : snapshot_handles) {
+ delete snapshot_h;
+ }
+ snapshot_handles.clear();
+ delete snapshot_db;
+}
+
+TEST_F(CheckpointTest, CheckpointWithDbPath) {
+ Options options = CurrentOptions();
+ options.db_paths.emplace_back(dbname_ + "_2", 0);
+ Reopen(options);
+ ASSERT_OK(Put("key1", "val1"));
+ Flush();
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ // Currently not supported
+ ASSERT_TRUE(checkpoint->CreateCheckpoint(snapshot_name_).IsNotSupported());
+ delete checkpoint;
+}
+
+TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
+ // Repro for a race condition where a user write comes in after the checkpoint
+ // syncs WAL for `track_and_verify_wals_in_manifest` but before the
+ // corresponding MANIFEST update. With the bug, that scenario resulted in an
+ // unopenable DB with error "Corruption: Size mismatch: WAL ...".
+ Options options = CurrentOptions();
+ std::unique_ptr<FaultInjectionTestEnv> fault_env(
+ new FaultInjectionTestEnv(env_));
+ options.env = fault_env.get();
+ options.track_and_verify_wals_in_manifest = true;
+ Reopen(options);
+
+ ASSERT_OK(Put("key1", "val1"));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
+ [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::unique_ptr<Checkpoint> checkpoint;
+ {
+ Checkpoint* checkpoint_ptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+ checkpoint.reset(checkpoint_ptr);
+ }
+
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+ // Ensure callback ran.
+ ASSERT_EQ("val2", Get("key2"));
+
+ Close();
+
+ // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+ // DB WAL.
+ fault_env->DropUnsyncedFileData();
+
+ // Before the bug fix, reopening the DB would fail because the MANIFEST's
+ // AddWal entry indicated the WAL should be synced through "key2" -> "val2".
+ Reopen(options);
+
+ // Need to close before `fault_env` goes out of scope.
+ Close();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE