From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rocksdb/utilities/backupable/backupable_db.cc | 1989 ++++++++++++++++++++ .../utilities/backupable/backupable_db_test.cc | 1863 ++++++++++++++++++ 2 files changed, 3852 insertions(+) create mode 100644 src/rocksdb/utilities/backupable/backupable_db.cc create mode 100644 src/rocksdb/utilities/backupable/backupable_db_test.cc (limited to 'src/rocksdb/utilities/backupable') diff --git a/src/rocksdb/utilities/backupable/backupable_db.cc b/src/rocksdb/utilities/backupable/backupable_db.cc new file mode 100644 index 000000000..0ca67670b --- /dev/null +++ b/src/rocksdb/utilities/backupable/backupable_db.cc @@ -0,0 +1,1989 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "env/composite_env_wrapper.h" +#include "file/filename.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "logging/logging.h" +#include "port/port.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/utilities/backupable_db.h" +#include "test_util/sync_point.h" +#include "util/channel.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/string_util.h" +#include "utilities/checkpoint/checkpoint_impl.h" + +namespace ROCKSDB_NAMESPACE { + +void BackupStatistics::IncrementNumberSuccessBackup() { + number_success_backup++; +} +void BackupStatistics::IncrementNumberFailBackup() { + number_fail_backup++; +} + +uint32_t BackupStatistics::GetNumberSuccessBackup() const { + return number_success_backup; +} +uint32_t BackupStatistics::GetNumberFailBackup() const { + return number_fail_backup; +} + +std::string BackupStatistics::ToString() const { + char result[50]; + snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u", + GetNumberSuccessBackup(), GetNumberFailBackup()); + return result; +} + +void BackupableDBOptions::Dump(Logger* logger) const { + ROCKS_LOG_INFO(logger, " Options.backup_dir: %s", + backup_dir.c_str()); + ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env); + ROCKS_LOG_INFO(logger, " Options.share_table_files: %d", + static_cast(share_table_files)); + ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log); + ROCKS_LOG_INFO(logger, " Options.sync: %d", + static_cast(sync)); + ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d", + static_cast(destroy_old_data)); + ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d", + static_cast(backup_log_files)); + ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64, + backup_rate_limit); + ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64, + restore_rate_limit); + ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d", + max_background_operations); +} + +// -------- BackupEngineImpl class --------- +class BackupEngineImpl : public BackupEngine { + public: + BackupEngineImpl(Env* db_env, const BackupableDBOptions& options, + bool read_only = false); + ~BackupEngineImpl() override; + Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata, + bool flush_before_backup = false, + std::function progress_callback = + []() {}) override; + Status PurgeOldBackups(uint32_t num_backups_to_keep) override; + Status DeleteBackup(BackupID backup_id) override; + void StopBackup() override { + stop_backup_.store(true, std::memory_order_release); + } + Status GarbageCollect() override; + + // The returned BackupInfos are in chronological order, which means the + // latest backup comes last. + void GetBackupInfo(std::vector* backup_info) override; + void GetCorruptedBackups(std::vector* corrupt_backup_ids) override; + Status RestoreDBFromBackup( + BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, + const RestoreOptions& restore_options = RestoreOptions()) override; + Status RestoreDBFromLatestBackup( + const std::string& db_dir, const std::string& wal_dir, + const RestoreOptions& restore_options = RestoreOptions()) override { + return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir, + restore_options); + } + + Status VerifyBackup(BackupID backup_id) override; + + Status Initialize(); + + private: + void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0); + Status DeleteBackupInternal(BackupID backup_id); + + // Extends the "result" map with pathname->size mappings for the contents of + // "dir" in "env". Pathnames are prefixed with "dir". + Status InsertPathnameToSizeBytes( + const std::string& dir, Env* env, + std::unordered_map* result); + + struct FileInfo { + FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) + : refs(0), filename(fname), size(sz), checksum_value(checksum) {} + + FileInfo(const FileInfo&) = delete; + FileInfo& operator=(const FileInfo&) = delete; + + int refs; + const std::string filename; + const uint64_t size; + const uint32_t checksum_value; + }; + + class BackupMeta { + public: + BackupMeta( + const std::string& meta_filename, const std::string& meta_tmp_filename, + std::unordered_map>* file_infos, + Env* env) + : timestamp_(0), + sequence_number_(0), + size_(0), + meta_filename_(meta_filename), + meta_tmp_filename_(meta_tmp_filename), + file_infos_(file_infos), + env_(env) {} + + BackupMeta(const BackupMeta&) = delete; + BackupMeta& operator=(const BackupMeta&) = delete; + + ~BackupMeta() {} + + void RecordTimestamp() { + env_->GetCurrentTime(×tamp_); + } + int64_t GetTimestamp() const { + return timestamp_; + } + uint64_t GetSize() const { + return size_; + } + uint32_t GetNumberFiles() { return static_cast(files_.size()); } + void SetSequenceNumber(uint64_t sequence_number) { + sequence_number_ = sequence_number; + } + uint64_t GetSequenceNumber() { + return sequence_number_; + } + + const std::string& GetAppMetadata() const { return app_metadata_; } + + void SetAppMetadata(const std::string& app_metadata) { + app_metadata_ = app_metadata; + } + + Status AddFile(std::shared_ptr file_info); + + Status Delete(bool delete_meta = true); + + bool Empty() { + return files_.empty(); + } + + std::shared_ptr GetFile(const std::string& filename) const { + auto it = file_infos_->find(filename); + if (it == file_infos_->end()) + return nullptr; + return it->second; + } + + const std::vector>& GetFiles() { + return files_; + } + + // @param abs_path_to_size Pre-fetched file sizes (bytes). + Status LoadFromFile( + const std::string& backup_dir, + const std::unordered_map& abs_path_to_size); + Status StoreToFile(bool sync); + + std::string GetInfoString() { + std::ostringstream ss; + ss << "Timestamp: " << timestamp_ << std::endl; + char human_size[16]; + AppendHumanBytes(size_, human_size, sizeof(human_size)); + ss << "Size: " << human_size << std::endl; + ss << "Files:" << std::endl; + for (const auto& file : files_) { + AppendHumanBytes(file->size, human_size, sizeof(human_size)); + ss << file->filename << ", size " << human_size << ", refs " + << file->refs << std::endl; + } + return ss.str(); + } + + private: + int64_t timestamp_; + // sequence number is only approximate, should not be used + // by clients + uint64_t sequence_number_; + uint64_t size_; + std::string app_metadata_; + std::string const meta_filename_; + std::string const meta_tmp_filename_; + // files with relative paths (without "/" prefix!!) + std::vector> files_; + std::unordered_map>* file_infos_; + Env* env_; + + static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB + }; // BackupMeta + + inline std::string GetAbsolutePath( + const std::string &relative_path = "") const { + assert(relative_path.size() == 0 || relative_path[0] != '/'); + return options_.backup_dir + "/" + relative_path; + } + inline std::string GetPrivateDirRel() const { + return "private"; + } + inline std::string GetSharedChecksumDirRel() const { + return "shared_checksum"; + } + inline std::string GetPrivateFileRel(BackupID backup_id, + bool tmp = false, + const std::string& file = "") const { + assert(file.size() == 0 || file[0] != '/'); + return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) + + (tmp ? ".tmp" : "") + "/" + file; + } + inline std::string GetSharedFileRel(const std::string& file = "", + bool tmp = false) const { + assert(file.size() == 0 || file[0] != '/'); + return std::string("shared/") + (tmp ? "." : "") + file + + (tmp ? ".tmp" : ""); + } + inline std::string GetSharedFileWithChecksumRel(const std::string& file = "", + bool tmp = false) const { + assert(file.size() == 0 || file[0] != '/'); + return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file + + (tmp ? ".tmp" : ""); + } + inline std::string GetSharedFileWithChecksum(const std::string& file, + const uint32_t checksum_value, + const uint64_t file_size) const { + assert(file.size() == 0 || file[0] != '/'); + std::string file_copy = file; + return file_copy.insert(file_copy.find_last_of('.'), + "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) + + "_" + ROCKSDB_NAMESPACE::ToString(file_size)); + } + inline std::string GetFileFromChecksumFile(const std::string& file) const { + assert(file.size() == 0 || file[0] != '/'); + std::string file_copy = file; + size_t first_underscore = file_copy.find_first_of('_'); + return file_copy.erase(first_underscore, + file_copy.find_last_of('.') - first_underscore); + } + inline std::string GetBackupMetaDir() const { + return GetAbsolutePath("meta"); + } + inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const { + return GetBackupMetaDir() + "/" + (tmp ? "." : "") + + ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : ""); + } + + // If size_limit == 0, there is no size limit, copy everything. + // + // Exactly one of src and contents must be non-empty. + // + // @param src If non-empty, the file is copied from this pathname. + // @param contents If non-empty, the file will be created with these contents. + Status CopyOrCreateFile(const std::string& src, const std::string& dst, + const std::string& contents, Env* src_env, + Env* dst_env, const EnvOptions& src_env_options, + bool sync, RateLimiter* rate_limiter, + uint64_t* size = nullptr, + uint32_t* checksum_value = nullptr, + uint64_t size_limit = 0, + std::function progress_callback = []() {}); + + Status CalculateChecksum(const std::string& src, Env* src_env, + const EnvOptions& src_env_options, + uint64_t size_limit, uint32_t* checksum_value); + + struct CopyOrCreateResult { + uint64_t size; + uint32_t checksum_value; + Status status; + }; + + // Exactly one of src_path and contents must be non-empty. If src_path is + // non-empty, the file is copied from this pathname. Otherwise, if contents is + // non-empty, the file will be created at dst_path with these contents. + struct CopyOrCreateWorkItem { + std::string src_path; + std::string dst_path; + std::string contents; + Env* src_env; + Env* dst_env; + EnvOptions src_env_options; + bool sync; + RateLimiter* rate_limiter; + uint64_t size_limit; + std::promise result; + std::function progress_callback; + + CopyOrCreateWorkItem() + : src_path(""), + dst_path(""), + contents(""), + src_env(nullptr), + dst_env(nullptr), + src_env_options(), + sync(false), + rate_limiter(nullptr), + size_limit(0) {} + + CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete; + CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete; + + CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT { + src_path = std::move(o.src_path); + dst_path = std::move(o.dst_path); + contents = std::move(o.contents); + src_env = o.src_env; + dst_env = o.dst_env; + src_env_options = std::move(o.src_env_options); + sync = o.sync; + rate_limiter = o.rate_limiter; + size_limit = o.size_limit; + result = std::move(o.result); + progress_callback = std::move(o.progress_callback); + return *this; + } + + CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path, + std::string _contents, Env* _src_env, Env* _dst_env, + EnvOptions _src_env_options, bool _sync, + RateLimiter* _rate_limiter, uint64_t _size_limit, + std::function _progress_callback = []() {}) + : src_path(std::move(_src_path)), + dst_path(std::move(_dst_path)), + contents(std::move(_contents)), + src_env(_src_env), + dst_env(_dst_env), + src_env_options(std::move(_src_env_options)), + sync(_sync), + rate_limiter(_rate_limiter), + size_limit(_size_limit), + progress_callback(_progress_callback) {} + }; + + struct BackupAfterCopyOrCreateWorkItem { + std::future result; + bool shared; + bool needed_to_copy; + Env* backup_env; + std::string dst_path_tmp; + std::string dst_path; + std::string dst_relative; + BackupAfterCopyOrCreateWorkItem() + : shared(false), + needed_to_copy(false), + backup_env(nullptr), + dst_path_tmp(""), + dst_path(""), + dst_relative("") {} + + BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o) + ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + BackupAfterCopyOrCreateWorkItem& operator=( + BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT { + result = std::move(o.result); + shared = o.shared; + needed_to_copy = o.needed_to_copy; + backup_env = o.backup_env; + dst_path_tmp = std::move(o.dst_path_tmp); + dst_path = std::move(o.dst_path); + dst_relative = std::move(o.dst_relative); + return *this; + } + + BackupAfterCopyOrCreateWorkItem(std::future&& _result, + bool _shared, bool _needed_to_copy, + Env* _backup_env, std::string _dst_path_tmp, + std::string _dst_path, + std::string _dst_relative) + : result(std::move(_result)), + shared(_shared), + needed_to_copy(_needed_to_copy), + backup_env(_backup_env), + dst_path_tmp(std::move(_dst_path_tmp)), + dst_path(std::move(_dst_path)), + dst_relative(std::move(_dst_relative)) {} + }; + + struct RestoreAfterCopyOrCreateWorkItem { + std::future result; + uint32_t checksum_value; + RestoreAfterCopyOrCreateWorkItem() + : checksum_value(0) {} + RestoreAfterCopyOrCreateWorkItem(std::future&& _result, + uint32_t _checksum_value) + : result(std::move(_result)), checksum_value(_checksum_value) {} + RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o) + ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + RestoreAfterCopyOrCreateWorkItem& operator=( + RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT { + result = std::move(o.result); + checksum_value = o.checksum_value; + return *this; + } + }; + + bool initialized_; + std::mutex byte_report_mutex_; + channel files_to_copy_or_create_; + std::vector threads_; + // Certain operations like PurgeOldBackups and DeleteBackup will trigger + // automatic GarbageCollect (true) unless we've already done one in this + // session and have not failed to delete backup files since then (false). + bool might_need_garbage_collect_ = true; + + // Adds a file to the backup work queue to be copied or created if it doesn't + // already exist. + // + // Exactly one of src_dir and contents must be non-empty. + // + // @param src_dir If non-empty, the file in this directory named fname will be + // copied. + // @param fname Name of destination file and, in case of copy, source file. + // @param contents If non-empty, the file will be created with these contents. + Status AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, // starts with "/" + const EnvOptions& src_env_options, RateLimiter* rate_limiter, + uint64_t size_bytes, uint64_t size_limit = 0, + bool shared_checksum = false, + std::function progress_callback = []() {}, + const std::string& contents = std::string()); + + // backup state data + BackupID latest_backup_id_; + BackupID latest_valid_backup_id_; + std::map> backups_; + std::map>> + corrupt_backups_; + std::unordered_map> backuped_file_infos_; + std::atomic stop_backup_; + + // options data + BackupableDBOptions options_; + Env* db_env_; + Env* backup_env_; + + // directories + std::unique_ptr backup_directory_; + std::unique_ptr shared_directory_; + std::unique_ptr meta_directory_; + std::unique_ptr private_directory_; + + static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB + size_t copy_file_buffer_size_; + bool read_only_; + BackupStatistics backup_statistics_; + static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB +}; + +Status BackupEngine::Open(Env* env, const BackupableDBOptions& options, + BackupEngine** backup_engine_ptr) { + std::unique_ptr backup_engine( + new BackupEngineImpl(env, options)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return Status::OK(); +} + +BackupEngineImpl::BackupEngineImpl(Env* db_env, + const BackupableDBOptions& options, + bool read_only) + : initialized_(false), + latest_backup_id_(0), + latest_valid_backup_id_(0), + stop_backup_(false), + options_(options), + db_env_(db_env), + backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), + copy_file_buffer_size_(kDefaultCopyFileBufferSize), + read_only_(read_only) { + if (options_.backup_rate_limiter == nullptr && + options_.backup_rate_limit > 0) { + options_.backup_rate_limiter.reset( + NewGenericRateLimiter(options_.backup_rate_limit)); + } + if (options_.restore_rate_limiter == nullptr && + options_.restore_rate_limit > 0) { + options_.restore_rate_limiter.reset( + NewGenericRateLimiter(options_.restore_rate_limit)); + } +} + +BackupEngineImpl::~BackupEngineImpl() { + files_to_copy_or_create_.sendEof(); + for (auto& t : threads_) { + t.join(); + } + LogFlush(options_.info_log); +} + +Status BackupEngineImpl::Initialize() { + assert(!initialized_); + initialized_ = true; + if (read_only_) { + ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine"); + } + options_.Dump(options_.info_log); + + if (!read_only_) { + // we might need to clean up from previous crash or I/O errors + might_need_garbage_collect_ = true; + + if (options_.max_valid_backups_to_open != port::kMaxInt32) { + options_.max_valid_backups_to_open = port::kMaxInt32; + ROCKS_LOG_WARN( + options_.info_log, + "`max_valid_backups_to_open` is not set to the default value. Ignoring " + "its value since BackupEngine is not read-only."); + } + + // gather the list of directories that we need to create + std::vector*>> + directories; + directories.emplace_back(GetAbsolutePath(), &backup_directory_); + if (options_.share_table_files) { + if (options_.share_files_with_checksum) { + directories.emplace_back( + GetAbsolutePath(GetSharedFileWithChecksumRel()), + &shared_directory_); + } else { + directories.emplace_back(GetAbsolutePath(GetSharedFileRel()), + &shared_directory_); + } + } + directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()), + &private_directory_); + directories.emplace_back(GetBackupMetaDir(), &meta_directory_); + // create all the dirs we need + for (const auto& d : directories) { + auto s = backup_env_->CreateDirIfMissing(d.first); + if (s.ok()) { + s = backup_env_->NewDirectory(d.first, d.second); + } + if (!s.ok()) { + return s; + } + } + } + + std::vector backup_meta_files; + { + auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files); + if (s.IsNotFound()) { + return Status::NotFound(GetBackupMetaDir() + " is missing"); + } else if (!s.ok()) { + return s; + } + } + // create backups_ structure + for (auto& file : backup_meta_files) { + if (file == "." || file == "..") { + continue; + } + ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str()); + BackupID backup_id = 0; + sscanf(file.c_str(), "%u", &backup_id); + if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) { + if (!read_only_) { + // invalid file name, delete that + auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file); + ROCKS_LOG_INFO(options_.info_log, + "Unrecognized meta file %s, deleting -- %s", + file.c_str(), s.ToString().c_str()); + } + continue; + } + assert(backups_.find(backup_id) == backups_.end()); + backups_.insert(std::make_pair( + backup_id, std::unique_ptr(new BackupMeta( + GetBackupMetaFile(backup_id, false /* tmp */), + GetBackupMetaFile(backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_)))); + } + + latest_backup_id_ = 0; + latest_valid_backup_id_ = 0; + if (options_.destroy_old_data) { // Destroy old data + assert(!read_only_); + ROCKS_LOG_INFO( + options_.info_log, + "Backup Engine started with destroy_old_data == true, deleting all " + "backups"); + auto s = PurgeOldBackups(0); + if (s.ok()) { + s = GarbageCollect(); + } + if (!s.ok()) { + return s; + } + } else { // Load data from storage + std::unordered_map abs_path_to_size; + for (const auto& rel_dir : + {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size); + } + // load the backups if any, until valid_backups_to_open of the latest + // non-corrupted backups have been successfully opened. + int valid_backups_to_open = options_.max_valid_backups_to_open; + for (auto backup_iter = backups_.rbegin(); + backup_iter != backups_.rend(); + ++backup_iter) { + assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first); + if (latest_backup_id_ == 0) { + latest_backup_id_ = backup_iter->first; + } + if (valid_backups_to_open == 0) { + break; + } + + InsertPathnameToSizeBytes( + GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_, + &abs_path_to_size); + Status s = backup_iter->second->LoadFromFile(options_.backup_dir, + abs_path_to_size); + if (s.IsCorruption()) { + ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s", + backup_iter->first, s.ToString().c_str()); + corrupt_backups_.insert( + std::make_pair(backup_iter->first, + std::make_pair(s, std::move(backup_iter->second)))); + } else if (!s.ok()) { + // Distinguish corruption errors from errors in the backup Env. + // Errors in the backup Env (i.e., this code path) will cause Open() to + // fail, whereas corruption errors would not cause Open() failures. + return s; + } else { + ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s", + backup_iter->first, + backup_iter->second->GetInfoString().c_str()); + assert(latest_valid_backup_id_ == 0 || + latest_valid_backup_id_ > backup_iter->first); + if (latest_valid_backup_id_ == 0) { + latest_valid_backup_id_ = backup_iter->first; + } + --valid_backups_to_open; + } + } + + for (const auto& corrupt : corrupt_backups_) { + backups_.erase(backups_.find(corrupt.first)); + } + // erase the backups before max_valid_backups_to_open + int num_unopened_backups; + if (options_.max_valid_backups_to_open == 0) { + num_unopened_backups = 0; + } else { + num_unopened_backups = + std::max(0, static_cast(backups_.size()) - + options_.max_valid_backups_to_open); + } + for (int i = 0; i < num_unopened_backups; ++i) { + assert(backups_.begin()->second->Empty()); + backups_.erase(backups_.begin()); + } + } + + ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_); + ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", + latest_valid_backup_id_); + + // set up threads perform copies from files_to_copy_or_create_ in the + // background + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([this]() { +#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) +#if __GLIBC_PREREQ(2, 12) + pthread_setname_np(pthread_self(), "backup_engine"); +#endif +#endif + CopyOrCreateWorkItem work_item; + while (files_to_copy_or_create_.read(work_item)) { + CopyOrCreateResult result; + result.status = CopyOrCreateFile( + work_item.src_path, work_item.dst_path, work_item.contents, + work_item.src_env, work_item.dst_env, work_item.src_env_options, + work_item.sync, work_item.rate_limiter, &result.size, + &result.checksum_value, work_item.size_limit, + work_item.progress_callback); + work_item.result.set_value(std::move(result)); + } + }); + } + ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine"); + + return Status::OK(); +} + +Status BackupEngineImpl::CreateNewBackupWithMetadata( + DB* db, const std::string& app_metadata, bool flush_before_backup, + std::function progress_callback) { + assert(initialized_); + assert(!read_only_); + if (app_metadata.size() > kMaxAppMetaSize) { + return Status::InvalidArgument("App metadata too large"); + } + + BackupID new_backup_id = latest_backup_id_ + 1; + + assert(backups_.find(new_backup_id) == backups_.end()); + + auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id)); + Status s = backup_env_->FileExists(private_dir); + if (s.ok()) { + // maybe last backup failed and left partial state behind, clean it up. + // need to do this before updating backups_ such that a private dir + // named after new_backup_id will be cleaned up. + // (If an incomplete new backup is followed by an incomplete delete + // of the latest full backup, then there could be more than one next + // id with a private dir, the last thing to be deleted in delete + // backup, but all will be cleaned up with a GarbageCollect.) + s = GarbageCollect(); + } else if (s.IsNotFound()) { + // normal case, the new backup's private dir doesn't exist yet + s = Status::OK(); + } + + auto ret = backups_.insert(std::make_pair( + new_backup_id, std::unique_ptr(new BackupMeta( + GetBackupMetaFile(new_backup_id, false /* tmp */), + GetBackupMetaFile(new_backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_)))); + assert(ret.second == true); + auto& new_backup = ret.first->second; + new_backup->RecordTimestamp(); + new_backup->SetAppMetadata(app_metadata); + + auto start_backup = backup_env_->NowMicros(); + + ROCKS_LOG_INFO(options_.info_log, + "Started the backup process -- creating backup %u", + new_backup_id); + if (s.ok()) { + s = backup_env_->CreateDir(private_dir); + } + + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + if (rate_limiter) { + copy_file_buffer_size_ = static_cast(rate_limiter->GetSingleBurstBytes()); + } + + // A set into which we will insert the dst_paths that are calculated for live + // files and live WAL files. + // This is used to check whether a live files shares a dst_path with another + // live file. + std::unordered_set live_dst_paths; + + std::vector backup_items_to_finish; + // Add a CopyOrCreateWorkItem to the channel for each live file + db->DisableFileDeletions(); + if (s.ok()) { + CheckpointImpl checkpoint(db); + uint64_t sequence_number = 0; + DBOptions db_options = db->GetDBOptions(); + EnvOptions src_raw_env_options(db_options); + s = checkpoint.CreateCustomCheckpoint( + db_options, + [&](const std::string& /*src_dirname*/, const std::string& /*fname*/, + FileType) { + // custom checkpoint will switch to calling copy_file_cb after it sees + // NotSupported returned from link_file_cb. + return Status::NotSupported(); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType type) { + if (type == kLogFile && !options_.backup_log_files) { + return Status::OK(); + } + Log(options_.info_log, "add file for backup %s", fname.c_str()); + uint64_t size_bytes = 0; + Status st; + if (type == kTableFile) { + st = db_env_->GetFileSize(src_dirname + fname, &size_bytes); + } + EnvOptions src_env_options; + switch (type) { + case kLogFile: + src_env_options = + db_env_->OptimizeForLogRead(src_raw_env_options); + break; + case kTableFile: + src_env_options = db_env_->OptimizeForCompactionTableRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + case kDescriptorFile: + src_env_options = + db_env_->OptimizeForManifestRead(src_raw_env_options); + break; + default: + // Other backed up files (like options file) are not read by live + // DB, so don't need to worry about avoiding mixing buffered and + // direct I/O. Just use plain defaults. + src_env_options = src_raw_env_options; + break; + } + if (st.ok()) { + st = AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + options_.share_table_files && type == kTableFile, src_dirname, + fname, src_env_options, rate_limiter, size_bytes, + size_limit_bytes, + options_.share_files_with_checksum && type == kTableFile, + progress_callback); + } + return st; + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, FileType) { + Log(options_.info_log, "add file for backup %s", fname.c_str()); + return AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + false /* shared */, "" /* src_dir */, fname, + EnvOptions() /* src_env_options */, rate_limiter, contents.size(), + 0 /* size_limit */, false /* shared_checksum */, + progress_callback, contents); + } /* create_file_cb */, + &sequence_number, flush_before_backup ? 0 : port::kMaxUint64); + if (s.ok()) { + new_backup->SetSequenceNumber(sequence_number); + } + } + ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); + Status item_status; + for (auto& item : backup_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_status = result.status; + if (item_status.ok() && item.shared && item.needed_to_copy) { + item_status = item.backup_env->RenameFile(item.dst_path_tmp, + item.dst_path); + } + if (item_status.ok()) { + item_status = new_backup.get()->AddFile( + std::make_shared(item.dst_relative, + result.size, + result.checksum_value)); + } + if (!item_status.ok()) { + s = item_status; + } + } + + // we copied all the files, enable file deletions + db->EnableFileDeletions(false); + + auto backup_time = backup_env_->NowMicros() - start_backup; + + if (s.ok()) { + // persist the backup metadata on the disk + s = new_backup->StoreToFile(options_.sync); + } + if (s.ok() && options_.sync) { + std::unique_ptr backup_private_directory; + backup_env_->NewDirectory( + GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), + &backup_private_directory); + if (backup_private_directory != nullptr) { + s = backup_private_directory->Fsync(); + } + if (s.ok() && private_directory_ != nullptr) { + s = private_directory_->Fsync(); + } + if (s.ok() && meta_directory_ != nullptr) { + s = meta_directory_->Fsync(); + } + if (s.ok() && shared_directory_ != nullptr) { + s = shared_directory_->Fsync(); + } + if (s.ok() && backup_directory_ != nullptr) { + s = backup_directory_->Fsync(); + } + } + + if (s.ok()) { + backup_statistics_.IncrementNumberSuccessBackup(); + } + if (!s.ok()) { + backup_statistics_.IncrementNumberFailBackup(); + // clean all the files we might have created + ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s", + s.ToString().c_str()); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n", + backup_statistics_.ToString().c_str()); + // delete files that we might have already written + might_need_garbage_collect_ = true; + DeleteBackup(new_backup_id); + return s; + } + + // here we know that we succeeded and installed the new backup + // in the LATEST_BACKUP file + latest_backup_id_ = new_backup_id; + latest_valid_backup_id_ = new_backup_id; + ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good"); + + // backup_speed is in byte/second + double backup_speed = new_backup->GetSize() / (1.048576 * backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u", + new_backup->GetNumberFiles()); + char human_size[16]; + AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size)); + ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size); + ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds", + backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s", + backup_statistics_.ToString().c_str()); + return s; +} + +Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { + assert(initialized_); + assert(!read_only_); + + // Best effort deletion even with errors + Status overall_status = Status::OK(); + + ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u", + num_backups_to_keep); + std::vector to_delete; + auto itr = backups_.begin(); + while ((backups_.size() - to_delete.size()) > num_backups_to_keep) { + to_delete.push_back(itr->first); + itr++; + } + for (auto backup_id : to_delete) { + auto s = DeleteBackupInternal(backup_id); + if (!s.ok()) { + overall_status = s; + } + } + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + auto s = GarbageCollect(); + if (!s.ok() && overall_status.ok()) { + overall_status = s; + } + } + return overall_status; +} + +Status BackupEngineImpl::DeleteBackup(BackupID backup_id) { + auto s1 = DeleteBackupInternal(backup_id); + auto s2 = Status::OK(); + + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + s2 = GarbageCollect(); + } + + if (!s1.ok()) { + return s1; + } else { + return s2; + } +} + +// Does not auto-GarbageCollect +Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) { + assert(initialized_); + assert(!read_only_); + + ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id); + auto backup = backups_.find(backup_id); + if (backup != backups_.end()) { + auto s = backup->second->Delete(); + if (!s.ok()) { + return s; + } + backups_.erase(backup); + } else { + auto corrupt = corrupt_backups_.find(backup_id); + if (corrupt == corrupt_backups_.end()) { + return Status::NotFound("Backup not found"); + } + auto s = corrupt->second.second->Delete(); + if (!s.ok()) { + return s; + } + corrupt_backups_.erase(corrupt); + } + + // After removing meta file, best effort deletion even with errors. + // (Don't delete other files if we can't delete the meta file right + // now.) + std::vector to_delete; + for (auto& itr : backuped_file_infos_) { + if (itr.second->refs == 0) { + Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first)); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), + s.ToString().c_str()); + to_delete.push_back(itr.first); + if (!s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + for (auto& td : to_delete) { + backuped_file_infos_.erase(td); + } + + // take care of private dirs -- GarbageCollect() will take care of them + // if they are not empty + std::string private_dir = GetPrivateFileRel(backup_id); + Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir)); + ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s", + private_dir.c_str(), s.ToString().c_str()); + if (!s.ok()) { + // Full gc or trying again later might work + might_need_garbage_collect_ = true; + } + return Status::OK(); +} + +void BackupEngineImpl::GetBackupInfo(std::vector* backup_info) { + assert(initialized_); + backup_info->reserve(backups_.size()); + for (auto& backup : backups_) { + if (!backup.second->Empty()) { + backup_info->push_back(BackupInfo( + backup.first, backup.second->GetTimestamp(), backup.second->GetSize(), + backup.second->GetNumberFiles(), backup.second->GetAppMetadata())); + } + } +} + +void +BackupEngineImpl::GetCorruptedBackups( + std::vector* corrupt_backup_ids) { + assert(initialized_); + corrupt_backup_ids->reserve(corrupt_backups_.size()); + for (auto& backup : corrupt_backups_) { + corrupt_backup_ids->push_back(backup.first); + } +} + +Status BackupEngineImpl::RestoreDBFromBackup( + BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, + const RestoreOptions& restore_options) { + assert(initialized_); + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return Status::NotFound("Backup not found"); + } + auto& backup = backup_itr->second; + if (backup->Empty()) { + return Status::NotFound("Backup not found"); + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id); + ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n", + static_cast(restore_options.keep_log_files)); + + // just in case. Ignore errors + db_env_->CreateDirIfMissing(db_dir); + db_env_->CreateDirIfMissing(wal_dir); + + if (restore_options.keep_log_files) { + // delete files in db_dir, but keep all the log files + DeleteChildren(db_dir, 1 << kLogFile); + // move all the files from archive dir to wal_dir + std::string archive_dir = ArchivalDirectory(wal_dir); + std::vector archive_files; + db_env_->GetChildren(archive_dir, &archive_files); // ignore errors + for (const auto& f : archive_files) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && type == kLogFile) { + ROCKS_LOG_INFO(options_.info_log, + "Moving log file from archive/ to wal_dir: %s", + f.c_str()); + Status s = + db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f); + if (!s.ok()) { + // if we can't move log file from archive_dir to wal_dir, + // we should fail, since it might mean data loss + return s; + } + } + } + } else { + DeleteChildren(wal_dir); + DeleteChildren(ArchivalDirectory(wal_dir)); + DeleteChildren(db_dir); + } + + RateLimiter* rate_limiter = options_.restore_rate_limiter.get(); + if (rate_limiter) { + copy_file_buffer_size_ = static_cast(rate_limiter->GetSingleBurstBytes()); + } + Status s; + std::vector restore_items_to_finish; + for (const auto& file_info : backup->GetFiles()) { + const std::string &file = file_info->filename; + std::string dst; + // 1. extract the filename + size_t slash = file.find_last_of('/'); + // file will either be shared/, shared_checksum/ + // or private// + assert(slash != std::string::npos); + dst = file.substr(slash + 1); + + // if the file was in shared_checksum, extract the real file name + // in this case the file is __. + if (file.substr(0, slash) == GetSharedChecksumDirRel()) { + dst = GetFileFromChecksumFile(dst); + } + + // 2. find the filetype + uint64_t number; + FileType type; + bool ok = ParseFileName(dst, &number, &type); + if (!ok) { + return Status::Corruption("Backup corrupted"); + } + // 3. Construct the final path + // kLogFile lives in wal_dir and all the rest live in db_dir + dst = ((type == kLogFile) ? wal_dir : db_dir) + + "/" + dst; + + ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), + dst.c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, + EnvOptions() /* src_env_options */, false, rate_limiter, + 0 /* size_limit */); + RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), + file_info->checksum_value); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + restore_items_to_finish.push_back( + std::move(after_copy_or_create_work_item)); + } + Status item_status; + for (auto& item : restore_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_status = result.status; + // Note: It is possible that both of the following bad-status cases occur + // during copying. But, we only return one status. + if (!item_status.ok()) { + s = item_status; + break; + } else if (item.checksum_value != result.checksum_value) { + s = Status::Corruption("Checksum check failed"); + break; + } + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", + s.ToString().c_str()); + return s; +} + +Status BackupEngineImpl::VerifyBackup(BackupID backup_id) { + assert(initialized_); + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return Status::NotFound(); + } + + auto& backup = backup_itr->second; + if (backup->Empty()) { + return Status::NotFound(); + } + + ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id); + + std::unordered_map curr_abs_path_to_size; + for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(), + GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size); + } + + for (const auto& file_info : backup->GetFiles()) { + const auto abs_path = GetAbsolutePath(file_info->filename); + if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) { + return Status::NotFound("File missing: " + abs_path); + } + if (file_info->size != curr_abs_path_to_size[abs_path]) { + return Status::Corruption("File corrupted: " + abs_path); + } + } + return Status::OK(); +} + +Status BackupEngineImpl::CopyOrCreateFile( + const std::string& src, const std::string& dst, const std::string& contents, + Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync, + RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value, + uint64_t size_limit, std::function progress_callback) { + assert(src.empty() != contents.empty()); + Status s; + std::unique_ptr dst_file; + std::unique_ptr src_file; + EnvOptions dst_env_options; + dst_env_options.use_mmap_writes = false; + // TODO:(gzh) maybe use direct reads/writes here if possible + if (size != nullptr) { + *size = 0; + } + if (checksum_value != nullptr) { + *checksum_value = 0; + } + + // Check if size limit is set. if not, set it to very big number + if (size_limit == 0) { + size_limit = std::numeric_limits::max(); + } + + s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options); + if (s.ok() && !src.empty()) { + s = src_env->NewSequentialFile(src, &src_file, src_env_options); + } + if (!s.ok()) { + return s; + } + + std::unique_ptr dest_writer(new WritableFileWriter( + NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options)); + std::unique_ptr src_reader; + std::unique_ptr buf; + if (!src.empty()) { + src_reader.reset(new SequentialFileReader( + NewLegacySequentialFileWrapper(src_file), src)); + buf.reset(new char[copy_file_buffer_size_]); + } + + Slice data; + uint64_t processed_buffer_size = 0; + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return Status::Incomplete("Backup stopped"); + } + if (!src.empty()) { + size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) + ? copy_file_buffer_size_ + : static_cast(size_limit); + s = src_reader->Read(buffer_to_read, &data, buf.get()); + processed_buffer_size += buffer_to_read; + } else { + data = contents; + } + size_limit -= data.size(); + + if (!s.ok()) { + return s; + } + + if (size != nullptr) { + *size += data.size(); + } + if (checksum_value != nullptr) { + *checksum_value = + crc32c::Extend(*checksum_value, data.data(), data.size()); + } + s = dest_writer->Append(data); + if (rate_limiter != nullptr) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } + if (processed_buffer_size > options_.callback_trigger_interval_size) { + processed_buffer_size -= options_.callback_trigger_interval_size; + std::lock_guard lock(byte_report_mutex_); + progress_callback(); + } + } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0); + + if (s.ok() && sync) { + s = dest_writer->Sync(false); + } + if (s.ok()) { + s = dest_writer->Close(); + } + return s; +} + +// fname will always start with "/" +Status BackupEngineImpl::AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, const EnvOptions& src_env_options, + RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit, + bool shared_checksum, std::function progress_callback, + const std::string& contents) { + assert(!fname.empty() && fname[0] == '/'); + assert(contents.empty() != src_dir.empty()); + + std::string dst_relative = fname.substr(1); + std::string dst_relative_tmp; + Status s; + uint32_t checksum_value = 0; + + if (shared && shared_checksum) { + // add checksum and file length to the file name + s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit, + &checksum_value); + if (!s.ok()) { + return s; + } + if (size_bytes == port::kMaxUint64) { + return Status::NotFound("File missing: " + src_dir + fname); + } + dst_relative = + GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes); + dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true); + dst_relative = GetSharedFileWithChecksumRel(dst_relative, false); + } else if (shared) { + dst_relative_tmp = GetSharedFileRel(dst_relative, true); + dst_relative = GetSharedFileRel(dst_relative, false); + } else { + dst_relative = GetPrivateFileRel(backup_id, false, dst_relative); + } + + // We copy into `temp_dest_path` and, once finished, rename it to + // `final_dest_path`. This allows files to atomically appear at + // `final_dest_path`. We can copy directly to the final path when atomicity + // is unnecessary, like for files in private backup directories. + const std::string* copy_dest_path; + std::string temp_dest_path; + std::string final_dest_path = GetAbsolutePath(dst_relative); + if (!dst_relative_tmp.empty()) { + temp_dest_path = GetAbsolutePath(dst_relative_tmp); + copy_dest_path = &temp_dest_path; + } else { + copy_dest_path = &final_dest_path; + } + + // if it's shared, we also need to check if it exists -- if it does, no need + // to copy it again. + bool need_to_copy = true; + // true if final_dest_path is the same path as another live file + const bool same_path = + live_dst_paths.find(final_dest_path) != live_dst_paths.end(); + + bool file_exists = false; + if (shared && !same_path) { + Status exist = backup_env_->FileExists(final_dest_path); + if (exist.ok()) { + file_exists = true; + } else if (exist.IsNotFound()) { + file_exists = false; + } else { + assert(s.IsIOError()); + return exist; + } + } + + if (!contents.empty()) { + need_to_copy = false; + } else if (shared && (same_path || file_exists)) { + need_to_copy = false; + if (shared_checksum) { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %u and size %" PRIu64, + fname.c_str(), checksum_value, size_bytes); + } else if (backuped_file_infos_.find(dst_relative) == + backuped_file_infos_.end() && !same_path) { + // file already exists, but it's not referenced by any backup. overwrite + // the file + ROCKS_LOG_INFO( + options_.info_log, + "%s already present, but not referenced by any backup. We will " + "overwrite the file.", + fname.c_str()); + need_to_copy = true; + backup_env_->DeleteFile(final_dest_path); + } else { + // the file is present and referenced by a backup + ROCKS_LOG_INFO(options_.info_log, + "%s already present, calculate checksum", fname.c_str()); + s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, + size_limit, &checksum_value); + } + } + live_dst_paths.insert(final_dest_path); + + if (!contents.empty() || need_to_copy) { + ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(), + copy_dest_path->c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents, + db_env_, backup_env_, src_env_options, options_.sync, rate_limiter, + size_limit, progress_callback); + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), shared, need_to_copy, + backup_env_, temp_dest_path, final_dest_path, dst_relative); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + } else { + std::promise promise_result; + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + promise_result.get_future(), shared, need_to_copy, backup_env_, + temp_dest_path, final_dest_path, dst_relative); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + CopyOrCreateResult result; + result.status = s; + result.size = size_bytes; + result.checksum_value = checksum_value; + promise_result.set_value(std::move(result)); + } + return s; +} + +Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, + const EnvOptions& src_env_options, + uint64_t size_limit, + uint32_t* checksum_value) { + *checksum_value = 0; + if (size_limit == 0) { + size_limit = std::numeric_limits::max(); + } + + std::unique_ptr src_file; + Status s = src_env->NewSequentialFile(src, &src_file, src_env_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr src_reader( + new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src)); + std::unique_ptr buf(new char[copy_file_buffer_size_]); + Slice data; + + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return Status::Incomplete("Backup stopped"); + } + size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ? + copy_file_buffer_size_ : static_cast(size_limit); + s = src_reader->Read(buffer_to_read, &data, buf.get()); + + if (!s.ok()) { + return s; + } + + size_limit -= data.size(); + *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size()); + } while (data.size() > 0 && size_limit > 0); + + return s; +} + +void BackupEngineImpl::DeleteChildren(const std::string& dir, + uint32_t file_type_filter) { + std::vector children; + db_env_->GetChildren(dir, &children); // ignore errors + + for (const auto& f : children) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && (file_type_filter & (1 << type))) { + // don't delete this file + continue; + } + db_env_->DeleteFile(dir + "/" + f); // ignore errors + } +} + +Status BackupEngineImpl::InsertPathnameToSizeBytes( + const std::string& dir, Env* env, + std::unordered_map* result) { + assert(result != nullptr); + std::vector files_attrs; + Status status = env->FileExists(dir); + if (status.ok()) { + status = env->GetChildrenFileAttributes(dir, &files_attrs); + } else if (status.IsNotFound()) { + // Insert no entries can be considered success + status = Status::OK(); + } + const bool slash_needed = dir.empty() || dir.back() != '/'; + for (const auto& file_attrs : files_attrs) { + result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name, + file_attrs.size_bytes); + } + return status; +} + +Status BackupEngineImpl::GarbageCollect() { + assert(!read_only_); + + // We will make a best effort to remove all garbage even in the presence + // of inconsistencies or I/O failures that inhibit finding garbage. + Status overall_status = Status::OK(); + // If all goes well, we don't need another auto-GC this session + might_need_garbage_collect_ = false; + + ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection"); + + // delete obsolete shared files + for (bool with_checksum : {false, true}) { + std::vector shared_children; + { + std::string shared_path; + if (with_checksum) { + shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel()); + } else { + shared_path = GetAbsolutePath(GetSharedFileRel()); + } + auto s = backup_env_->FileExists(shared_path); + if (s.ok()) { + s = backup_env_->GetChildren(shared_path, &shared_children); + } else if (s.IsNotFound()) { + s = Status::OK(); + } + if (!s.ok()) { + overall_status = s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : shared_children) { + if (child == "." || child == "..") { + continue; + } + std::string rel_fname; + if (with_checksum) { + rel_fname = GetSharedFileWithChecksumRel(child); + } else { + rel_fname = GetSharedFileRel(child); + } + auto child_itr = backuped_file_infos_.find(rel_fname); + // if it's not refcounted, delete it + if (child_itr == backuped_file_infos_.end() || + child_itr->second->refs == 0) { + // this might be a directory, but DeleteFile will just fail in that + // case, so we're good + Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname)); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + rel_fname.c_str(), s.ToString().c_str()); + backuped_file_infos_.erase(rel_fname); + if (!s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + } + + // delete obsolete private files + std::vector private_children; + { + auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()), + &private_children); + if (!s.ok()) { + overall_status = s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : private_children) { + if (child == "." || child == "..") { + continue; + } + + BackupID backup_id = 0; + bool tmp_dir = child.find(".tmp") != std::string::npos; + sscanf(child.c_str(), "%u", &backup_id); + if (!tmp_dir && // if it's tmp_dir, delete it + (backup_id == 0 || backups_.find(backup_id) != backups_.end())) { + // it's either not a number or it's still alive. continue + continue; + } + // here we have to delete the dir and all its children + std::string full_private_path = + GetAbsolutePath(GetPrivateFileRel(backup_id)); + std::vector subchildren; + backup_env_->GetChildren(full_private_path, &subchildren); + for (auto& subchild : subchildren) { + if (subchild == "." || subchild == "..") { + continue; + } + Status s = backup_env_->DeleteFile(full_private_path + subchild); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + (full_private_path + subchild).c_str(), + s.ToString().c_str()); + if (!s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + // finally delete the private dir + Status s = backup_env_->DeleteDir(full_private_path); + ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s", + full_private_path.c_str(), s.ToString().c_str()); + if (!s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + + assert(overall_status.ok() || might_need_garbage_collect_); + return overall_status; +} + +// ------- BackupMeta class -------- + +Status BackupEngineImpl::BackupMeta::AddFile( + std::shared_ptr file_info) { + auto itr = file_infos_->find(file_info->filename); + if (itr == file_infos_->end()) { + auto ret = file_infos_->insert({file_info->filename, file_info}); + if (ret.second) { + itr = ret.first; + itr->second->refs = 1; + } else { + // if this happens, something is seriously wrong + return Status::Corruption("In memory metadata insertion error"); + } + } else { + if (itr->second->checksum_value != file_info->checksum_value) { + return Status::Corruption( + "Checksum mismatch for existing backup file. Delete old backups and " + "try again."); + } + ++itr->second->refs; // increase refcount if already present + } + + size_ += file_info->size; + files_.push_back(itr->second); + + return Status::OK(); +} + +Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) { + Status s; + for (const auto& file : files_) { + --file->refs; // decrease refcount + } + files_.clear(); + // delete meta file + if (delete_meta) { + s = env_->FileExists(meta_filename_); + if (s.ok()) { + s = env_->DeleteFile(meta_filename_); + } else if (s.IsNotFound()) { + s = Status::OK(); // nothing to delete + } + } + timestamp_ = 0; + return s; +} + +Slice kMetaDataPrefix("metadata "); + +// each backup meta file is of the format: +// +// +// (optional) +// +// +// +// ... +Status BackupEngineImpl::BackupMeta::LoadFromFile( + const std::string& backup_dir, + const std::unordered_map& abs_path_to_size) { + assert(Empty()); + Status s; + std::unique_ptr backup_meta_file; + s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions()); + if (!s.ok()) { + return s; + } + + std::unique_ptr backup_meta_reader( + new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file), + meta_filename_)); + std::unique_ptr buf(new char[max_backup_meta_file_size_ + 1]); + Slice data; + s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get()); + + if (!s.ok() || data.size() == max_backup_meta_file_size_) { + return s.ok() ? Status::Corruption("File size too big") : s; + } + buf[data.size()] = 0; + + uint32_t num_files = 0; + char *next; + timestamp_ = strtoull(data.data(), &next, 10); + data.remove_prefix(next - data.data() + 1); // +1 for '\n' + sequence_number_ = strtoull(data.data(), &next, 10); + data.remove_prefix(next - data.data() + 1); // +1 for '\n' + + if (data.starts_with(kMetaDataPrefix)) { + // app metadata present + data.remove_prefix(kMetaDataPrefix.size()); + Slice hex_encoded_metadata = GetSliceUntil(&data, '\n'); + bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_); + if (!decode_success) { + return Status::Corruption( + "Failed to decode stored hex encoded app metadata"); + } + } + + num_files = static_cast(strtoul(data.data(), &next, 10)); + data.remove_prefix(next - data.data() + 1); // +1 for '\n' + + std::vector> files; + + Slice checksum_prefix("crc32 "); + + for (uint32_t i = 0; s.ok() && i < num_files; ++i) { + auto line = GetSliceUntil(&data, '\n'); + std::string filename = GetSliceUntil(&line, ' ').ToString(); + + uint64_t size; + const std::shared_ptr file_info = GetFile(filename); + if (file_info) { + size = file_info->size; + } else { + std::string abs_path = backup_dir + "/" + filename; + try { + size = abs_path_to_size.at(abs_path); + } catch (std::out_of_range&) { + return Status::Corruption("Size missing for pathname: " + abs_path); + } + } + + if (line.empty()) { + return Status::Corruption("File checksum is missing for " + filename + + " in " + meta_filename_); + } + + uint32_t checksum_value = 0; + if (line.starts_with(checksum_prefix)) { + line.remove_prefix(checksum_prefix.size()); + checksum_value = static_cast( + strtoul(line.data(), nullptr, 10)); + if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) { + return Status::Corruption("Invalid checksum value for " + filename + + " in " + meta_filename_); + } + } else { + return Status::Corruption("Unknown checksum type for " + filename + + " in " + meta_filename_); + } + + files.emplace_back(new FileInfo(filename, size, checksum_value)); + } + + if (s.ok() && data.size() > 0) { + // file has to be read completely. if not, we count it as corruption + s = Status::Corruption("Tailing data in backup meta file in " + + meta_filename_); + } + + if (s.ok()) { + files_.reserve(files.size()); + for (const auto& file_info : files) { + s = AddFile(file_info); + if (!s.ok()) { + break; + } + } + } + + return s; +} + +Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { + Status s; + std::unique_ptr backup_meta_file; + EnvOptions env_options; + env_options.use_mmap_writes = false; + env_options.use_direct_writes = false; + s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr buf(new char[max_backup_meta_file_size_]); + size_t len = 0, buf_size = max_backup_meta_file_size_; + len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_); + len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n", + sequence_number_); + if (!app_metadata_.empty()) { + std::string hex_encoded_metadata = + Slice(app_metadata_).ToString(/* hex */ true); + + // +1 to accommodate newline character + size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1; + if (hex_meta_strlen >= buf_size) { + return Status::Corruption("Buffer too small to fit backup metadata"); + } + else if (len + hex_meta_strlen >= buf_size) { + backup_meta_file->Append(Slice(buf.get(), len)); + buf.reset(); + std::unique_ptr new_reset_buf( + new char[max_backup_meta_file_size_]); + buf.swap(new_reset_buf); + len = 0; + } + len += snprintf(buf.get() + len, buf_size - len, "%s%s\n", + kMetaDataPrefix.ToString().c_str(), + hex_encoded_metadata.c_str()); + } + + char writelen_temp[19]; + if (len + snprintf(writelen_temp, sizeof(writelen_temp), + "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) { + backup_meta_file->Append(Slice(buf.get(), len)); + buf.reset(); + std::unique_ptr new_reset_buf(new char[max_backup_meta_file_size_]); + buf.swap(new_reset_buf); + len = 0; + } + { + const char *const_write = writelen_temp; + len += snprintf(buf.get() + len, buf_size - len, "%s", const_write); + } + + for (const auto& file : files_) { + // use crc32 for now, switch to something else if needed + + size_t newlen = len + file->filename.length() + snprintf(writelen_temp, + sizeof(writelen_temp), " crc32 %u\n", file->checksum_value); + const char *const_write = writelen_temp; + if (newlen >= buf_size) { + backup_meta_file->Append(Slice(buf.get(), len)); + buf.reset(); + std::unique_ptr new_reset_buf( + new char[max_backup_meta_file_size_]); + buf.swap(new_reset_buf); + len = 0; + } + len += snprintf(buf.get() + len, buf_size - len, "%s%s", + file->filename.c_str(), const_write); + } + + s = backup_meta_file->Append(Slice(buf.get(), len)); + if (s.ok() && sync) { + s = backup_meta_file->Sync(); + } + if (s.ok()) { + s = backup_meta_file->Close(); + } + if (s.ok()) { + s = env_->RenameFile(meta_tmp_filename_, meta_filename_); + } + return s; +} + +// -------- BackupEngineReadOnlyImpl --------- +class BackupEngineReadOnlyImpl : public BackupEngineReadOnly { + public: + BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options) + : backup_engine_(new BackupEngineImpl(db_env, options, true)) {} + + ~BackupEngineReadOnlyImpl() override {} + + // The returned BackupInfos are in chronological order, which means the + // latest backup comes last. + void GetBackupInfo(std::vector* backup_info) override { + backup_engine_->GetBackupInfo(backup_info); + } + + void GetCorruptedBackups(std::vector* corrupt_backup_ids) override { + backup_engine_->GetCorruptedBackups(corrupt_backup_ids); + } + + Status RestoreDBFromBackup( + BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, + const RestoreOptions& restore_options = RestoreOptions()) override { + return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir, + restore_options); + } + + Status RestoreDBFromLatestBackup( + const std::string& db_dir, const std::string& wal_dir, + const RestoreOptions& restore_options = RestoreOptions()) override { + return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir, + restore_options); + } + + Status VerifyBackup(BackupID backup_id) override { + return backup_engine_->VerifyBackup(backup_id); + } + + Status Initialize() { return backup_engine_->Initialize(); } + + private: + std::unique_ptr backup_engine_; +}; + +Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options, + BackupEngineReadOnly** backup_engine_ptr) { + if (options.destroy_old_data) { + return Status::InvalidArgument( + "Can't destroy old data with ReadOnly BackupEngine"); + } + std::unique_ptr backup_engine( + new BackupEngineReadOnlyImpl(env, options)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/backupable/backupable_db_test.cc b/src/rocksdb/utilities/backupable/backupable_db_test.cc new file mode 100644 index 000000000..efdb34b30 --- /dev/null +++ b/src/rocksdb/utilities/backupable/backupable_db_test.cc @@ -0,0 +1,1863 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) + +#include +#include +#include +#include + +#include "db/db_impl/db_impl.h" +#include "env/env_chroot.h" +#include "file/filename.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/backupable_db.h" +#include "rocksdb/utilities/options_util.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/stderr_logger.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +class DummyDB : public StackableDB { + public: + /* implicit */ + DummyDB(const Options& options, const std::string& dbname) + : StackableDB(nullptr), options_(options), dbname_(dbname), + deletions_enabled_(true), sequence_number_(0) {} + + SequenceNumber GetLatestSequenceNumber() const override { + return ++sequence_number_; + } + + const std::string& GetName() const override { return dbname_; } + + Env* GetEnv() const override { return options_.env; } + + using DB::GetOptions; + Options GetOptions(ColumnFamilyHandle* /*column_family*/) const override { + return options_; + } + + DBOptions GetDBOptions() const override { return DBOptions(options_); } + + Status EnableFileDeletions(bool /*force*/) override { + EXPECT_TRUE(!deletions_enabled_); + deletions_enabled_ = true; + return Status::OK(); + } + + Status DisableFileDeletions() override { + EXPECT_TRUE(deletions_enabled_); + deletions_enabled_ = false; + return Status::OK(); + } + + Status GetLiveFiles(std::vector& vec, uint64_t* mfs, + bool /*flush_memtable*/ = true) override { + EXPECT_TRUE(!deletions_enabled_); + vec = live_files_; + *mfs = 100; + return Status::OK(); + } + + ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; } + + class DummyLogFile : public LogFile { + public: + /* implicit */ + DummyLogFile(const std::string& path, bool alive = true) + : path_(path), alive_(alive) {} + + std::string PathName() const override { return path_; } + + uint64_t LogNumber() const override { + // what business do you have calling this method? + ADD_FAILURE(); + return 0; + } + + WalFileType Type() const override { + return alive_ ? kAliveLogFile : kArchivedLogFile; + } + + SequenceNumber StartSequence() const override { + // this seqnum guarantees the dummy file will be included in the backup + // as long as it is alive. + return kMaxSequenceNumber; + } + + uint64_t SizeFileBytes() const override { return 0; } + + private: + std::string path_; + bool alive_; + }; // DummyLogFile + + Status GetSortedWalFiles(VectorLogPtr& files) override { + EXPECT_TRUE(!deletions_enabled_); + files.resize(wal_files_.size()); + for (size_t i = 0; i < files.size(); ++i) { + files[i].reset( + new DummyLogFile(wal_files_[i].first, wal_files_[i].second)); + } + return Status::OK(); + } + + // To avoid FlushWAL called on stacked db which is nullptr + Status FlushWAL(bool /*sync*/) override { return Status::OK(); } + + std::vector live_files_; + // pair + std::vector> wal_files_; + private: + Options options_; + std::string dbname_; + bool deletions_enabled_; + mutable SequenceNumber sequence_number_; +}; // DummyDB + +class TestEnv : public EnvWrapper { + public: + explicit TestEnv(Env* t) : EnvWrapper(t) {} + + class DummySequentialFile : public SequentialFile { + public: + explicit DummySequentialFile(bool fail_reads) + : SequentialFile(), rnd_(5), fail_reads_(fail_reads) {} + Status Read(size_t n, Slice* result, char* scratch) override { + if (fail_reads_) { + return Status::IOError(); + } + size_t read_size = (n > size_left) ? size_left : n; + for (size_t i = 0; i < read_size; ++i) { + scratch[i] = rnd_.Next() & 255; + } + *result = Slice(scratch, read_size); + size_left -= read_size; + return Status::OK(); + } + + Status Skip(uint64_t n) override { + size_left = (n > size_left) ? size_left - n : 0; + return Status::OK(); + } + + private: + size_t size_left = 200; + Random rnd_; + bool fail_reads_; + }; + + Status NewSequentialFile(const std::string& f, + std::unique_ptr* r, + const EnvOptions& options) override { + MutexLock l(&mutex_); + if (dummy_sequential_file_) { + r->reset( + new TestEnv::DummySequentialFile(dummy_sequential_file_fail_reads_)); + return Status::OK(); + } else { + Status s = EnvWrapper::NewSequentialFile(f, r, options); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_seq_readers_; + } + ++num_seq_readers_; + } + return s; + } + } + + Status NewWritableFile(const std::string& f, std::unique_ptr* r, + const EnvOptions& options) override { + MutexLock l(&mutex_); + written_files_.push_back(f); + if (limit_written_files_ <= 0) { + return Status::NotSupported("Sorry, can't do this"); + } + limit_written_files_--; + Status s = EnvWrapper::NewWritableFile(f, r, options); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_writers_; + } + ++num_writers_; + } + return s; + } + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + MutexLock l(&mutex_); + Status s = EnvWrapper::NewRandomAccessFile(fname, result, options); + if (s.ok()) { + if ((*result)->use_direct_io()) { + ++num_direct_rand_readers_; + } + ++num_rand_readers_; + } + return s; + } + + Status DeleteFile(const std::string& fname) override { + MutexLock l(&mutex_); + if (fail_delete_files_) { + return Status::IOError(); + } + EXPECT_GT(limit_delete_files_, 0U); + limit_delete_files_--; + return EnvWrapper::DeleteFile(fname); + } + + Status DeleteDir(const std::string& dirname) override { + MutexLock l(&mutex_); + if (fail_delete_files_) { + return Status::IOError(); + } + return EnvWrapper::DeleteDir(dirname); + } + + void AssertWrittenFiles(std::vector& should_have_written) { + MutexLock l(&mutex_); + std::sort(should_have_written.begin(), should_have_written.end()); + std::sort(written_files_.begin(), written_files_.end()); + + ASSERT_EQ(should_have_written, written_files_); + } + + void ClearWrittenFiles() { + MutexLock l(&mutex_); + written_files_.clear(); + } + + void SetLimitWrittenFiles(uint64_t limit) { + MutexLock l(&mutex_); + limit_written_files_ = limit; + } + + void SetLimitDeleteFiles(uint64_t limit) { + MutexLock l(&mutex_); + limit_delete_files_ = limit; + } + + void SetDeleteFileFailure(bool fail) { + MutexLock l(&mutex_); + fail_delete_files_ = fail; + } + + void SetDummySequentialFile(bool dummy_sequential_file) { + MutexLock l(&mutex_); + dummy_sequential_file_ = dummy_sequential_file; + } + void SetDummySequentialFileFailReads(bool dummy_sequential_file_fail_reads) { + MutexLock l(&mutex_); + dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads; + } + + void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; } + Status GetChildren(const std::string& dir, + std::vector* r) override { + if (get_children_failure_) { + return Status::IOError("SimulatedFailure"); + } + return EnvWrapper::GetChildren(dir, r); + } + + // Some test cases do not actually create the test files (e.g., see + // DummyDB::live_files_) - for those cases, we mock those files' attributes + // so CreateNewBackup() can get their attributes. + void SetFilenamesForMockedAttrs(const std::vector& filenames) { + filenames_for_mocked_attrs_ = filenames; + } + Status GetChildrenFileAttributes( + const std::string& dir, std::vector* r) override { + if (filenames_for_mocked_attrs_.size() > 0) { + for (const auto& filename : filenames_for_mocked_attrs_) { + r->push_back({dir + filename, 10 /* size_bytes */}); + } + return Status::OK(); + } + return EnvWrapper::GetChildrenFileAttributes(dir, r); + } + Status GetFileSize(const std::string& path, uint64_t* size_bytes) override { + if (filenames_for_mocked_attrs_.size() > 0) { + auto fname = path.substr(path.find_last_of('/')); + auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(), + filenames_for_mocked_attrs_.end(), fname); + if (filename_iter != filenames_for_mocked_attrs_.end()) { + *size_bytes = 10; + return Status::OK(); + } + return Status::NotFound(fname); + } + return EnvWrapper::GetFileSize(path, size_bytes); + } + + void SetCreateDirIfMissingFailure(bool fail) { + create_dir_if_missing_failure_ = fail; + } + Status CreateDirIfMissing(const std::string& d) override { + if (create_dir_if_missing_failure_) { + return Status::IOError("SimulatedFailure"); + } + return EnvWrapper::CreateDirIfMissing(d); + } + + void SetNewDirectoryFailure(bool fail) { new_directory_failure_ = fail; } + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override { + if (new_directory_failure_) { + return Status::IOError("SimulatedFailure"); + } + return EnvWrapper::NewDirectory(name, result); + } + + void ClearFileOpenCounters() { + MutexLock l(&mutex_); + num_rand_readers_ = 0; + num_direct_rand_readers_ = 0; + num_seq_readers_ = 0; + num_direct_seq_readers_ = 0; + num_writers_ = 0; + num_direct_writers_ = 0; + } + + int num_rand_readers() { return num_rand_readers_; } + int num_direct_rand_readers() { return num_direct_rand_readers_; } + int num_seq_readers() { return num_seq_readers_; } + int num_direct_seq_readers() { return num_direct_seq_readers_; } + int num_writers() { return num_writers_; } + int num_direct_writers() { return num_direct_writers_; } + + private: + port::Mutex mutex_; + bool dummy_sequential_file_ = false; + bool dummy_sequential_file_fail_reads_ = false; + std::vector written_files_; + std::vector filenames_for_mocked_attrs_; + uint64_t limit_written_files_ = 1000000; + uint64_t limit_delete_files_ = 1000000; + bool fail_delete_files_ = false; + + bool get_children_failure_ = false; + bool create_dir_if_missing_failure_ = false; + bool new_directory_failure_ = false; + + // Keeps track of how many files of each type were successfully opened, and + // out of those, how many were opened with direct I/O. + std::atomic num_rand_readers_; + std::atomic num_direct_rand_readers_; + std::atomic num_seq_readers_; + std::atomic num_direct_seq_readers_; + std::atomic num_writers_; + std::atomic num_direct_writers_; +}; // TestEnv + +class FileManager : public EnvWrapper { + public: + explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {} + + Status DeleteRandomFileInDir(const std::string& dir) { + std::vector children; + GetChildren(dir, &children); + if (children.size() <= 2) { // . and .. + return Status::NotFound(""); + } + while (true) { + int i = rnd_.Next() % children.size(); + if (children[i] != "." && children[i] != "..") { + return DeleteFile(dir + "/" + children[i]); + } + } + // should never get here + assert(false); + return Status::NotFound(""); + } + + Status AppendToRandomFileInDir(const std::string& dir, + const std::string& data) { + std::vector children; + GetChildren(dir, &children); + if (children.size() <= 2) { + return Status::NotFound(""); + } + while (true) { + int i = rnd_.Next() % children.size(); + if (children[i] != "." && children[i] != "..") { + return WriteToFile(dir + "/" + children[i], data); + } + } + // should never get here + assert(false); + return Status::NotFound(""); + } + + Status CorruptFile(const std::string& fname, uint64_t bytes_to_corrupt) { + std::string file_contents; + Status s = ReadFileToString(this, fname, &file_contents); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + for (uint64_t i = 0; i < bytes_to_corrupt; ++i) { + std::string tmp; + test::RandomString(&rnd_, 1, &tmp); + file_contents[rnd_.Next() % file_contents.size()] = tmp[0]; + } + return WriteToFile(fname, file_contents); + } + + Status CorruptChecksum(const std::string& fname, bool appear_valid) { + std::string metadata; + Status s = ReadFileToString(this, fname, &metadata); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + auto pos = metadata.find("private"); + if (pos == std::string::npos) { + return Status::Corruption("private file is expected"); + } + pos = metadata.find(" crc32 ", pos + 6); + if (pos == std::string::npos) { + return Status::Corruption("checksum not found"); + } + + if (metadata.size() < pos + 7) { + return Status::Corruption("bad CRC32 checksum value"); + } + + if (appear_valid) { + if (metadata[pos + 8] == '\n') { + // single digit value, safe to insert one more digit + metadata.insert(pos + 8, 1, '0'); + } else { + metadata.erase(pos + 8, 1); + } + } else { + metadata[pos + 7] = 'a'; + } + + return WriteToFile(fname, metadata); + } + + Status WriteToFile(const std::string& fname, const std::string& data) { + std::unique_ptr file; + EnvOptions env_options; + env_options.use_mmap_writes = false; + Status s = EnvWrapper::NewWritableFile(fname, &file, env_options); + if (!s.ok()) { + return s; + } + return file->Append(Slice(data)); + } + + private: + Random rnd_; +}; // FileManager + +// utility functions +static size_t FillDB(DB* db, int from, int to) { + size_t bytes_written = 0; + for (int i = from; i < to; ++i) { + std::string key = "testkey" + ToString(i); + std::string value = "testvalue" + ToString(i); + bytes_written += key.size() + value.size(); + + EXPECT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); + } + return bytes_written; +} + +static void AssertExists(DB* db, int from, int to) { + for (int i = from; i < to; ++i) { + std::string key = "testkey" + ToString(i); + std::string value; + Status s = db->Get(ReadOptions(), Slice(key), &value); + ASSERT_EQ(value, "testvalue" + ToString(i)); + } +} + +static void AssertEmpty(DB* db, int from, int to) { + for (int i = from; i < to; ++i) { + std::string key = "testkey" + ToString(i); + std::string value = "testvalue" + ToString(i); + + Status s = db->Get(ReadOptions(), Slice(key), &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +class BackupableDBTest : public testing::Test { + public: + enum ShareOption { + kNoShare, + kShareNoChecksum, + kShareWithChecksum, + }; + + const std::vector kAllShareOptions = { + kNoShare, kShareNoChecksum, kShareWithChecksum}; + + BackupableDBTest() { + // set up files + std::string db_chroot = test::PerThreadDBPath("backupable_db"); + std::string backup_chroot = test::PerThreadDBPath("backupable_db_backup"); + Env::Default()->CreateDir(db_chroot); + Env::Default()->CreateDir(backup_chroot); + dbname_ = "/tempdb"; + backupdir_ = "/tempbk"; + + // set up envs + db_chroot_env_.reset(NewChrootEnv(Env::Default(), db_chroot)); + backup_chroot_env_.reset(NewChrootEnv(Env::Default(), backup_chroot)); + test_db_env_.reset(new TestEnv(db_chroot_env_.get())); + test_backup_env_.reset(new TestEnv(backup_chroot_env_.get())); + file_manager_.reset(new FileManager(backup_chroot_env_.get())); + + // set up db options + options_.create_if_missing = true; + options_.paranoid_checks = true; + options_.write_buffer_size = 1 << 17; // 128KB + options_.env = test_db_env_.get(); + options_.wal_dir = dbname_; + + // Create logger + DBOptions logger_options; + logger_options.env = db_chroot_env_.get(); + CreateLoggerFromOptions(dbname_, logger_options, &logger_); + + // set up backup db options + backupable_options_.reset(new BackupableDBOptions( + backupdir_, test_backup_env_.get(), true, logger_.get(), true)); + + // most tests will use multi-threaded backups + backupable_options_->max_background_operations = 7; + + // delete old files in db + DestroyDB(dbname_, options_); + } + + DB* OpenDB() { + DB* db; + EXPECT_OK(DB::Open(options_, dbname_, &db)); + return db; + } + + void OpenDBAndBackupEngine(bool destroy_old_data = false, bool dummy = false, + ShareOption shared_option = kShareNoChecksum) { + // reset all the defaults + test_backup_env_->SetLimitWrittenFiles(1000000); + test_db_env_->SetLimitWrittenFiles(1000000); + test_db_env_->SetDummySequentialFile(dummy); + + DB* db; + if (dummy) { + dummy_db_ = new DummyDB(options_, dbname_); + db = dummy_db_; + } else { + ASSERT_OK(DB::Open(options_, dbname_, &db)); + } + db_.reset(db); + backupable_options_->destroy_old_data = destroy_old_data; + backupable_options_->share_table_files = shared_option != kNoShare; + backupable_options_->share_files_with_checksum = + shared_option == kShareWithChecksum; + BackupEngine* backup_engine; + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + backup_engine_.reset(backup_engine); + } + + void CloseDBAndBackupEngine() { + db_.reset(); + backup_engine_.reset(); + } + + void OpenBackupEngine() { + backupable_options_->destroy_old_data = false; + BackupEngine* backup_engine; + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + backup_engine_.reset(backup_engine); + } + + void CloseBackupEngine() { backup_engine_.reset(nullptr); } + + // restores backup backup_id and asserts the existence of + // [start_exist, end_exist> and not-existence of + // [end_exist, end> + // + // if backup_id == 0, it means restore from latest + // if end == 0, don't check AssertEmpty + void AssertBackupConsistency(BackupID backup_id, uint32_t start_exist, + uint32_t end_exist, uint32_t end = 0, + bool keep_log_files = false) { + RestoreOptions restore_options(keep_log_files); + bool opened_backup_engine = false; + if (backup_engine_.get() == nullptr) { + opened_backup_engine = true; + OpenBackupEngine(); + } + if (backup_id > 0) { + ASSERT_OK(backup_engine_->RestoreDBFromBackup(backup_id, dbname_, dbname_, + restore_options)); + } else { + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_, + restore_options)); + } + DB* db = OpenDB(); + AssertExists(db, start_exist, end_exist); + if (end != 0) { + AssertEmpty(db, end_exist, end); + } + delete db; + if (opened_backup_engine) { + CloseBackupEngine(); + } + } + + void DeleteLogFiles() { + std::vector delete_logs; + db_chroot_env_->GetChildren(dbname_, &delete_logs); + for (auto f : delete_logs) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && type == kLogFile) { + db_chroot_env_->DeleteFile(dbname_ + "/" + f); + } + } + } + + // files + std::string dbname_; + std::string backupdir_; + + // logger_ must be above backup_engine_ such that the engine's destructor, + // which uses a raw pointer to the logger, executes first. + std::shared_ptr logger_; + + // envs + std::unique_ptr db_chroot_env_; + std::unique_ptr backup_chroot_env_; + std::unique_ptr test_db_env_; + std::unique_ptr test_backup_env_; + std::unique_ptr file_manager_; + + // all the dbs! + DummyDB* dummy_db_; // BackupableDB owns dummy_db_ + std::unique_ptr db_; + std::unique_ptr backup_engine_; + + // options + Options options_; + + protected: + std::unique_ptr backupable_options_; +}; // BackupableDBTest + +void AppendPath(const std::string& path, std::vector& v) { + for (auto& f : v) { + f = path + f; + } +} + +class BackupableDBTestWithParam : public BackupableDBTest, + public testing::WithParamInterface { + public: + BackupableDBTestWithParam() { + backupable_options_->share_files_with_checksum = GetParam(); + } +}; + +// This test verifies that the verifyBackup method correctly identifies +// invalid backups +TEST_P(BackupableDBTestWithParam, VerifyBackup) { + const int keys_iteration = 5000; + Random rnd(6); + Status s; + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + // ---------- case 1. - valid backup ----------- + ASSERT_TRUE(backup_engine_->VerifyBackup(1).ok()); + + // ---------- case 2. - delete a file -----------i + file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/1"); + ASSERT_TRUE(backup_engine_->VerifyBackup(1).IsNotFound()); + + // ---------- case 3. - corrupt a file ----------- + std::string append_data = "Corrupting a random file"; + file_manager_->AppendToRandomFileInDir(backupdir_ + "/private/2", + append_data); + ASSERT_TRUE(backup_engine_->VerifyBackup(2).IsCorruption()); + + // ---------- case 4. - invalid backup ----------- + ASSERT_TRUE(backup_engine_->VerifyBackup(6).IsNotFound()); + CloseDBAndBackupEngine(); +} + +// open DB, write, close DB, backup, restore, repeat +TEST_P(BackupableDBTestWithParam, OfflineIntegrationTest) { + // has to be a big number, so that it triggers the memtable flush + const int keys_iteration = 5000; + const int max_key = keys_iteration * 4 + 10; + // first iter -- flush before backup + // second iter -- don't flush before backup + for (int iter = 0; iter < 2; ++iter) { + // delete old data + DestroyDB(dbname_, options_); + bool destroy_data = true; + + // every iteration -- + // 1. insert new data in the DB + // 2. backup the DB + // 3. destroy the db + // 4. restore the db, check everything is still there + for (int i = 0; i < 5; ++i) { + // in last iteration, put smaller amount of data, + int fill_up_to = std::min(keys_iteration * (i + 1), max_key); + // ---- insert new data and back up ---- + OpenDBAndBackupEngine(destroy_data); + destroy_data = false; + FillDB(db_.get(), keys_iteration * i, fill_up_to); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0)); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); + + // ---- make sure it's empty ---- + DB* db = OpenDB(); + AssertEmpty(db, 0, fill_up_to); + delete db; + + // ---- restore the DB ---- + OpenBackupEngine(); + if (i >= 3) { // test purge old backups + // when i == 4, purge to only 1 backup + // when i == 3, purge to 2 backups + ASSERT_OK(backup_engine_->PurgeOldBackups(5 - i)); + } + // ---- make sure the data is there --- + AssertBackupConsistency(0, 0, fill_up_to, max_key); + CloseBackupEngine(); + } + } +} + +// open DB, write, backup, write, backup, close, restore +TEST_P(BackupableDBTestWithParam, OnlineIntegrationTest) { + // has to be a big number, so that it triggers the memtable flush + const int keys_iteration = 5000; + const int max_key = keys_iteration * 4 + 10; + Random rnd(7); + // delete old data + DestroyDB(dbname_, options_); + + OpenDBAndBackupEngine(true); + // write some data, backup, repeat + for (int i = 0; i < 5; ++i) { + if (i == 4) { + // delete backup number 2, online delete! + ASSERT_OK(backup_engine_->DeleteBackup(2)); + } + // in last iteration, put smaller amount of data, + // so that backups can share sst files + int fill_up_to = std::min(keys_iteration * (i + 1), max_key); + FillDB(db_.get(), keys_iteration * i, fill_up_to); + // we should get consistent results with flush_before_backup + // set to both true and false + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + // close and destroy + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); + + // ---- make sure it's empty ---- + DB* db = OpenDB(); + AssertEmpty(db, 0, max_key); + delete db; + + // ---- restore every backup and verify all the data is there ---- + OpenBackupEngine(); + for (int i = 1; i <= 5; ++i) { + if (i == 2) { + // we deleted backup 2 + Status s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + } else { + int fill_up_to = std::min(keys_iteration * i, max_key); + AssertBackupConsistency(i, 0, fill_up_to, max_key); + } + } + + // delete some backups -- this should leave only backups 3 and 5 alive + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->PurgeOldBackups(2)); + + std::vector backup_info; + backup_engine_->GetBackupInfo(&backup_info); + ASSERT_EQ(2UL, backup_info.size()); + + // check backup 3 + AssertBackupConsistency(3, 0, 3 * keys_iteration, max_key); + // check backup 5 + AssertBackupConsistency(5, 0, max_key); + + CloseBackupEngine(); +} + +INSTANTIATE_TEST_CASE_P(BackupableDBTestWithParam, BackupableDBTestWithParam, + ::testing::Bool()); + +// this will make sure that backup does not copy the same file twice +TEST_F(BackupableDBTest, NoDoubleCopy_And_AutoGC) { + OpenDBAndBackupEngine(true, true); + + // should write 5 DB files + one meta file + test_backup_env_->SetLimitWrittenFiles(7); + test_backup_env_->ClearWrittenFiles(); + test_db_env_->SetLimitWrittenFiles(0); + dummy_db_->live_files_ = {"/00010.sst", "/00011.sst", "/CURRENT", + "/MANIFEST-01"}; + dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; + test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + std::vector should_have_written = { + "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", "/private/1/CURRENT", + "/private/1/MANIFEST-01", "/private/1/00011.log", "/meta/.1.tmp"}; + AppendPath(backupdir_, should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); + + char db_number = '1'; + + for (std::string other_sst : {"00015.sst", "00017.sst", "00019.sst"}) { + // should write 4 new DB files + one meta file + // should not write/copy 00010.sst, since it's already there! + test_backup_env_->SetLimitWrittenFiles(6); + test_backup_env_->ClearWrittenFiles(); + + dummy_db_->live_files_ = {"/00010.sst", "/" + other_sst, "/CURRENT", + "/MANIFEST-01"}; + dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; + test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + // should not open 00010.sst - it's already there + + ++db_number; + std::string private_dir = std::string("/private/") + db_number; + should_have_written = { + "/shared/." + other_sst + ".tmp", private_dir + "/CURRENT", + private_dir + "/MANIFEST-01", private_dir + "/00011.log", + std::string("/meta/.") + db_number + ".tmp"}; + AppendPath(backupdir_, should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); + } + + ASSERT_OK(backup_engine_->DeleteBackup(1)); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst")); + + // 00011.sst was only in backup 1, should be deleted + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00011.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst")); + + // MANIFEST file size should be only 100 + uint64_t size = 0; + test_backup_env_->GetFileSize(backupdir_ + "/private/2/MANIFEST-01", &size); + ASSERT_EQ(100UL, size); + test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size); + ASSERT_EQ(200UL, size); + + CloseBackupEngine(); + + // + // Now simulate incomplete delete by removing just meta + // + ASSERT_OK(test_backup_env_->DeleteFile(backupdir_ + "/meta/2")); + + OpenBackupEngine(); + + // 1 appears to be removed, so + // 2 non-corrupt and 0 corrupt seen + std::vector backup_info; + std::vector corrupt_backup_ids; + backup_engine_->GetBackupInfo(&backup_info); + backup_engine_->GetCorruptedBackups(&corrupt_backup_ids); + ASSERT_EQ(2UL, backup_info.size()); + ASSERT_EQ(0UL, corrupt_backup_ids.size()); + + // Keep the two we see, but this should suffice to purge unreferenced + // shared files from incomplete delete. + ASSERT_OK(backup_engine_->PurgeOldBackups(2)); + + // Make sure dangling sst file has been removed (somewhere along this + // process). GarbageCollect should not be needed. + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst")); + + // Now actually purge a good one + ASSERT_OK(backup_engine_->PurgeOldBackups(1)); + + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst")); + + CloseDBAndBackupEngine(); +} + +// test various kind of corruptions that may happen: +// 1. Not able to write a file for backup - that backup should fail, +// everything else should work +// 2. Corrupted backup meta file or missing backuped file - we should +// not be able to open that backup, but all other backups should be +// fine +// 3. Corrupted checksum value - if the checksum is not a valid uint32_t, +// db open should fail, otherwise, it aborts during the restore process. +TEST_F(BackupableDBTest, CorruptionsTest) { + const int keys_iteration = 5000; + Random rnd(6); + Status s; + + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + + // ---------- case 1. - fail a write ----------- + // try creating backup 6, but fail a write + FillDB(db_.get(), keys_iteration * 5, keys_iteration * 6); + test_backup_env_->SetLimitWrittenFiles(2); + // should fail + s = backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)); + ASSERT_TRUE(!s.ok()); + test_backup_env_->SetLimitWrittenFiles(1000000); + // latest backup should have all the keys + CloseDBAndBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration * 5, keys_iteration * 6); + + // --------- case 2. corrupted backup meta or missing backuped file ---- + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/5", 3)); + // since 5 meta is now corrupted, latest backup should be 4 + AssertBackupConsistency(0, 0, keys_iteration * 4, keys_iteration * 5); + OpenBackupEngine(); + s = backup_engine_->RestoreDBFromBackup(5, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + CloseBackupEngine(); + ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/4")); + // 4 is corrupted, 3 is the latest backup now + AssertBackupConsistency(0, 0, keys_iteration * 3, keys_iteration * 5); + OpenBackupEngine(); + s = backup_engine_->RestoreDBFromBackup(4, dbname_, dbname_); + CloseBackupEngine(); + ASSERT_TRUE(!s.ok()); + + // --------- case 3. corrupted checksum value ---- + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false)); + // checksum of backup 3 is an invalid value, this can be detected at + // db open time, and it reverts to the previous backup automatically + AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5); + // checksum of the backup 2 appears to be valid, this can cause checksum + // mismatch and abort restore process + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true)); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + OpenBackupEngine(); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + + // make sure that no corrupt backups have actually been deleted! + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/1")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/3")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/4")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/1")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/2")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/3")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/4")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5")); + + // delete the corrupt backups and then make sure they're actually deleted + ASSERT_OK(backup_engine_->DeleteBackup(5)); + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->DeleteBackup(3)); + ASSERT_OK(backup_engine_->DeleteBackup(2)); + // Should not be needed anymore with auto-GC on DeleteBackup + //(void)backup_engine_->GarbageCollect(); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/5")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/4")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/4")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/3")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/3")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/2")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/2")); + + CloseBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5); + + // new backup should be 2! + OpenDBAndBackupEngine(); + FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + CloseDBAndBackupEngine(); + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5); +} + +TEST_F(BackupableDBTest, InterruptCreationTest) { + // Interrupt backup creation by failing new writes and failing cleanup of the + // partial state. Then verify a subsequent backup can still succeed. + const int keys_iteration = 5000; + Random rnd(6); + + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + test_backup_env_->SetLimitWrittenFiles(2); + test_backup_env_->SetDeleteFileFailure(true); + // should fail creation + ASSERT_FALSE( + backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)).ok()); + CloseDBAndBackupEngine(); + // should also fail cleanup so the tmp directory stays behind + ASSERT_OK(backup_chroot_env_->FileExists(backupdir_ + "/private/1/")); + + OpenDBAndBackupEngine(false /* destroy_old_data */); + test_backup_env_->SetLimitWrittenFiles(1000000); + test_backup_env_->SetDeleteFileFailure(false); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + // latest backup should have all the keys + CloseDBAndBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration); +} + +inline std::string OptionsPath(std::string ret, int backupID) { + ret += "/private/"; + ret += std::to_string(backupID); + ret += "/"; + return ret; +} + +// Backup the LATEST options file to +// "/private//OPTIONS" + +TEST_F(BackupableDBTest, BackupOptions) { + OpenDBAndBackupEngine(true); + for (int i = 1; i < 5; i++) { + std::string name; + std::vector filenames; + // Must reset() before reset(OpenDB()) again. + // Calling OpenDB() while *db_ is existing will cause LOCK issue + db_.reset(); + db_.reset(OpenDB()); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + ROCKSDB_NAMESPACE::GetLatestOptionsFileName(db_->GetName(), options_.env, + &name); + ASSERT_OK(file_manager_->FileExists(OptionsPath(backupdir_, i) + name)); + backup_chroot_env_->GetChildren(OptionsPath(backupdir_, i), &filenames); + for (auto fn : filenames) { + if (fn.compare(0, 7, "OPTIONS") == 0) { + ASSERT_EQ(name, fn); + } + } + } + + CloseDBAndBackupEngine(); +} + +TEST_F(BackupableDBTest, SetOptionsBackupRaceCondition) { + OpenDBAndBackupEngine(true); + SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "BackupableDBTest::SetOptionsBackupRaceCondition:BeforeSetOptions"}, + {"BackupableDBTest::SetOptionsBackupRaceCondition:AfterSetOptions", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread setoptions_thread{[this]() { + TEST_SYNC_POINT( + "BackupableDBTest::SetOptionsBackupRaceCondition:BeforeSetOptions"); + DBImpl* dbi = static_cast(db_.get()); + // Change arbitrary option to trigger OPTIONS file deletion + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "false"}})); + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "true"}})); + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "false"}})); + TEST_SYNC_POINT( + "BackupableDBTest::SetOptionsBackupRaceCondition:AfterSetOptions"); + }}; + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + setoptions_thread.join(); + CloseDBAndBackupEngine(); +} + +// This test verifies we don't delete the latest backup when read-only option is +// set +TEST_F(BackupableDBTest, NoDeleteWithReadOnly) { + const int keys_iteration = 5000; + Random rnd(6); + Status s; + + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + CloseDBAndBackupEngine(); + ASSERT_OK(file_manager_->WriteToFile(backupdir_ + "/LATEST_BACKUP", "4")); + + backupable_options_->destroy_old_data = false; + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open(backup_chroot_env_.get(), + *backupable_options_, + &read_only_backup_engine)); + + // assert that data from backup 5 is still here (even though LATEST_BACKUP + // says 4 is latest) + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5")); + + // Behavior change: We now ignore LATEST_BACKUP contents. This means that + // we should have 5 backups, even if LATEST_BACKUP says 4. + std::vector backup_info; + read_only_backup_engine->GetBackupInfo(&backup_info); + ASSERT_EQ(5UL, backup_info.size()); + delete read_only_backup_engine; +} + +TEST_F(BackupableDBTest, FailOverwritingBackups) { + options_.write_buffer_size = 1024 * 1024 * 1024; // 1GB + options_.disable_auto_compactions = true; + + // create backups 1, 2, 3, 4, 5 + OpenDBAndBackupEngine(true); + for (int i = 0; i < 5; ++i) { + CloseDBAndBackupEngine(); + DeleteLogFiles(); + OpenDBAndBackupEngine(false); + FillDB(db_.get(), 100 * i, 100 * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + // restore 3 + OpenBackupEngine(); + ASSERT_OK(backup_engine_->RestoreDBFromBackup(3, dbname_, dbname_)); + CloseBackupEngine(); + + OpenDBAndBackupEngine(false); + FillDB(db_.get(), 0, 300); + Status s = backup_engine_->CreateNewBackup(db_.get(), true); + // the new backup fails because new table files + // clash with old table files from backups 4 and 5 + // (since write_buffer_size is huge, we can be sure that + // each backup will generate only one sst file and that + // a file generated by a new backup is the same as + // sst file generated by backup 4) + ASSERT_TRUE(s.IsCorruption()); + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->DeleteBackup(5)); + // now, the backup can succeed + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); +} + +TEST_F(BackupableDBTest, NoShareTableFiles) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true, false, kNoShare); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2))); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +// Verify that you can backup and restore with share_files_with_checksum on +TEST_F(BackupableDBTest, ShareTableFilesWithChecksums) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2))); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +// Verify that you can backup and restore using share_files_with_checksum set to +// false and then transition this option to true +TEST_F(BackupableDBTest, ShareTableFilesWithChecksumsTransition) { + const int keys_iteration = 5000; + // set share_files_with_checksum to false + OpenDBAndBackupEngine(true, false, kShareNoChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } + + // set share_files_with_checksum to true and do some more backups + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + for (int i = 5; i < 10; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + backup_engine_->DeleteBackup(1); + backup_engine_->GarbageCollect(); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 1; i < 10; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 11); + } +} + +// This test simulates cleaning up after aborted or incomplete creation +// of a new backup. +TEST_F(BackupableDBTest, DeleteTmpFiles) { + for (int cleanup_fn : {1, 2, 3, 4}) { + for (ShareOption shared_option : kAllShareOptions) { + OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */, + shared_option); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + BackupID next_id = 1; + BackupID oldest_id = std::numeric_limits::max(); + { + std::vector backup_info; + backup_engine_->GetBackupInfo(&backup_info); + for (const auto& bi : backup_info) { + next_id = std::max(next_id, bi.backup_id + 1); + oldest_id = std::min(oldest_id, bi.backup_id); + } + } + CloseDBAndBackupEngine(); + + // An aborted or incomplete new backup will always be in the next + // id (maybe more) + std::string next_private = "private/" + std::to_string(next_id); + + // NOTE: both shared and shared_checksum should be cleaned up + // regardless of how the backup engine is opened. + std::vector tmp_files_and_dirs; + for (const auto& dir_and_file : { + std::make_pair(std::string("shared"), + std::string(".00006.sst.tmp")), + std::make_pair(std::string("shared_checksum"), + std::string(".00007.sst.tmp")), + std::make_pair(next_private, std::string("00003.sst")), + }) { + std::string dir = backupdir_ + "/" + dir_and_file.first; + file_manager_->CreateDir(dir); + ASSERT_OK(file_manager_->FileExists(dir)); + + std::string file = dir + "/" + dir_and_file.second; + file_manager_->WriteToFile(file, "tmp"); + ASSERT_OK(file_manager_->FileExists(file)); + + tmp_files_and_dirs.push_back(file); + } + if (cleanup_fn != /*CreateNewBackup*/ 4) { + // This exists after CreateNewBackup because it's deleted then + // re-created. + tmp_files_and_dirs.push_back(backupdir_ + "/" + next_private); + } + + OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */, + shared_option); + // Need to call one of these explicitly to delete tmp files + switch (cleanup_fn) { + case 1: + ASSERT_OK(backup_engine_->GarbageCollect()); + break; + case 2: + ASSERT_OK(backup_engine_->DeleteBackup(oldest_id)); + break; + case 3: + ASSERT_OK(backup_engine_->PurgeOldBackups(1)); + break; + case 4: + // Does a garbage collect if it sees that next private dir exists + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + break; + default: + assert(false); + } + CloseDBAndBackupEngine(); + for (std::string file_or_dir : tmp_files_and_dirs) { + if (file_manager_->FileExists(file_or_dir) != Status::NotFound()) { + FAIL() << file_or_dir << " was expected to be deleted." << cleanup_fn; + } + } + } + } +} + +TEST_F(BackupableDBTest, KeepLogFiles) { + backupable_options_->backup_log_files = false; + // basically infinite + options_.WAL_ttl_seconds = 24 * 60 * 60; + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + ASSERT_OK(db_->Flush(FlushOptions())); + FillDB(db_.get(), 100, 200); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + FillDB(db_.get(), 200, 300); + ASSERT_OK(db_->Flush(FlushOptions())); + FillDB(db_.get(), 300, 400); + ASSERT_OK(db_->Flush(FlushOptions())); + FillDB(db_.get(), 400, 500); + ASSERT_OK(db_->Flush(FlushOptions())); + CloseDBAndBackupEngine(); + + // all data should be there if we call with keep_log_files = true + AssertBackupConsistency(0, 0, 500, 600, true); +} + +TEST_F(BackupableDBTest, RateLimiting) { + size_t const kMicrosPerSec = 1000 * 1000LL; + uint64_t const MB = 1024 * 1024; + + const std::vector> limits( + {{1 * MB, 5 * MB}, {2 * MB, 3 * MB}}); + + std::shared_ptr backupThrottler(NewGenericRateLimiter(1)); + std::shared_ptr restoreThrottler(NewGenericRateLimiter(1)); + + for (bool makeThrottler : {false, true}) { + if (makeThrottler) { + backupable_options_->backup_rate_limiter = backupThrottler; + backupable_options_->restore_rate_limiter = restoreThrottler; + } + // iter 0 -- single threaded + // iter 1 -- multi threaded + for (int iter = 0; iter < 2; ++iter) { + for (const auto& limit : limits) { + // destroy old data + DestroyDB(dbname_, Options()); + if (makeThrottler) { + backupThrottler->SetBytesPerSecond(limit.first); + restoreThrottler->SetBytesPerSecond(limit.second); + } else { + backupable_options_->backup_rate_limit = limit.first; + backupable_options_->restore_rate_limit = limit.second; + } + backupable_options_->max_background_operations = (iter == 0) ? 1 : 10; + options_.compression = kNoCompression; + OpenDBAndBackupEngine(true); + size_t bytes_written = FillDB(db_.get(), 0, 100000); + + auto start_backup = db_chroot_env_->NowMicros(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + auto backup_time = db_chroot_env_->NowMicros() - start_backup; + auto rate_limited_backup_time = + (bytes_written * kMicrosPerSec) / limit.first; + ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); + + CloseDBAndBackupEngine(); + + OpenBackupEngine(); + auto start_restore = db_chroot_env_->NowMicros(); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + auto restore_time = db_chroot_env_->NowMicros() - start_restore; + CloseBackupEngine(); + auto rate_limited_restore_time = + (bytes_written * kMicrosPerSec) / limit.second; + ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); + + AssertBackupConsistency(0, 0, 100000, 100010); + } + } + } +} + +TEST_F(BackupableDBTest, ReadOnlyBackupEngine) { + DestroyDB(dbname_, options_); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + FillDB(db_.get(), 100, 200); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); + + backupable_options_->destroy_old_data = false; + test_backup_env_->ClearWrittenFiles(); + test_backup_env_->SetLimitDeleteFiles(0); + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open( + db_chroot_env_.get(), *backupable_options_, &read_only_backup_engine)); + std::vector backup_info; + read_only_backup_engine->GetBackupInfo(&backup_info); + ASSERT_EQ(backup_info.size(), 2U); + + RestoreOptions restore_options(false); + ASSERT_OK(read_only_backup_engine->RestoreDBFromLatestBackup( + dbname_, dbname_, restore_options)); + delete read_only_backup_engine; + std::vector should_have_written; + test_backup_env_->AssertWrittenFiles(should_have_written); + + DB* db = OpenDB(); + AssertExists(db, 0, 200); + delete db; +} + +TEST_F(BackupableDBTest, ProgressCallbackDuringBackup) { + DestroyDB(dbname_, options_); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + bool is_callback_invoked = false; + ASSERT_OK(backup_engine_->CreateNewBackup( + db_.get(), true, + [&is_callback_invoked]() { is_callback_invoked = true; })); + + ASSERT_TRUE(is_callback_invoked); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + +TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) { + DestroyDB(dbname_, options_); + OpenDBAndBackupEngine(true); + + backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"); + std::string file_five = backupdir_ + "/shared/000007.sst"; + std::string file_five_contents = "I'm not really a sst file"; + // this depends on the fact that 00007.sst is the first file created by the DB + ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents)); + + FillDB(db_.get(), 0, 100); + // backup overwrites file 000007.sst + ASSERT_TRUE(backup_engine_->CreateNewBackup(db_.get(), true).ok()); + + std::string new_file_five_contents; + ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five, + &new_file_five_contents)); + // file 000007.sst was overwritten + ASSERT_TRUE(new_file_five_contents != file_five_contents); + + CloseDBAndBackupEngine(); + + AssertBackupConsistency(0, 0, 100); +} + +// Test that we properly propagate Env failures +TEST_F(BackupableDBTest, EnvFailures) { + BackupEngine* backup_engine; + + // get children failure + { + test_backup_env_->SetGetChildrenFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + test_backup_env_->SetGetChildrenFailure(false); + } + + // created dir failure + { + test_backup_env_->SetCreateDirIfMissingFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + test_backup_env_->SetCreateDirIfMissingFailure(false); + } + + // new directory failure + { + test_backup_env_->SetNewDirectoryFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + test_backup_env_->SetNewDirectoryFailure(false); + } + + // Read from meta-file failure + { + DestroyDB(dbname_, options_); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + ASSERT_TRUE(backup_engine_->CreateNewBackup(db_.get(), true).ok()); + CloseDBAndBackupEngine(); + test_backup_env_->SetDummySequentialFile(true); + test_backup_env_->SetDummySequentialFileFailReads(true); + backupable_options_->destroy_old_data = false; + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + test_backup_env_->SetDummySequentialFile(false); + test_backup_env_->SetDummySequentialFileFailReads(false); + } + + // no failure + { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *backupable_options_, + &backup_engine)); + delete backup_engine; + } +} + +// Verify manifest can roll while a backup is being created with the old +// manifest. +TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) { + DestroyDB(dbname_, options_); + options_.max_manifest_file_size = 0; // always rollover manifest for file add + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread flush_thread{ + [this]() { ASSERT_OK(db_->Flush(FlushOptions())); }}; + + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + + flush_thread.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // The last manifest roll would've already been cleaned up by the full scan + // that happens when CreateNewBackup invokes EnableFileDeletions. We need to + // trigger another roll to verify non-full scan purges stale manifests. + DBImpl* db_impl = reinterpret_cast(db_.get()); + std::string prev_manifest_path = + DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo()); + FillDB(db_.get(), 0, 100); + ASSERT_OK(db_chroot_env_->FileExists(prev_manifest_path)); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_TRUE(db_chroot_env_->FileExists(prev_manifest_path).IsNotFound()); + + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); + AssertBackupConsistency(0, 0, 100); +} + +// see https://github.com/facebook/rocksdb/issues/921 +TEST_F(BackupableDBTest, Issue921Test) { + BackupEngine* backup_engine; + backupable_options_->share_table_files = false; + backup_chroot_env_->CreateDirIfMissing(backupable_options_->backup_dir); + backupable_options_->backup_dir += "/new_dir"; + ASSERT_OK(BackupEngine::Open(backup_chroot_env_.get(), *backupable_options_, + &backup_engine)); + + delete backup_engine; +} + +TEST_F(BackupableDBTest, BackupWithMetadata) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + const std::string metadata = std::to_string(i); + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata, true)); + } + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(5, backup_infos.size()); + for (int i = 0; i < 5; i++) { + ASSERT_EQ(std::to_string(i), backup_infos[i].app_metadata); + } + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + +TEST_F(BackupableDBTest, BinaryMetadata) { + OpenDBAndBackupEngine(true); + std::string binaryMetadata = "abc\ndef"; + binaryMetadata.push_back('\0'); + binaryMetadata.append("ghi"); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), binaryMetadata)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + ASSERT_EQ(binaryMetadata, backup_infos[0].app_metadata); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + +TEST_F(BackupableDBTest, MetadataTooLarge) { + OpenDBAndBackupEngine(true); + std::string largeMetadata(1024 * 1024 + 1, 0); + ASSERT_NOK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), largeMetadata)); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + +TEST_F(BackupableDBTest, LimitBackupsOpened) { + // Verify the specified max backups are opened, including skipping over + // corrupted backups. + // + // Setup: + // - backups 1, 2, and 4 are valid + // - backup 3 is corrupt + // - max_valid_backups_to_open == 2 + // + // Expectation: the engine opens backups 4 and 2 since those are latest two + // non-corrupt backups. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true); + for (int i = 1; i <= 4; ++i) { + FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + if (i == 3) { + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3)); + } + } + CloseDBAndBackupEngine(); + + backupable_options_->max_valid_backups_to_open = 2; + backupable_options_->destroy_old_data = false; + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open(backup_chroot_env_.get(), + *backupable_options_, + &read_only_backup_engine)); + + std::vector backup_infos; + read_only_backup_engine->GetBackupInfo(&backup_infos); + ASSERT_EQ(2, backup_infos.size()); + ASSERT_EQ(2, backup_infos[0].backup_id); + ASSERT_EQ(4, backup_infos[1].backup_id); + delete read_only_backup_engine; +} + +TEST_F(BackupableDBTest, IgnoreLimitBackupsOpenedWhenNotReadOnly) { + // Verify the specified max_valid_backups_to_open is ignored if the engine + // is not read-only. + // + // Setup: + // - backups 1, 2, and 4 are valid + // - backup 3 is corrupt + // - max_valid_backups_to_open == 2 + // + // Expectation: the engine opens backups 4, 2, and 1 since those are latest + // non-corrupt backups, by ignoring max_valid_backups_to_open == 2. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true); + for (int i = 1; i <= 4; ++i) { + FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + if (i == 3) { + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3)); + } + } + CloseDBAndBackupEngine(); + + backupable_options_->max_valid_backups_to_open = 2; + OpenDBAndBackupEngine(); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(3, backup_infos.size()); + ASSERT_EQ(1, backup_infos[0].backup_id); + ASSERT_EQ(2, backup_infos[1].backup_id); + ASSERT_EQ(4, backup_infos[2].backup_id); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, options_); +} + +TEST_F(BackupableDBTest, CreateWhenLatestBackupCorrupted) { + // we should pick an ID greater than corrupted backups' IDs so creation can + // succeed even when latest backup is corrupted. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0 /* from */, kNumKeys); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + true /* flush_before_backup */)); + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/1", + 3 /* bytes_to_corrupt */)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + true /* flush_before_backup */)); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + ASSERT_EQ(2, backup_infos[0].backup_id); +} + +TEST_F(BackupableDBTest, WriteOnlyEngineNoSharedFileDeletion) { + // Verifies a write-only BackupEngine does not delete files belonging to valid + // backups when GarbageCollect, PurgeOldBackups, or DeleteBackup are called. + const int kNumKeys = 5000; + for (int i = 0; i < 3; ++i) { + OpenDBAndBackupEngine(i == 0 /* destroy_old_data */); + FillDB(db_.get(), i * kNumKeys, (i + 1) * kNumKeys); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + backupable_options_->max_valid_backups_to_open = 0; + OpenDBAndBackupEngine(); + switch (i) { + case 0: + ASSERT_OK(backup_engine_->GarbageCollect()); + break; + case 1: + ASSERT_OK(backup_engine_->PurgeOldBackups(1 /* num_backups_to_keep */)); + break; + case 2: + ASSERT_OK(backup_engine_->DeleteBackup(2 /* backup_id */)); + break; + default: + assert(false); + } + CloseDBAndBackupEngine(); + + backupable_options_->max_valid_backups_to_open = port::kMaxInt32; + AssertBackupConsistency(i + 1, 0, (i + 1) * kNumKeys); + } +} + +TEST_P(BackupableDBTestWithParam, BackupUsingDirectIO) { + // Tests direct I/O on the backup engine's reads and writes on the DB env and + // backup env + // We use ChrootEnv underneath so the below line checks for direct I/O support + // in the chroot directory, not the true filesystem root. + if (!test::IsDirectIOSupported(test_db_env_.get(), "/")) { + return; + } + const int kNumKeysPerBackup = 100; + const int kNumBackups = 3; + options_.use_direct_reads = true; + OpenDBAndBackupEngine(true /* destroy_old_data */); + for (int i = 0; i < kNumBackups; ++i) { + FillDB(db_.get(), i * kNumKeysPerBackup /* from */, + (i + 1) * kNumKeysPerBackup /* to */); + ASSERT_OK(db_->Flush(FlushOptions())); + + // Clear the file open counters and then do a bunch of backup engine ops. + // For all ops, files should be opened in direct mode. + test_backup_env_->ClearFileOpenCounters(); + test_db_env_->ClearFileOpenCounters(); + CloseBackupEngine(); + OpenBackupEngine(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + ASSERT_OK(backup_engine_->VerifyBackup(i + 1)); + CloseBackupEngine(); + OpenBackupEngine(); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(static_cast(i + 1), backup_infos.size()); + + // Verify backup engine always opened files with direct I/O + ASSERT_EQ(0, test_db_env_->num_writers()); + ASSERT_EQ(0, test_db_env_->num_rand_readers()); + ASSERT_GT(test_db_env_->num_direct_seq_readers(), 0); + // Currently the DB doesn't support reading WALs or manifest with direct + // I/O, so subtract two. + ASSERT_EQ(test_db_env_->num_seq_readers() - 2, + test_db_env_->num_direct_seq_readers()); + ASSERT_EQ(0, test_db_env_->num_rand_readers()); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < kNumBackups; ++i) { + AssertBackupConsistency(i + 1 /* backup_id */, + i * kNumKeysPerBackup /* start_exist */, + (i + 1) * kNumKeysPerBackup /* end_exist */, + (i + 2) * kNumKeysPerBackup /* end */); + } +} + +} // anon namespace + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as BackupableDB is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) -- cgit v1.2.3