diff options
Diffstat (limited to 'src/rocksdb/utilities/backup')
-rw-r--r-- | src/rocksdb/utilities/backup/backup_engine.cc | 3181 | ||||
-rw-r--r-- | src/rocksdb/utilities/backup/backup_engine_impl.h | 36 | ||||
-rw-r--r-- | src/rocksdb/utilities/backup/backup_engine_test.cc | 4219 |
3 files changed, 7436 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/backup/backup_engine.cc b/src/rocksdb/utilities/backup/backup_engine.cc new file mode 100644 index 000000000..81b4a6629 --- /dev/null +++ b/src/rocksdb/utilities/backup/backup_engine.cc @@ -0,0 +1,3181 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include <algorithm> +#include <atomic> +#include <cinttypes> +#include <cstdlib> +#include <functional> +#include <future> +#include <limits> +#include <map> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "env/composite_env_wrapper.h" +#include "env/fs_readonly.h" +#include "env/fs_remap.h" +#include "file/filename.h" +#include "file/line_file_reader.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "logging/logging.h" +#include "monitoring/iostats_context_imp.h" +#include "options/options_helper.h" +#include "port/port.h" +#include "rocksdb/advanced_options.h" +#include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" +#include "rocksdb/transaction_log.h" +#include "table/sst_file_dumper.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/channel.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/math.h" +#include "util/rate_limiter.h" +#include "util/string_util.h" +#include "utilities/backup/backup_engine_impl.h" +#include "utilities/checkpoint/checkpoint_impl.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming; + +constexpr BackupID kLatestBackupIDMarker = static_cast<BackupID>(-2); + +inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) { + std::string checksum_str; + Slice(checksum_hex).DecodeHex(&checksum_str); + return EndianSwapValue(DecodeFixed32(checksum_str.c_str())); +} +inline std::string ChecksumStrToHex(const std::string& checksum_str) { + return Slice(checksum_str).ToString(true); +} +inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) { + std::string checksum_str; + PutFixed32(&checksum_str, EndianSwapValue(checksum_value)); + return ChecksumStrToHex(checksum_str); +} + +const std::string kPrivateDirName = "private"; +const std::string kMetaDirName = "meta"; +const std::string kSharedDirName = "shared"; +const std::string kSharedChecksumDirName = "shared_checksum"; +const std::string kPrivateDirSlash = kPrivateDirName + "/"; +const std::string kMetaDirSlash = kMetaDirName + "/"; +const std::string kSharedDirSlash = kSharedDirName + "/"; +const std::string kSharedChecksumDirSlash = kSharedChecksumDirName + "/"; + +} // namespace + +void BackupStatistics::IncrementNumberSuccessBackup() { + number_success_backup++; +} +void BackupStatistics::IncrementNumberFailBackup() { number_fail_backup++; } + +uint32_t BackupStatistics::GetNumberSuccessBackup() const { + return number_success_backup; +} +uint32_t BackupStatistics::GetNumberFailBackup() const { + return number_fail_backup; +} + +std::string BackupStatistics::ToString() const { + char result[50]; + snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u", + GetNumberSuccessBackup(), GetNumberFailBackup()); + return result; +} + +void BackupEngineOptions::Dump(Logger* logger) const { + ROCKS_LOG_INFO(logger, " Options.backup_dir: %s", + backup_dir.c_str()); + ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env); + ROCKS_LOG_INFO(logger, " Options.share_table_files: %d", + static_cast<int>(share_table_files)); + ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log); + ROCKS_LOG_INFO(logger, " Options.sync: %d", + static_cast<int>(sync)); + ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d", + static_cast<int>(destroy_old_data)); + ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d", + static_cast<int>(backup_log_files)); + ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64, + backup_rate_limit); + ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64, + restore_rate_limit); + ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d", + max_background_operations); +} + +namespace { +// -------- BackupEngineImpl class --------- +class BackupEngineImpl { + public: + BackupEngineImpl(const BackupEngineOptions& options, Env* db_env, + bool read_only = false); + ~BackupEngineImpl(); + + IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options, + DB* db, const std::string& app_metadata, + BackupID* new_backup_id_ptr); + + IOStatus PurgeOldBackups(uint32_t num_backups_to_keep); + + IOStatus DeleteBackup(BackupID backup_id); + + void StopBackup() { stop_backup_.store(true, std::memory_order_release); } + + IOStatus GarbageCollect(); + + // The returned BackupInfos are in chronological order, which means the + // latest backup comes last. + void GetBackupInfo(std::vector<BackupInfo>* backup_info, + bool include_file_details) const; + + Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, + bool include_file_details = false) const; + + void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const; + + IOStatus RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir) const; + + IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) const { + // Note: don't read latest_valid_backup_id_ outside of lock + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); + } + + IOStatus VerifyBackup(BackupID backup_id, + bool verify_with_checksum = false) const; + + IOStatus Initialize(); + + ShareFilesNaming GetNamingNoFlags() const { + return options_.share_files_with_checksum_naming & + BackupEngineOptions::kMaskNoNamingFlags; + } + ShareFilesNaming GetNamingFlags() const { + return options_.share_files_with_checksum_naming & + BackupEngineOptions::kMaskNamingFlags; + } + + void TEST_SetDefaultRateLimitersClock( + const std::shared_ptr<SystemClock>& backup_rate_limiter_clock, + const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) { + if (backup_rate_limiter_clock) { + static_cast<GenericRateLimiter*>(options_.backup_rate_limiter.get()) + ->TEST_SetClock(backup_rate_limiter_clock); + } + + if (restore_rate_limiter_clock) { + static_cast<GenericRateLimiter*>(options_.restore_rate_limiter.get()) + ->TEST_SetClock(restore_rate_limiter_clock); + } + } + + private: + void DeleteChildren(const std::string& dir, + uint32_t file_type_filter = 0) const; + IOStatus DeleteBackupNoGC(BackupID backup_id); + + // Extends the "result" map with pathname->size mappings for the contents of + // "dir" in "env". Pathnames are prefixed with "dir". + IOStatus ReadChildFileCurrentSizes( + const std::string& dir, const std::shared_ptr<FileSystem>&, + std::unordered_map<std::string, uint64_t>* result) const; + + struct FileInfo { + FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum, + const std::string& id, const std::string& sid, Temperature _temp) + : refs(0), + filename(fname), + size(sz), + checksum_hex(checksum), + db_id(id), + db_session_id(sid), + temp(_temp) {} + + FileInfo(const FileInfo&) = delete; + FileInfo& operator=(const FileInfo&) = delete; + + int refs; + const std::string filename; + const uint64_t size; + // crc32c checksum as hex. empty == unknown / unavailable + std::string checksum_hex; + // DB identities + // db_id is obtained for potential usage in the future but not used + // currently + const std::string db_id; + // db_session_id appears in the backup SST filename if the table naming + // option is kUseDbSessionId + const std::string db_session_id; + Temperature temp; + + std::string GetDbFileName() { + std::string rv; + // extract the filename part + size_t slash = filename.find_last_of('/'); + // file will either be shared/<file>, shared_checksum/<file_crc32c_size>, + // shared_checksum/<file_session>, shared_checksum/<file_crc32c_session>, + // or private/<number>/<file> + assert(slash != std::string::npos); + rv = filename.substr(slash + 1); + + // if the file was in shared_checksum, extract the real file name + // in this case the file is <number>_<checksum>_<size>.<type>, + // <number>_<session>.<type>, or <number>_<checksum>_<session>.<type> + if (filename.substr(0, slash) == kSharedChecksumDirName) { + rv = GetFileFromChecksumFile(rv); + } + return rv; + } + }; + + // TODO: deprecate this function once we migrate all BackupEngine's rate + // limiting to lower-level ones (i.e, ones in file access wrapper level like + // `WritableFileWriter`) + static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request, + RateLimiter* rate_limiter, + const Env::IOPriority pri, + Statistics* stats, + const RateLimiter::OpType op_type); + + static inline std::string WithoutTrailingSlash(const std::string& path) { + if (path.empty() || path.back() != '/') { + return path; + } else { + return path.substr(path.size() - 1); + } + } + + static inline std::string WithTrailingSlash(const std::string& path) { + if (path.empty() || path.back() != '/') { + return path + '/'; + } else { + return path; + } + } + + // A filesystem wrapper that makes shared backup files appear to be in the + // private backup directory (dst_dir), so that the private backup dir can + // be opened as a read-only DB. + class RemapSharedFileSystem : public RemapFileSystem { + public: + RemapSharedFileSystem(const std::shared_ptr<FileSystem>& base, + const std::string& dst_dir, + const std::string& src_base_dir, + const std::vector<std::shared_ptr<FileInfo>>& files) + : RemapFileSystem(base), + dst_dir_(WithoutTrailingSlash(dst_dir)), + dst_dir_slash_(WithTrailingSlash(dst_dir)), + src_base_dir_(WithTrailingSlash(src_base_dir)) { + for (auto& info : files) { + if (!StartsWith(info->filename, kPrivateDirSlash)) { + assert(StartsWith(info->filename, kSharedDirSlash) || + StartsWith(info->filename, kSharedChecksumDirSlash)); + remaps_[info->GetDbFileName()] = info; + } + } + } + + const char* Name() const override { + return "BackupEngineImpl::RemapSharedFileSystem"; + } + + // Sometimes a directory listing is required in opening a DB + IOStatus GetChildren(const std::string& dir, const IOOptions& options, + std::vector<std::string>* result, + IODebugContext* dbg) override { + IOStatus s = RemapFileSystem::GetChildren(dir, options, result, dbg); + if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { + // Assume remapped files exist + for (auto& r : remaps_) { + result->push_back(r.first); + } + } + return s; + } + + // Sometimes a directory listing is required in opening a DB + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector<FileAttributes>* result, + IODebugContext* dbg) override { + IOStatus s = + RemapFileSystem::GetChildrenFileAttributes(dir, options, result, dbg); + if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { + // Assume remapped files exist with recorded size + for (auto& r : remaps_) { + result->emplace_back(); // clean up with C++20 + FileAttributes& attr = result->back(); + attr.name = r.first; + attr.size_bytes = r.second->size; + } + } + return s; + } + + protected: + // When a file in dst_dir is requested, see if we need to remap to shared + // file path. + std::pair<IOStatus, std::string> EncodePath( + const std::string& path) override { + if (path.empty() || path[0] != '/') { + return {IOStatus::InvalidArgument(path, "Not an absolute path"), ""}; + } + std::pair<IOStatus, std::string> rv{IOStatus(), path}; + if (StartsWith(path, dst_dir_slash_)) { + std::string relative = path.substr(dst_dir_slash_.size()); + auto it = remaps_.find(relative); + if (it != remaps_.end()) { + rv.second = src_base_dir_ + it->second->filename; + } + } + return rv; + } + + private: + // Absolute path to a directory that some extra files will be mapped into. + const std::string dst_dir_; + // Includes a trailing slash. + const std::string dst_dir_slash_; + // Absolute path to a directory containing some files to be mapped into + // dst_dir_. Includes a trailing slash. + const std::string src_base_dir_; + // If remaps_[x] exists, attempt to read dst_dir_ / x should instead read + // src_base_dir_ / remaps_[x]->filename. FileInfo is used to maximize + // sharing with other backup data in memory. + std::unordered_map<std::string, std::shared_ptr<FileInfo>> remaps_; + }; + + class BackupMeta { + public: + BackupMeta( + const std::string& meta_filename, const std::string& meta_tmp_filename, + std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos, + Env* env, const std::shared_ptr<FileSystem>& fs) + : timestamp_(0), + sequence_number_(0), + size_(0), + meta_filename_(meta_filename), + meta_tmp_filename_(meta_tmp_filename), + file_infos_(file_infos), + env_(env), + fs_(fs) {} + + BackupMeta(const BackupMeta&) = delete; + BackupMeta& operator=(const BackupMeta&) = delete; + + ~BackupMeta() {} + + void RecordTimestamp() { + // Best effort + Status s = env_->GetCurrentTime(×tamp_); + if (!s.ok()) { + timestamp_ = /* something clearly fabricated */ 1; + } + } + int64_t GetTimestamp() const { return timestamp_; } + uint64_t GetSize() const { return size_; } + uint32_t GetNumberFiles() const { + return static_cast<uint32_t>(files_.size()); + } + void SetSequenceNumber(uint64_t sequence_number) { + sequence_number_ = sequence_number; + } + uint64_t GetSequenceNumber() const { return sequence_number_; } + + const std::string& GetAppMetadata() const { return app_metadata_; } + + void SetAppMetadata(const std::string& app_metadata) { + app_metadata_ = app_metadata; + } + + IOStatus AddFile(std::shared_ptr<FileInfo> file_info); + + IOStatus Delete(bool delete_meta = true); + + bool Empty() const { return files_.empty(); } + + std::shared_ptr<FileInfo> GetFile(const std::string& filename) const { + auto it = file_infos_->find(filename); + if (it == file_infos_->end()) { + return nullptr; + } + return it->second; + } + + const std::vector<std::shared_ptr<FileInfo>>& GetFiles() const { + return files_; + } + + // @param abs_path_to_size Pre-fetched file sizes (bytes). + IOStatus LoadFromFile( + const std::string& backup_dir, + const std::unordered_map<std::string, uint64_t>& abs_path_to_size, + RateLimiter* rate_limiter, Logger* info_log, + std::unordered_set<std::string>* reported_ignored_fields); + IOStatus StoreToFile( + bool sync, int schema_version, + const TEST_BackupMetaSchemaOptions* schema_test_options); + + std::string GetInfoString() { + std::ostringstream ss; + ss << "Timestamp: " << timestamp_ << std::endl; + char human_size[16]; + AppendHumanBytes(size_, human_size, sizeof(human_size)); + ss << "Size: " << human_size << std::endl; + ss << "Files:" << std::endl; + for (const auto& file : files_) { + AppendHumanBytes(file->size, human_size, sizeof(human_size)); + ss << file->filename << ", size " << human_size << ", refs " + << file->refs << std::endl; + } + return ss.str(); + } + + const std::shared_ptr<Env>& GetEnvForOpen() const { + if (!env_for_open_) { + // Lazy initialize + // Find directories + std::string dst_dir = meta_filename_; + auto i = dst_dir.rfind(kMetaDirSlash); + assert(i != std::string::npos); + std::string src_base_dir = dst_dir.substr(0, i); + dst_dir.replace(i, kMetaDirSlash.size(), kPrivateDirSlash); + // Make the RemapSharedFileSystem + std::shared_ptr<FileSystem> remap_fs = + std::make_shared<RemapSharedFileSystem>(fs_, dst_dir, src_base_dir, + files_); + // Make it read-only for safety + remap_fs = std::make_shared<ReadOnlyFileSystem>(remap_fs); + // Make an Env wrapper + env_for_open_ = std::make_shared<CompositeEnvWrapper>(env_, remap_fs); + } + return env_for_open_; + } + + private: + int64_t timestamp_; + // sequence number is only approximate, should not be used + // by clients + uint64_t sequence_number_; + uint64_t size_; + std::string app_metadata_; + std::string const meta_filename_; + std::string const meta_tmp_filename_; + // files with relative paths (without "/" prefix!!) + std::vector<std::shared_ptr<FileInfo>> files_; + std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_; + Env* env_; + mutable std::shared_ptr<Env> env_for_open_; + std::shared_ptr<FileSystem> fs_; + IOOptions iooptions_ = IOOptions(); + }; // BackupMeta + + void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta, + BackupInfo* backup_info, + bool include_file_details) const; + + inline std::string GetAbsolutePath( + const std::string& relative_path = "") const { + assert(relative_path.size() == 0 || relative_path[0] != '/'); + return options_.backup_dir + "/" + relative_path; + } + inline std::string GetPrivateFileRel(BackupID backup_id, bool tmp = false, + const std::string& file = "") const { + assert(file.size() == 0 || file[0] != '/'); + return kPrivateDirSlash + std::to_string(backup_id) + (tmp ? ".tmp" : "") + + "/" + file; + } + inline std::string GetSharedFileRel(const std::string& file = "", + bool tmp = false) const { + assert(file.size() == 0 || file[0] != '/'); + return kSharedDirSlash + std::string(tmp ? "." : "") + file + + (tmp ? ".tmp" : ""); + } + inline std::string GetSharedFileWithChecksumRel(const std::string& file = "", + bool tmp = false) const { + assert(file.size() == 0 || file[0] != '/'); + return kSharedChecksumDirSlash + std::string(tmp ? "." : "") + file + + (tmp ? ".tmp" : ""); + } + inline bool UseLegacyNaming(const std::string& sid) const { + return GetNamingNoFlags() == + BackupEngineOptions::kLegacyCrc32cAndFileSize || + sid.empty(); + } + inline std::string GetSharedFileWithChecksum( + const std::string& file, const std::string& checksum_hex, + const uint64_t file_size, const std::string& db_session_id) const { + assert(file.size() == 0 || file[0] != '/'); + std::string file_copy = file; + if (UseLegacyNaming(db_session_id)) { + assert(!checksum_hex.empty()); + file_copy.insert(file_copy.find_last_of('.'), + "_" + std::to_string(ChecksumHexToInt32(checksum_hex)) + + "_" + std::to_string(file_size)); + } else { + file_copy.insert(file_copy.find_last_of('.'), "_s" + db_session_id); + if (GetNamingFlags() & BackupEngineOptions::kFlagIncludeFileSize) { + file_copy.insert(file_copy.find_last_of('.'), + "_" + std::to_string(file_size)); + } + } + return file_copy; + } + static inline std::string GetFileFromChecksumFile(const std::string& file) { + assert(file.size() == 0 || file[0] != '/'); + std::string file_copy = file; + size_t first_underscore = file_copy.find_first_of('_'); + return file_copy.erase(first_underscore, + file_copy.find_last_of('.') - first_underscore); + } + inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const { + return GetAbsolutePath(kMetaDirName) + "/" + (tmp ? "." : "") + + std::to_string(backup_id) + (tmp ? ".tmp" : ""); + } + + // If size_limit == 0, there is no size limit, copy everything. + // + // Exactly one of src and contents must be non-empty. + // + // @param src If non-empty, the file is copied from this pathname. + // @param contents If non-empty, the file will be created with these contents. + // @param src_temperature Pass in expected temperature of src, return back + // temperature reported by FileSystem + IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst, + const std::string& contents, uint64_t size_limit, + Env* src_env, Env* dst_env, + const EnvOptions& src_env_options, bool sync, + RateLimiter* rate_limiter, + std::function<void()> progress_callback, + Temperature* src_temperature, + Temperature dst_temperature, + uint64_t* bytes_toward_next_callback, + uint64_t* size, std::string* checksum_hex); + + IOStatus ReadFileAndComputeChecksum(const std::string& src, + const std::shared_ptr<FileSystem>& src_fs, + const EnvOptions& src_env_options, + uint64_t size_limit, + std::string* checksum_hex, + const Temperature src_temperature) const; + + // Obtain db_id and db_session_id from the table properties of file_path + Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, + const std::string& file_path, + Temperature file_temp, RateLimiter* rate_limiter, + std::string* db_id, std::string* db_session_id); + + struct CopyOrCreateResult { + ~CopyOrCreateResult() { + // The Status needs to be ignored here for two reasons. + // First, if the BackupEngineImpl shuts down with jobs outstanding, then + // it is possible that the Status in the future/promise is never read, + // resulting in an unchecked Status. Second, if there are items in the + // channel when the BackupEngineImpl is shutdown, these will also have + // Status that have not been checked. This + // TODO: Fix those issues so that the Status + io_status.PermitUncheckedError(); + } + uint64_t size; + std::string checksum_hex; + std::string db_id; + std::string db_session_id; + IOStatus io_status; + Temperature expected_src_temperature = Temperature::kUnknown; + Temperature current_src_temperature = Temperature::kUnknown; + }; + + // Exactly one of src_path and contents must be non-empty. If src_path is + // non-empty, the file is copied from this pathname. Otherwise, if contents is + // non-empty, the file will be created at dst_path with these contents. + struct CopyOrCreateWorkItem { + std::string src_path; + std::string dst_path; + Temperature src_temperature; + Temperature dst_temperature; + std::string contents; + Env* src_env; + Env* dst_env; + EnvOptions src_env_options; + bool sync; + RateLimiter* rate_limiter; + uint64_t size_limit; + Statistics* stats; + std::promise<CopyOrCreateResult> result; + std::function<void()> progress_callback; + std::string src_checksum_func_name; + std::string src_checksum_hex; + std::string db_id; + std::string db_session_id; + + CopyOrCreateWorkItem() + : src_path(""), + dst_path(""), + src_temperature(Temperature::kUnknown), + dst_temperature(Temperature::kUnknown), + contents(""), + src_env(nullptr), + dst_env(nullptr), + src_env_options(), + sync(false), + rate_limiter(nullptr), + size_limit(0), + stats(nullptr), + src_checksum_func_name(kUnknownFileChecksumFuncName), + src_checksum_hex(""), + db_id(""), + db_session_id("") {} + + CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete; + CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete; + + CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept { + *this = std::move(o); + } + + CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept { + src_path = std::move(o.src_path); + dst_path = std::move(o.dst_path); + src_temperature = std::move(o.src_temperature); + dst_temperature = std::move(o.dst_temperature); + contents = std::move(o.contents); + src_env = o.src_env; + dst_env = o.dst_env; + src_env_options = std::move(o.src_env_options); + sync = o.sync; + rate_limiter = o.rate_limiter; + size_limit = o.size_limit; + stats = o.stats; + result = std::move(o.result); + progress_callback = std::move(o.progress_callback); + src_checksum_func_name = std::move(o.src_checksum_func_name); + src_checksum_hex = std::move(o.src_checksum_hex); + db_id = std::move(o.db_id); + db_session_id = std::move(o.db_session_id); + src_temperature = o.src_temperature; + return *this; + } + + CopyOrCreateWorkItem( + std::string _src_path, std::string _dst_path, + const Temperature _src_temperature, const Temperature _dst_temperature, + std::string _contents, Env* _src_env, Env* _dst_env, + EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter, + uint64_t _size_limit, Statistics* _stats, + std::function<void()> _progress_callback = []() {}, + const std::string& _src_checksum_func_name = + kUnknownFileChecksumFuncName, + const std::string& _src_checksum_hex = "", + const std::string& _db_id = "", const std::string& _db_session_id = "") + : src_path(std::move(_src_path)), + dst_path(std::move(_dst_path)), + src_temperature(_src_temperature), + dst_temperature(_dst_temperature), + contents(std::move(_contents)), + src_env(_src_env), + dst_env(_dst_env), + src_env_options(std::move(_src_env_options)), + sync(_sync), + rate_limiter(_rate_limiter), + size_limit(_size_limit), + stats(_stats), + progress_callback(_progress_callback), + src_checksum_func_name(_src_checksum_func_name), + src_checksum_hex(_src_checksum_hex), + db_id(_db_id), + db_session_id(_db_session_id) {} + }; + + struct BackupAfterCopyOrCreateWorkItem { + std::future<CopyOrCreateResult> result; + bool shared; + bool needed_to_copy; + Env* backup_env; + std::string dst_path_tmp; + std::string dst_path; + std::string dst_relative; + BackupAfterCopyOrCreateWorkItem() + : shared(false), + needed_to_copy(false), + backup_env(nullptr), + dst_path_tmp(""), + dst_path(""), + dst_relative("") {} + + BackupAfterCopyOrCreateWorkItem( + BackupAfterCopyOrCreateWorkItem&& o) noexcept { + *this = std::move(o); + } + + BackupAfterCopyOrCreateWorkItem& operator=( + BackupAfterCopyOrCreateWorkItem&& o) noexcept { + result = std::move(o.result); + shared = o.shared; + needed_to_copy = o.needed_to_copy; + backup_env = o.backup_env; + dst_path_tmp = std::move(o.dst_path_tmp); + dst_path = std::move(o.dst_path); + dst_relative = std::move(o.dst_relative); + return *this; + } + + BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result, + bool _shared, bool _needed_to_copy, + Env* _backup_env, std::string _dst_path_tmp, + std::string _dst_path, + std::string _dst_relative) + : result(std::move(_result)), + shared(_shared), + needed_to_copy(_needed_to_copy), + backup_env(_backup_env), + dst_path_tmp(std::move(_dst_path_tmp)), + dst_path(std::move(_dst_path)), + dst_relative(std::move(_dst_relative)) {} + }; + + struct RestoreAfterCopyOrCreateWorkItem { + std::future<CopyOrCreateResult> result; + std::string from_file; + std::string to_file; + std::string checksum_hex; + RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {} + RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result, + const std::string& _from_file, + const std::string& _to_file, + const std::string& _checksum_hex) + : result(std::move(_result)), + from_file(_from_file), + to_file(_to_file), + checksum_hex(_checksum_hex) {} + RestoreAfterCopyOrCreateWorkItem( + RestoreAfterCopyOrCreateWorkItem&& o) noexcept { + *this = std::move(o); + } + + RestoreAfterCopyOrCreateWorkItem& operator=( + RestoreAfterCopyOrCreateWorkItem&& o) noexcept { + result = std::move(o.result); + checksum_hex = std::move(o.checksum_hex); + return *this; + } + }; + + bool initialized_; + std::mutex byte_report_mutex_; + mutable channel<CopyOrCreateWorkItem> files_to_copy_or_create_; + std::vector<port::Thread> threads_; + std::atomic<CpuPriority> threads_cpu_priority_; + + // Certain operations like PurgeOldBackups and DeleteBackup will trigger + // automatic GarbageCollect (true) unless we've already done one in this + // session and have not failed to delete backup files since then (false). + bool might_need_garbage_collect_ = true; + + // Adds a file to the backup work queue to be copied or created if it doesn't + // already exist. + // + // Exactly one of src_dir and contents must be non-empty. + // + // @param src_dir If non-empty, the file in this directory named fname will be + // copied. + // @param fname Name of destination file and, in case of copy, source file. + // @param contents If non-empty, the file will be created with these contents. + IOStatus AddBackupFileWorkItem( + std::unordered_set<std::string>& live_dst_paths, + std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, // starts with "/" + const EnvOptions& src_env_options, RateLimiter* rate_limiter, + FileType file_type, uint64_t size_bytes, Statistics* stats, + uint64_t size_limit = 0, bool shared_checksum = false, + std::function<void()> progress_callback = []() {}, + const std::string& contents = std::string(), + const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName, + const std::string& src_checksum_str = kUnknownFileChecksum, + const Temperature src_temperature = Temperature::kUnknown); + + // backup state data + BackupID latest_backup_id_; + BackupID latest_valid_backup_id_; + std::map<BackupID, std::unique_ptr<BackupMeta>> backups_; + std::map<BackupID, std::pair<IOStatus, std::unique_ptr<BackupMeta>>> + corrupt_backups_; + std::unordered_map<std::string, std::shared_ptr<FileInfo>> + backuped_file_infos_; + std::atomic<bool> stop_backup_; + + // options data + BackupEngineOptions options_; + Env* db_env_; + Env* backup_env_; + + // directories + std::unique_ptr<FSDirectory> backup_directory_; + std::unique_ptr<FSDirectory> shared_directory_; + std::unique_ptr<FSDirectory> meta_directory_; + std::unique_ptr<FSDirectory> private_directory_; + + static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB + bool read_only_; + BackupStatistics backup_statistics_; + std::unordered_set<std::string> reported_ignored_fields_; + static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB + std::shared_ptr<FileSystem> db_fs_; + std::shared_ptr<FileSystem> backup_fs_; + IOOptions io_options_ = IOOptions(); + + public: + std::unique_ptr<TEST_BackupMetaSchemaOptions> schema_test_options_; +}; + +// -------- BackupEngineImplThreadSafe class --------- +// This locking layer for thread safety in the public API is layered on +// top to prevent accidental recursive locking with RWMutex, which is UB. +// Note: BackupEngineReadOnlyBase inherited twice, but has no fields +class BackupEngineImplThreadSafe : public BackupEngine, + public BackupEngineReadOnly { + public: + BackupEngineImplThreadSafe(const BackupEngineOptions& options, Env* db_env, + bool read_only = false) + : impl_(options, db_env, read_only) {} + ~BackupEngineImplThreadSafe() override {} + + using BackupEngine::CreateNewBackupWithMetadata; + IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options, + DB* db, const std::string& app_metadata, + BackupID* new_backup_id) override { + WriteLock lock(&mutex_); + return impl_.CreateNewBackupWithMetadata(options, db, app_metadata, + new_backup_id); + } + + IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override { + WriteLock lock(&mutex_); + return impl_.PurgeOldBackups(num_backups_to_keep); + } + + IOStatus DeleteBackup(BackupID backup_id) override { + WriteLock lock(&mutex_); + return impl_.DeleteBackup(backup_id); + } + + void StopBackup() override { + // No locking needed + impl_.StopBackup(); + } + + IOStatus GarbageCollect() override { + WriteLock lock(&mutex_); + return impl_.GarbageCollect(); + } + + Status GetLatestBackupInfo(BackupInfo* backup_info, + bool include_file_details = false) const override { + ReadLock lock(&mutex_); + return impl_.GetBackupInfo(kLatestBackupIDMarker, backup_info, + include_file_details); + } + + Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, + bool include_file_details = false) const override { + ReadLock lock(&mutex_); + return impl_.GetBackupInfo(backup_id, backup_info, include_file_details); + } + + void GetBackupInfo(std::vector<BackupInfo>* backup_info, + bool include_file_details) const override { + ReadLock lock(&mutex_); + impl_.GetBackupInfo(backup_info, include_file_details); + } + + void GetCorruptedBackups( + std::vector<BackupID>* corrupt_backup_ids) const override { + ReadLock lock(&mutex_); + impl_.GetCorruptedBackups(corrupt_backup_ids); + } + + using BackupEngine::RestoreDBFromBackup; + IOStatus RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir) const override { + ReadLock lock(&mutex_); + return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + } + + using BackupEngine::RestoreDBFromLatestBackup; + IOStatus RestoreDBFromLatestBackup( + const RestoreOptions& options, const std::string& db_dir, + const std::string& wal_dir) const override { + // Defer to above function, which locks + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); + } + + IOStatus VerifyBackup(BackupID backup_id, + bool verify_with_checksum = false) const override { + ReadLock lock(&mutex_); + return impl_.VerifyBackup(backup_id, verify_with_checksum); + } + + // Not public API but needed + IOStatus Initialize() { + // No locking needed + return impl_.Initialize(); + } + + // Not public API but used in testing + void TEST_SetBackupMetaSchemaOptions( + const TEST_BackupMetaSchemaOptions& options) { + impl_.schema_test_options_.reset(new TEST_BackupMetaSchemaOptions(options)); + } + + // Not public API but used in testing + void TEST_SetDefaultRateLimitersClock( + const std::shared_ptr<SystemClock>& backup_rate_limiter_clock = nullptr, + const std::shared_ptr<SystemClock>& restore_rate_limiter_clock = + nullptr) { + impl_.TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, + restore_rate_limiter_clock); + } + + private: + mutable port::RWMutex mutex_; + BackupEngineImpl impl_; +}; +} // namespace + +IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env, + BackupEngine** backup_engine_ptr) { + std::unique_ptr<BackupEngineImplThreadSafe> backup_engine( + new BackupEngineImplThreadSafe(options, env)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return IOStatus::OK(); +} + +namespace { +BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options, + Env* db_env, bool read_only) + : initialized_(false), + threads_cpu_priority_(), + latest_backup_id_(0), + latest_valid_backup_id_(0), + stop_backup_(false), + options_(options), + db_env_(db_env), + backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), + read_only_(read_only) { + if (options_.backup_rate_limiter == nullptr && + options_.backup_rate_limit > 0) { + options_.backup_rate_limiter.reset( + NewGenericRateLimiter(options_.backup_rate_limit)); + } + if (options_.restore_rate_limiter == nullptr && + options_.restore_rate_limit > 0) { + options_.restore_rate_limiter.reset( + NewGenericRateLimiter(options_.restore_rate_limit)); + } + db_fs_ = db_env_->GetFileSystem(); + backup_fs_ = backup_env_->GetFileSystem(); +} + +BackupEngineImpl::~BackupEngineImpl() { + files_to_copy_or_create_.sendEof(); + for (auto& t : threads_) { + t.join(); + } + LogFlush(options_.info_log); + for (const auto& it : corrupt_backups_) { + it.second.first.PermitUncheckedError(); + } +} + +IOStatus BackupEngineImpl::Initialize() { + assert(!initialized_); + initialized_ = true; + if (read_only_) { + ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine"); + } + options_.Dump(options_.info_log); + + auto meta_path = GetAbsolutePath(kMetaDirName); + + if (!read_only_) { + // we might need to clean up from previous crash or I/O errors + might_need_garbage_collect_ = true; + + if (options_.max_valid_backups_to_open != + std::numeric_limits<int32_t>::max()) { + options_.max_valid_backups_to_open = std::numeric_limits<int32_t>::max(); + ROCKS_LOG_WARN( + options_.info_log, + "`max_valid_backups_to_open` is not set to the default value. " + "Ignoring its value since BackupEngine is not read-only."); + } + + // gather the list of directories that we need to create + std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>> + directories; + directories.emplace_back(GetAbsolutePath(), &backup_directory_); + if (options_.share_table_files) { + if (options_.share_files_with_checksum) { + directories.emplace_back( + GetAbsolutePath(GetSharedFileWithChecksumRel()), + &shared_directory_); + } else { + directories.emplace_back(GetAbsolutePath(GetSharedFileRel()), + &shared_directory_); + } + } + directories.emplace_back(GetAbsolutePath(kPrivateDirName), + &private_directory_); + directories.emplace_back(meta_path, &meta_directory_); + // create all the dirs we need + for (const auto& d : directories) { + IOStatus io_s = + backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr); + if (io_s.ok()) { + io_s = + backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr); + } + if (!io_s.ok()) { + return io_s; + } + } + } + + std::vector<std::string> backup_meta_files; + { + IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_, + &backup_meta_files, nullptr); + if (io_s.IsNotFound()) { + return IOStatus::NotFound(meta_path + " is missing"); + } else if (!io_s.ok()) { + return io_s; + } + } + // create backups_ structure + for (auto& file : backup_meta_files) { + ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str()); + BackupID backup_id = 0; + sscanf(file.c_str(), "%u", &backup_id); + if (backup_id == 0 || file != std::to_string(backup_id)) { + // Invalid file name, will be deleted with auto-GC when user + // initiates an append or write operation. (Behave as read-only until + // then.) + ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s", + file.c_str()); + continue; + } + assert(backups_.find(backup_id) == backups_.end()); + // Insert all the (backup_id, BackupMeta) that will be loaded later + // The loading performed later will check whether there are corrupt backups + // and move the corrupt backups to corrupt_backups_ + backups_.insert(std::make_pair( + backup_id, std::unique_ptr<BackupMeta>(new BackupMeta( + GetBackupMetaFile(backup_id, false /* tmp */), + GetBackupMetaFile(backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_, backup_fs_)))); + } + + latest_backup_id_ = 0; + latest_valid_backup_id_ = 0; + if (options_.destroy_old_data) { // Destroy old data + assert(!read_only_); + ROCKS_LOG_INFO( + options_.info_log, + "Backup Engine started with destroy_old_data == true, deleting all " + "backups"); + IOStatus io_s = PurgeOldBackups(0); + if (io_s.ok()) { + io_s = GarbageCollect(); + } + if (!io_s.ok()) { + return io_s; + } + } else { // Load data from storage + // abs_path_to_size: maps absolute paths of files in backup directory to + // their corresponding sizes + std::unordered_map<std::string, uint64_t> abs_path_to_size; + // Insert files and their sizes in backup sub-directories (shared and + // shared_checksum) to abs_path_to_size + for (const auto& rel_dir : + {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + IOStatus io_s = + ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size); + if (!io_s.ok()) { + // I/O error likely impacting all backups + return io_s; + } + } + // load the backups if any, until valid_backups_to_open of the latest + // non-corrupted backups have been successfully opened. + int valid_backups_to_open = options_.max_valid_backups_to_open; + for (auto backup_iter = backups_.rbegin(); backup_iter != backups_.rend(); + ++backup_iter) { + assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first); + if (latest_backup_id_ == 0) { + latest_backup_id_ = backup_iter->first; + } + if (valid_backups_to_open == 0) { + break; + } + + // Insert files and their sizes in backup sub-directories + // (private/backup_id) to abs_path_to_size + IOStatus io_s = ReadChildFileCurrentSizes( + GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_, + &abs_path_to_size); + if (io_s.ok()) { + io_s = backup_iter->second->LoadFromFile( + options_.backup_dir, abs_path_to_size, + options_.backup_rate_limiter.get(), options_.info_log, + &reported_ignored_fields_); + } + if (io_s.IsCorruption() || io_s.IsNotSupported()) { + ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s", + backup_iter->first, io_s.ToString().c_str()); + corrupt_backups_.insert(std::make_pair( + backup_iter->first, + std::make_pair(io_s, std::move(backup_iter->second)))); + } else if (!io_s.ok()) { + // Distinguish corruption errors from errors in the backup Env. + // Errors in the backup Env (i.e., this code path) will cause Open() to + // fail, whereas corruption errors would not cause Open() failures. + return io_s; + } else { + ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s", + backup_iter->first, + backup_iter->second->GetInfoString().c_str()); + assert(latest_valid_backup_id_ == 0 || + latest_valid_backup_id_ > backup_iter->first); + if (latest_valid_backup_id_ == 0) { + latest_valid_backup_id_ = backup_iter->first; + } + --valid_backups_to_open; + } + } + + for (const auto& corrupt : corrupt_backups_) { + backups_.erase(backups_.find(corrupt.first)); + } + // erase the backups before max_valid_backups_to_open + int num_unopened_backups; + if (options_.max_valid_backups_to_open == 0) { + num_unopened_backups = 0; + } else { + num_unopened_backups = + std::max(0, static_cast<int>(backups_.size()) - + options_.max_valid_backups_to_open); + } + for (int i = 0; i < num_unopened_backups; ++i) { + assert(backups_.begin()->second->Empty()); + backups_.erase(backups_.begin()); + } + } + + ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_); + ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", + latest_valid_backup_id_); + + // set up threads perform copies from files_to_copy_or_create_ in the + // background + threads_cpu_priority_ = CpuPriority::kNormal; + threads_.reserve(options_.max_background_operations); + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([this]() { +#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) +#if __GLIBC_PREREQ(2, 12) + pthread_setname_np(pthread_self(), "backup_engine"); +#endif +#endif + CpuPriority current_priority = CpuPriority::kNormal; + CopyOrCreateWorkItem work_item; + uint64_t bytes_toward_next_callback = 0; + while (files_to_copy_or_create_.read(work_item)) { + CpuPriority priority = threads_cpu_priority_; + if (current_priority != priority) { + TEST_SYNC_POINT_CALLBACK( + "BackupEngineImpl::Initialize:SetCpuPriority", &priority); + port::SetCpuPriority(0, priority); + current_priority = priority; + } + // `bytes_read` and `bytes_written` stats are enabled based on + // compile-time support and cannot be dynamically toggled. So we do not + // need to worry about `PerfLevel` here, unlike many other + // `IOStatsContext` / `PerfContext` stats. + uint64_t prev_bytes_read = IOSTATS(bytes_read); + uint64_t prev_bytes_written = IOSTATS(bytes_written); + + CopyOrCreateResult result; + Temperature temp = work_item.src_temperature; + result.io_status = CopyOrCreateFile( + work_item.src_path, work_item.dst_path, work_item.contents, + work_item.size_limit, work_item.src_env, work_item.dst_env, + work_item.src_env_options, work_item.sync, work_item.rate_limiter, + work_item.progress_callback, &temp, work_item.dst_temperature, + &bytes_toward_next_callback, &result.size, &result.checksum_hex); + + RecordTick(work_item.stats, BACKUP_READ_BYTES, + IOSTATS(bytes_read) - prev_bytes_read); + RecordTick(work_item.stats, BACKUP_WRITE_BYTES, + IOSTATS(bytes_written) - prev_bytes_written); + + result.db_id = work_item.db_id; + result.db_session_id = work_item.db_session_id; + result.expected_src_temperature = work_item.src_temperature; + result.current_src_temperature = temp; + if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { + // unknown checksum function name implies no db table file checksum in + // db manifest; work_item.src_checksum_hex not empty means + // backup engine has calculated its crc32c checksum for the table + // file; therefore, we are able to compare the checksums. + if (work_item.src_checksum_func_name == + kUnknownFileChecksumFuncName || + work_item.src_checksum_func_name == kDbFileChecksumFuncName) { + if (work_item.src_checksum_hex != result.checksum_hex) { + std::string checksum_info( + "Expected checksum is " + work_item.src_checksum_hex + + " while computed checksum is " + result.checksum_hex); + result.io_status = IOStatus::Corruption( + "Checksum mismatch after copying to " + work_item.dst_path + + ": " + checksum_info); + } + } else { + // FIXME(peterd): dead code? + std::string checksum_function_info( + "Existing checksum function is " + + work_item.src_checksum_func_name + + " while provided checksum function is " + + kBackupFileChecksumFuncName); + ROCKS_LOG_INFO( + options_.info_log, + "Unable to verify checksum after copying to %s: %s\n", + work_item.dst_path.c_str(), checksum_function_info.c_str()); + } + } + work_item.result.set_value(std::move(result)); + } + }); + } + ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine"); + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( + const CreateBackupOptions& options, DB* db, const std::string& app_metadata, + BackupID* new_backup_id_ptr) { + assert(initialized_); + assert(!read_only_); + if (app_metadata.size() > kMaxAppMetaSize) { + return IOStatus::InvalidArgument("App metadata too large"); + } + + if (options.decrease_background_thread_cpu_priority) { + if (options.background_thread_cpu_priority < threads_cpu_priority_) { + threads_cpu_priority_.store(options.background_thread_cpu_priority); + } + } + + BackupID new_backup_id = latest_backup_id_ + 1; + + // `bytes_read` and `bytes_written` stats are enabled based on compile-time + // support and cannot be dynamically toggled. So we do not need to worry about + // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats. + uint64_t prev_bytes_read = IOSTATS(bytes_read); + uint64_t prev_bytes_written = IOSTATS(bytes_written); + + assert(backups_.find(new_backup_id) == backups_.end()); + + auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id)); + IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr); + if (io_s.ok()) { + // maybe last backup failed and left partial state behind, clean it up. + // need to do this before updating backups_ such that a private dir + // named after new_backup_id will be cleaned up. + // (If an incomplete new backup is followed by an incomplete delete + // of the latest full backup, then there could be more than one next + // id with a private dir, the last thing to be deleted in delete + // backup, but all will be cleaned up with a GarbageCollect.) + io_s = GarbageCollect(); + } else if (io_s.IsNotFound()) { + // normal case, the new backup's private dir doesn't exist yet + io_s = IOStatus::OK(); + } + + auto ret = backups_.insert(std::make_pair( + new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta( + GetBackupMetaFile(new_backup_id, false /* tmp */), + GetBackupMetaFile(new_backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_, backup_fs_)))); + assert(ret.second == true); + auto& new_backup = ret.first->second; + new_backup->RecordTimestamp(); + new_backup->SetAppMetadata(app_metadata); + + auto start_backup = backup_env_->NowMicros(); + + ROCKS_LOG_INFO(options_.info_log, + "Started the backup process -- creating backup %u", + new_backup_id); + + if (options_.share_table_files && !options_.share_files_with_checksum) { + ROCKS_LOG_WARN(options_.info_log, + "BackupEngineOptions::share_files_with_checksum=false is " + "DEPRECATED and could lead to data loss."); + } + + if (io_s.ok()) { + io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr); + } + + // A set into which we will insert the dst_paths that are calculated for live + // files and live WAL files. + // This is used to check whether a live files shares a dst_path with another + // live file. + std::unordered_set<std::string> live_dst_paths; + + std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish; + // Add a CopyOrCreateWorkItem to the channel for each live file + Status disabled = db->DisableFileDeletions(); + DBOptions db_options = db->GetDBOptions(); + Statistics* stats = db_options.statistics.get(); + if (io_s.ok()) { + CheckpointImpl checkpoint(db); + uint64_t sequence_number = 0; + FileChecksumGenFactory* db_checksum_factory = + db_options.file_checksum_gen_factory.get(); + const std::string kFileChecksumGenFactoryName = + "FileChecksumGenCrc32cFactory"; + bool compare_checksum = + db_checksum_factory != nullptr && + db_checksum_factory->Name() == kFileChecksumGenFactoryName + ? true + : false; + EnvOptions src_raw_env_options(db_options); + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint( + [&](const std::string& /*src_dirname*/, const std::string& /*fname*/, + FileType) { + // custom checkpoint will switch to calling copy_file_cb after it sees + // NotSupported returned from link_file_cb. + return IOStatus::NotSupported(); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType type, + const std::string& checksum_func_name, + const std::string& checksum_val, + const Temperature src_temperature) { + if (type == kWalFile && !options_.backup_log_files) { + return IOStatus::OK(); + } + Log(options_.info_log, "add file for backup %s", fname.c_str()); + uint64_t size_bytes = 0; + IOStatus io_st; + if (type == kTableFile || type == kBlobFile) { + io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_, + &size_bytes, nullptr); + if (!io_st.ok()) { + Log(options_.info_log, "GetFileSize is failed: %s", + io_st.ToString().c_str()); + return io_st; + } + } + EnvOptions src_env_options; + switch (type) { + case kWalFile: + src_env_options = + db_env_->OptimizeForLogRead(src_raw_env_options); + break; + case kTableFile: + src_env_options = db_env_->OptimizeForCompactionTableRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + case kDescriptorFile: + src_env_options = + db_env_->OptimizeForManifestRead(src_raw_env_options); + break; + case kBlobFile: + src_env_options = db_env_->OptimizeForBlobFileRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + default: + // Other backed up files (like options file) are not read by live + // DB, so don't need to worry about avoiding mixing buffered and + // direct I/O. Just use plain defaults. + src_env_options = src_raw_env_options; + break; + } + io_st = AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + options_.share_table_files && + (type == kTableFile || type == kBlobFile), + src_dirname, fname, src_env_options, rate_limiter, type, + size_bytes, db_options.statistics.get(), size_limit_bytes, + options_.share_files_with_checksum && + (type == kTableFile || type == kBlobFile), + options.progress_callback, "" /* contents */, checksum_func_name, + checksum_val, src_temperature); + return io_st; + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, + FileType type) { + Log(options_.info_log, "add file for backup %s", fname.c_str()); + return AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + false /* shared */, "" /* src_dir */, fname, + EnvOptions() /* src_env_options */, rate_limiter, type, + contents.size(), db_options.statistics.get(), 0 /* size_limit */, + false /* shared_checksum */, options.progress_callback, contents); + } /* create_file_cb */, + &sequence_number, + options.flush_before_backup ? 0 : std::numeric_limits<uint64_t>::max(), + compare_checksum)); + if (io_s.ok()) { + new_backup->SetSequenceNumber(sequence_number); + } + } + ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); + IOStatus item_io_status; + for (auto& item : backup_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_io_status = result.io_status; + Temperature temp = result.expected_src_temperature; + if (result.current_src_temperature != Temperature::kUnknown && + (temp == Temperature::kUnknown || + options_.current_temperatures_override_manifest)) { + temp = result.current_src_temperature; + } + if (item_io_status.ok() && item.shared && item.needed_to_copy) { + item_io_status = item.backup_env->GetFileSystem()->RenameFile( + item.dst_path_tmp, item.dst_path, io_options_, nullptr); + } + if (item_io_status.ok()) { + item_io_status = new_backup.get()->AddFile(std::make_shared<FileInfo>( + item.dst_relative, result.size, result.checksum_hex, result.db_id, + result.db_session_id, temp)); + } + if (!item_io_status.ok()) { + io_s = item_io_status; + } + } + + // we copied all the files, enable file deletions + if (disabled.ok()) { // If we successfully disabled file deletions + db->EnableFileDeletions(false).PermitUncheckedError(); + } + auto backup_time = backup_env_->NowMicros() - start_backup; + + if (io_s.ok()) { + // persist the backup metadata on the disk + io_s = new_backup->StoreToFile(options_.sync, options_.schema_version, + schema_test_options_.get()); + } + if (io_s.ok() && options_.sync) { + std::unique_ptr<FSDirectory> backup_private_directory; + backup_fs_ + ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), + io_options_, &backup_private_directory, nullptr) + .PermitUncheckedError(); + if (backup_private_directory != nullptr) { + io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && private_directory_ != nullptr) { + io_s = private_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && meta_directory_ != nullptr) { + io_s = meta_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && shared_directory_ != nullptr) { + io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && backup_directory_ != nullptr) { + io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + } + + if (io_s.ok()) { + backup_statistics_.IncrementNumberSuccessBackup(); + // here we know that we succeeded and installed the new backup + latest_backup_id_ = new_backup_id; + latest_valid_backup_id_ = new_backup_id; + if (new_backup_id_ptr) { + *new_backup_id_ptr = new_backup_id; + } + ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good"); + + // backup_speed is in byte/second + double backup_speed = new_backup->GetSize() / (1.048576 * backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u", + new_backup->GetNumberFiles()); + char human_size[16]; + AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size)); + ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size); + ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds", + backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s", + backup_statistics_.ToString().c_str()); + } else { + backup_statistics_.IncrementNumberFailBackup(); + // clean all the files we might have created + ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s", + io_s.ToString().c_str()); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n", + backup_statistics_.ToString().c_str()); + // delete files that we might have already written + might_need_garbage_collect_ = true; + DeleteBackup(new_backup_id).PermitUncheckedError(); + } + + RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read); + RecordTick(stats, BACKUP_WRITE_BYTES, + IOSTATS(bytes_written) - prev_bytes_written); + return io_s; +} + +IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { + assert(initialized_); + assert(!read_only_); + + // Best effort deletion even with errors + IOStatus overall_status = IOStatus::OK(); + + ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u", + num_backups_to_keep); + std::vector<BackupID> to_delete; + auto itr = backups_.begin(); + while ((backups_.size() - to_delete.size()) > num_backups_to_keep) { + to_delete.push_back(itr->first); + itr++; + } + for (auto backup_id : to_delete) { + // Do not GC until end + IOStatus io_s = DeleteBackupNoGC(backup_id); + if (!io_s.ok()) { + overall_status = io_s; + } + } + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + IOStatus io_s = GarbageCollect(); + if (!io_s.ok() && overall_status.ok()) { + overall_status = io_s; + } + } + return overall_status; +} + +IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) { + IOStatus s1 = DeleteBackupNoGC(backup_id); + IOStatus s2 = IOStatus::OK(); + + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + s2 = GarbageCollect(); + } + + if (!s1.ok()) { + // Any failure in the primary objective trumps any failure in the + // secondary objective. + s2.PermitUncheckedError(); + return s1; + } else { + return s2; + } +} + +// Does not auto-GarbageCollect nor lock +IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) { + assert(initialized_); + assert(!read_only_); + + ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id); + auto backup = backups_.find(backup_id); + if (backup != backups_.end()) { + IOStatus io_s = backup->second->Delete(); + if (!io_s.ok()) { + return io_s; + } + backups_.erase(backup); + } else { + auto corrupt = corrupt_backups_.find(backup_id); + if (corrupt == corrupt_backups_.end()) { + return IOStatus::NotFound("Backup not found"); + } + IOStatus io_s = corrupt->second.second->Delete(); + if (!io_s.ok()) { + return io_s; + } + corrupt->second.first.PermitUncheckedError(); + corrupt_backups_.erase(corrupt); + } + + // After removing meta file, best effort deletion even with errors. + // (Don't delete other files if we can't delete the meta file right + // now.) + std::vector<std::string> to_delete; + for (auto& itr : backuped_file_infos_) { + if (itr.second->refs == 0) { + IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(itr.first), + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), + io_s.ToString().c_str()); + to_delete.push_back(itr.first); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + for (auto& td : to_delete) { + backuped_file_infos_.erase(td); + } + + // take care of private dirs -- GarbageCollect() will take care of them + // if they are not empty + std::string private_dir = GetPrivateFileRel(backup_id); + IOStatus io_s = + backup_fs_->DeleteDir(GetAbsolutePath(private_dir), io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s", + private_dir.c_str(), io_s.ToString().c_str()); + if (!io_s.ok()) { + // Full gc or trying again later might work + might_need_garbage_collect_ = true; + } + return IOStatus::OK(); +} + +void BackupEngineImpl::SetBackupInfoFromBackupMeta( + BackupID id, const BackupMeta& meta, BackupInfo* backup_info, + bool include_file_details) const { + *backup_info = BackupInfo(id, meta.GetTimestamp(), meta.GetSize(), + meta.GetNumberFiles(), meta.GetAppMetadata()); + std::string dir = + options_.backup_dir + "/" + kPrivateDirSlash + std::to_string(id); + if (include_file_details) { + auto& file_details = backup_info->file_details; + file_details.reserve(meta.GetFiles().size()); + for (auto& file_ptr : meta.GetFiles()) { + BackupFileInfo& finfo = *file_details.emplace(file_details.end()); + finfo.relative_filename = file_ptr->filename; + finfo.size = file_ptr->size; + finfo.directory = dir; + uint64_t number; + FileType type; + bool ok = ParseFileName(file_ptr->filename, &number, &type); + if (ok) { + finfo.file_number = number; + finfo.file_type = type; + } + // TODO: temperature, file_checksum, file_checksum_func_name + } + backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id)); + backup_info->name_for_open.pop_back(); // remove trailing '/' + backup_info->env_for_open = meta.GetEnvForOpen(); + } +} + +Status BackupEngineImpl::GetBackupInfo(BackupID backup_id, + BackupInfo* backup_info, + bool include_file_details) const { + assert(initialized_); + if (backup_id == kLatestBackupIDMarker) { + // Note: Read latest_valid_backup_id_ inside of lock + backup_id = latest_valid_backup_id_; + } + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return Status::Corruption(corrupt_itr->second.first.ToString()); + } + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return Status::NotFound("Backup not found"); + } + auto& backup = backup_itr->second; + if (backup->Empty()) { + return Status::NotFound("Backup not found"); + } + + SetBackupInfoFromBackupMeta(backup_id, *backup, backup_info, + include_file_details); + return Status::OK(); +} + +void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info, + bool include_file_details) const { + assert(initialized_); + backup_info->resize(backups_.size()); + size_t i = 0; + for (auto& backup : backups_) { + const BackupMeta& meta = *backup.second; + if (!meta.Empty()) { + SetBackupInfoFromBackupMeta(backup.first, meta, &backup_info->at(i++), + include_file_details); + } + } +} + +void BackupEngineImpl::GetCorruptedBackups( + std::vector<BackupID>* corrupt_backup_ids) const { + assert(initialized_); + corrupt_backup_ids->reserve(corrupt_backups_.size()); + for (auto& backup : corrupt_backups_) { + corrupt_backup_ids->push_back(backup.first); + } +} + +IOStatus BackupEngineImpl::RestoreDBFromBackup( + const RestoreOptions& options, BackupID backup_id, + const std::string& db_dir, const std::string& wal_dir) const { + assert(initialized_); + if (backup_id == kLatestBackupIDMarker) { + // Note: Read latest_valid_backup_id_ inside of lock + backup_id = latest_valid_backup_id_; + } + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return IOStatus::NotFound("Backup not found"); + } + auto& backup = backup_itr->second; + if (backup->Empty()) { + return IOStatus::NotFound("Backup not found"); + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id); + ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n", + static_cast<int>(options.keep_log_files)); + + // just in case. Ignore errors + db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr) + .PermitUncheckedError(); + db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr) + .PermitUncheckedError(); + + if (options.keep_log_files) { + // delete files in db_dir, but keep all the log files + DeleteChildren(db_dir, 1 << kWalFile); + // move all the files from archive dir to wal_dir + std::string archive_dir = ArchivalDirectory(wal_dir); + std::vector<std::string> archive_files; + db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr) + .PermitUncheckedError(); // ignore errors + for (const auto& f : archive_files) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && type == kWalFile) { + ROCKS_LOG_INFO(options_.info_log, + "Moving log file from archive/ to wal_dir: %s", + f.c_str()); + IOStatus io_s = db_fs_->RenameFile( + archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr); + if (!io_s.ok()) { + // if we can't move log file from archive_dir to wal_dir, + // we should fail, since it might mean data loss + return io_s; + } + } + } + } else { + DeleteChildren(wal_dir); + DeleteChildren(ArchivalDirectory(wal_dir)); + DeleteChildren(db_dir); + } + + IOStatus io_s; + std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish; + std::string temporary_current_file; + std::string final_current_file; + std::unique_ptr<FSDirectory> db_dir_for_fsync; + std::unique_ptr<FSDirectory> wal_dir_for_fsync; + + for (const auto& file_info : backup->GetFiles()) { + const std::string& file = file_info->filename; + // 1. get DB filename + std::string dst = file_info->GetDbFileName(); + + // 2. find the filetype + uint64_t number; + FileType type; + bool ok = ParseFileName(dst, &number, &type); + if (!ok) { + return IOStatus::Corruption("Backup corrupted: Fail to parse filename " + + dst); + } + // 3. Construct the final path + // kWalFile lives in wal_dir and all the rest live in db_dir + if (type == kWalFile) { + dst = wal_dir + "/" + dst; + if (options_.sync && !wal_dir_for_fsync) { + io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync, + nullptr); + if (!io_s.ok()) { + return io_s; + } + } + } else { + dst = db_dir + "/" + dst; + if (options_.sync && !db_dir_for_fsync) { + io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync, + nullptr); + if (!io_s.ok()) { + return io_s; + } + } + } + // For atomicity, initially restore CURRENT file to a temporary name. + // This is useful even without options_.sync e.g. in case the restore + // process is interrupted. + if (type == kCurrentFile) { + final_current_file = dst; + dst = temporary_current_file = dst + ".tmp"; + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), + dst.c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */, + file_info->temp, "" /* contents */, backup_env_, db_env_, + EnvOptions() /* src_env_options */, options_.sync, + options_.restore_rate_limiter.get(), file_info->size, + nullptr /* stats */); + RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), file, dst, + file_info->checksum_hex); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + restore_items_to_finish.push_back( + std::move(after_copy_or_create_work_item)); + } + IOStatus item_io_status; + for (auto& item : restore_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_io_status = result.io_status; + // Note: It is possible that both of the following bad-status cases occur + // during copying. But, we only return one status. + if (!item_io_status.ok()) { + io_s = item_io_status; + break; + } else if (!item.checksum_hex.empty() && + item.checksum_hex != result.checksum_hex) { + io_s = IOStatus::Corruption( + "While restoring " + item.from_file + " -> " + item.to_file + + ": expected checksum is " + item.checksum_hex + + " while computed checksum is " + result.checksum_hex); + break; + } + } + + // When enabled, the first FsyncWithDirOptions is to ensure all files are + // fully persisted before renaming CURRENT.tmp + if (io_s.ok() && db_dir_for_fsync) { + ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n"); + io_s = db_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + + if (io_s.ok() && wal_dir_for_fsync) { + io_s = wal_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + + if (io_s.ok() && !temporary_current_file.empty()) { + ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n"); + assert(!final_current_file.empty()); + io_s = db_fs_->RenameFile(temporary_current_file, final_current_file, + io_options_, nullptr); + } + + if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) { + // Second FsyncWithDirOptions is to ensure the final atomic rename of DB + // restore is fully persisted even if power goes out right after restore + // operation returns success + assert(db_dir_for_fsync); + io_s = db_dir_for_fsync->FsyncWithDirOptions( + io_options_, nullptr, DirFsyncOptions(final_current_file)); + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", + io_s.ToString().c_str()); + return io_s; +} + +IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id, + bool verify_with_checksum) const { + assert(initialized_); + // Check if backup_id is corrupted, or valid and registered + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return IOStatus::NotFound(); + } + + auto& backup = backup_itr->second; + if (backup->Empty()) { + return IOStatus::NotFound(); + } + + ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id); + + // Find all existing backup files belong to backup_id + std::unordered_map<std::string, uint64_t> curr_abs_path_to_size; + for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(), + GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + // Shared directories allowed to be missing in some cases. Expected but + // missing files will be reported a few lines down. + ReadChildFileCurrentSizes(abs_dir, backup_fs_, &curr_abs_path_to_size) + .PermitUncheckedError(); + } + + // For all files registered in backup + for (const auto& file_info : backup->GetFiles()) { + const auto abs_path = GetAbsolutePath(file_info->filename); + // check existence of the file + if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) { + return IOStatus::NotFound("File missing: " + abs_path); + } + // verify file size + if (file_info->size != curr_abs_path_to_size[abs_path]) { + std::string size_info("Expected file size is " + + std::to_string(file_info->size) + + " while found file size is " + + std::to_string(curr_abs_path_to_size[abs_path])); + return IOStatus::Corruption("File corrupted: File size mismatch for " + + abs_path + ": " + size_info); + } + if (verify_with_checksum && !file_info->checksum_hex.empty()) { + // verify file checksum + std::string checksum_hex; + ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n", + abs_path.c_str()); + IOStatus io_s = ReadFileAndComputeChecksum( + abs_path, backup_fs_, EnvOptions(), 0 /* size_limit */, &checksum_hex, + Temperature::kUnknown); + if (!io_s.ok()) { + return io_s; + } else if (file_info->checksum_hex != checksum_hex) { + std::string checksum_info( + "Expected checksum is " + file_info->checksum_hex + + " while computed checksum is " + checksum_hex); + return IOStatus::Corruption("File corrupted: Checksum mismatch for " + + abs_path + ": " + checksum_info); + } + } + } + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::CopyOrCreateFile( + const std::string& src, const std::string& dst, const std::string& contents, + uint64_t size_limit, Env* src_env, Env* dst_env, + const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter, + std::function<void()> progress_callback, Temperature* src_temperature, + Temperature dst_temperature, uint64_t* bytes_toward_next_callback, + uint64_t* size, std::string* checksum_hex) { + assert(src.empty() != contents.empty()); + IOStatus io_s; + std::unique_ptr<FSWritableFile> dst_file; + std::unique_ptr<FSSequentialFile> src_file; + FileOptions dst_file_options; + dst_file_options.use_mmap_writes = false; + dst_file_options.temperature = dst_temperature; + // TODO:(gzh) maybe use direct reads/writes here if possible + if (size != nullptr) { + *size = 0; + } + uint32_t checksum_value = 0; + + // Check if size limit is set. if not, set it to very big number + if (size_limit == 0) { + size_limit = std::numeric_limits<uint64_t>::max(); + } + + io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options, + &dst_file, nullptr); + if (io_s.ok() && !src.empty()) { + auto src_file_options = FileOptions(src_env_options); + src_file_options.temperature = *src_temperature; + io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options, + &src_file, nullptr); + } + if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) { + // Retry without temperature hint in case the FileSystem is strict with + // non-kUnknown temperature option + io_s = src_env->GetFileSystem()->NewSequentialFile( + src, FileOptions(src_env_options), &src_file, nullptr); + } + if (!io_s.ok()) { + return io_s; + } + + size_t buf_size = + rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes()) + : kDefaultCopyFileBufferSize; + + std::unique_ptr<WritableFileWriter> dest_writer( + new WritableFileWriter(std::move(dst_file), dst, dst_file_options)); + std::unique_ptr<SequentialFileReader> src_reader; + std::unique_ptr<char[]> buf; + if (!src.empty()) { + // Return back current temperature in FileSystem + *src_temperature = src_file->GetTemperature(); + + src_reader.reset(new SequentialFileReader( + std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter)); + buf.reset(new char[buf_size]); + } + + Slice data; + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return status_to_io_status(Status::Incomplete("Backup stopped")); + } + if (!src.empty()) { + size_t buffer_to_read = + (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit); + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); + *bytes_toward_next_callback += data.size(); + } else { + data = contents; + } + size_limit -= data.size(); + TEST_SYNC_POINT_CALLBACK( + "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup", + (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data + : nullptr); + + if (!io_s.ok()) { + return io_s; + } + + if (size != nullptr) { + *size += data.size(); + } + if (checksum_hex != nullptr) { + checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); + } + io_s = dest_writer->Append(data); + + if (rate_limiter != nullptr) { + if (!src.empty()) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } else { + LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kWrite); + } + } + while (*bytes_toward_next_callback >= + options_.callback_trigger_interval_size) { + *bytes_toward_next_callback -= options_.callback_trigger_interval_size; + std::lock_guard<std::mutex> lock(byte_report_mutex_); + progress_callback(); + } + } while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0); + + // Convert uint32_t checksum to hex checksum + if (checksum_hex != nullptr) { + checksum_hex->assign(ChecksumInt32ToHex(checksum_value)); + } + + if (io_s.ok() && sync) { + io_s = dest_writer->Sync(false); + } + if (io_s.ok()) { + io_s = dest_writer->Close(); + } + return io_s; +} + +// fname will always start with "/" +IOStatus BackupEngineImpl::AddBackupFileWorkItem( + std::unordered_set<std::string>& live_dst_paths, + std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, const EnvOptions& src_env_options, + RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes, + Statistics* stats, uint64_t size_limit, bool shared_checksum, + std::function<void()> progress_callback, const std::string& contents, + const std::string& src_checksum_func_name, + const std::string& src_checksum_str, const Temperature src_temperature) { + assert(contents.empty() != src_dir.empty()); + + std::string src_path = src_dir + "/" + fname; + std::string dst_relative; + std::string dst_relative_tmp; + std::string db_id; + std::string db_session_id; + // crc32c checksum in hex. empty == unavailable / unknown + std::string checksum_hex; + + // Whenever a default checksum function name is passed in, we will compares + // the corresponding checksum values after copying. Note that only table and + // blob files may have a known checksum function name passed in. + // + // If no default checksum function name is passed in and db session id is not + // available, we will calculate the checksum *before* copying in two cases + // (we always calcuate checksums when copying or creating for any file types): + // a) share_files_with_checksum is true and file type is table; + // b) share_table_files is true and the file exists already. + // + // Step 0: Check if default checksum function name is passed in + if (kDbFileChecksumFuncName == src_checksum_func_name) { + if (src_checksum_str == kUnknownFileChecksum) { + return status_to_io_status( + Status::Aborted("Unknown checksum value for " + fname)); + } + checksum_hex = ChecksumStrToHex(src_checksum_str); + } + + // Step 1: Prepare the relative path to destination + if (shared && shared_checksum) { + if (GetNamingNoFlags() != BackupEngineOptions::kLegacyCrc32cAndFileSize && + file_type != kBlobFile) { + // Prepare db_session_id to add to the file name + // Ignore the returned status + // In the failed cases, db_id and db_session_id will be empty + GetFileDbIdentities(db_env_, src_env_options, src_path, src_temperature, + rate_limiter, &db_id, &db_session_id) + .PermitUncheckedError(); + } + // Calculate checksum if checksum and db session id are not available. + // If db session id is available, we will not calculate the checksum + // since the session id should suffice to avoid file name collision in + // the shared_checksum directory. + if (checksum_hex.empty() && db_session_id.empty()) { + IOStatus io_s = ReadFileAndComputeChecksum( + src_path, db_fs_, src_env_options, size_limit, &checksum_hex, + src_temperature); + if (!io_s.ok()) { + return io_s; + } + } + if (size_bytes == std::numeric_limits<uint64_t>::max()) { + return IOStatus::NotFound("File missing: " + src_path); + } + // dst_relative depends on the following conditions: + // 1) the naming scheme is kUseDbSessionId, + // 2) db_session_id is not empty, + // 3) checksum is available in the DB manifest. + // If 1,2,3) are satisfied, then dst_relative will be of the form: + // shared_checksum/<file_number>_<checksum>_<db_session_id>.sst + // If 1,2) are satisfied, then dst_relative will be of the form: + // shared_checksum/<file_number>_<db_session_id>.sst + // Otherwise, dst_relative is of the form + // shared_checksum/<file_number>_<checksum>_<size>.sst + // + // For blob files, db_session_id is not supported with the blob file format. + // It uses original/legacy naming scheme. + // dst_relative will be of the form: + // shared_checksum/<file_number>_<checksum>_<size>.blob + dst_relative = GetSharedFileWithChecksum(fname, checksum_hex, size_bytes, + db_session_id); + dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true); + dst_relative = GetSharedFileWithChecksumRel(dst_relative, false); + } else if (shared) { + dst_relative_tmp = GetSharedFileRel(fname, true); + dst_relative = GetSharedFileRel(fname, false); + } else { + dst_relative = GetPrivateFileRel(backup_id, false, fname); + } + + // We copy into `temp_dest_path` and, once finished, rename it to + // `final_dest_path`. This allows files to atomically appear at + // `final_dest_path`. We can copy directly to the final path when atomicity + // is unnecessary, like for files in private backup directories. + const std::string* copy_dest_path; + std::string temp_dest_path; + std::string final_dest_path = GetAbsolutePath(dst_relative); + if (!dst_relative_tmp.empty()) { + temp_dest_path = GetAbsolutePath(dst_relative_tmp); + copy_dest_path = &temp_dest_path; + } else { + copy_dest_path = &final_dest_path; + } + + // Step 2: Determine whether to copy or not + // if it's shared, we also need to check if it exists -- if it does, no need + // to copy it again. + bool need_to_copy = true; + // true if final_dest_path is the same path as another live file + const bool same_path = + live_dst_paths.find(final_dest_path) != live_dst_paths.end(); + + bool file_exists = false; + if (shared && !same_path) { + // Should be in shared directory but not a live path, check existence in + // shared directory + IOStatus exist = + backup_fs_->FileExists(final_dest_path, io_options_, nullptr); + if (exist.ok()) { + file_exists = true; + } else if (exist.IsNotFound()) { + file_exists = false; + } else { + return exist; + } + } + + if (!contents.empty()) { + need_to_copy = false; + } else if (shared && (same_path || file_exists)) { + need_to_copy = false; + auto find_result = backuped_file_infos_.find(dst_relative); + if (find_result == backuped_file_infos_.end() && !same_path) { + // file exists but not referenced + ROCKS_LOG_INFO( + options_.info_log, + "%s already present, but not referenced by any backup. We will " + "overwrite the file.", + fname.c_str()); + need_to_copy = true; + // Defer any failure reporting to when we try to write the file + backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr) + .PermitUncheckedError(); + } else { + // file exists and referenced + if (checksum_hex.empty()) { + // same_path should not happen for a standard DB, so OK to + // read file contents to check for checksum mismatch between + // two files from same DB getting same name. + // For compatibility with future meta file that might not have + // crc32c checksum available, consider it might be empty, but + // we don't currently generate meta file without crc32c checksum. + // Therefore we have to read & compute it if we don't have it. + if (!same_path && !find_result->second->checksum_hex.empty()) { + assert(find_result != backuped_file_infos_.end()); + // Note: to save I/O on incremental backups, we copy prior known + // checksum of the file instead of reading entire file contents + // to recompute it. + checksum_hex = find_result->second->checksum_hex; + // Regarding corruption detection, consider: + // (a) the DB file is corrupt (since previous backup) and the backup + // file is OK: we failed to detect, but the backup is safe. DB can + // be repaired/restored once its corruption is detected. + // (b) the backup file is corrupt (since previous backup) and the + // db file is OK: we failed to detect, but the backup is corrupt. + // CreateNewBackup should support fast incremental backups and + // there's no way to support that without reading all the files. + // We might add an option for extra checks on incremental backup, + // but until then, use VerifyBackups to check existing backup data. + // (c) file name collision with legitimately different content. + // This is almost inconceivable with a well-generated DB session + // ID, but even in that case, we double check the file sizes in + // BackupMeta::AddFile. + } else { + IOStatus io_s = ReadFileAndComputeChecksum( + src_path, db_fs_, src_env_options, size_limit, &checksum_hex, + src_temperature); + if (!io_s.ok()) { + return io_s; + } + } + } + if (!db_session_id.empty()) { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %s, size %" PRIu64 + " and DB session identity %s", + fname.c_str(), checksum_hex.c_str(), size_bytes, + db_session_id.c_str()); + } else { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %s and size %" PRIu64, + fname.c_str(), checksum_hex.c_str(), size_bytes); + } + } + } + live_dst_paths.insert(final_dest_path); + + // Step 3: Add work item + if (!contents.empty() || need_to_copy) { + ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(), + copy_dest_path->c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature, + Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_, + src_env_options, options_.sync, rate_limiter, size_limit, stats, + progress_callback, src_checksum_func_name, checksum_hex, db_id, + db_session_id); + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), shared, need_to_copy, + backup_env_, temp_dest_path, final_dest_path, dst_relative); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + } else { + std::promise<CopyOrCreateResult> promise_result; + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + promise_result.get_future(), shared, need_to_copy, backup_env_, + temp_dest_path, final_dest_path, dst_relative); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + CopyOrCreateResult result; + result.io_status = IOStatus::OK(); + result.size = size_bytes; + result.checksum_hex = std::move(checksum_hex); + result.db_id = std::move(db_id); + result.db_session_id = std::move(db_session_id); + promise_result.set_value(std::move(result)); + } + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::ReadFileAndComputeChecksum( + const std::string& src, const std::shared_ptr<FileSystem>& src_fs, + const EnvOptions& src_env_options, uint64_t size_limit, + std::string* checksum_hex, const Temperature src_temperature) const { + if (checksum_hex == nullptr) { + return status_to_io_status(Status::Aborted("Checksum pointer is null")); + } + uint32_t checksum_value = 0; + if (size_limit == 0) { + size_limit = std::numeric_limits<uint64_t>::max(); + } + + std::unique_ptr<SequentialFileReader> src_reader; + auto file_options = FileOptions(src_env_options); + file_options.temperature = src_temperature; + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + IOStatus io_s = SequentialFileReader::Create( + src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter); + if (io_s.IsPathNotFound() && src_temperature != Temperature::kUnknown) { + // Retry without temperature hint in case the FileSystem is strict with + // non-kUnknown temperature option + file_options.temperature = Temperature::kUnknown; + io_s = SequentialFileReader::Create(src_fs, src, file_options, &src_reader, + nullptr /* dbg */, rate_limiter); + } + if (!io_s.ok()) { + return io_s; + } + + size_t buf_size = kDefaultCopyFileBufferSize; + std::unique_ptr<char[]> buf(new char[buf_size]); + Slice data; + + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return status_to_io_status(Status::Incomplete("Backup stopped")); + } + size_t buffer_to_read = + (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit); + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); + if (!io_s.ok()) { + return io_s; + } + + size_limit -= data.size(); + checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); + } while (data.size() > 0 && size_limit > 0); + + checksum_hex->assign(ChecksumInt32ToHex(checksum_value)); + + return io_s; +} + +Status BackupEngineImpl::GetFileDbIdentities( + Env* src_env, const EnvOptions& src_env_options, + const std::string& file_path, Temperature file_temp, + RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) { + assert(db_id != nullptr || db_session_id != nullptr); + + Options options; + options.env = src_env; + SstFileDumper sst_reader(options, file_path, file_temp, + 2 * 1024 * 1024 + /* readahead_size */, + false /* verify_checksum */, false /* output_hex */, + false /* decode_blob_index */, src_env_options, + true /* silent */); + + const TableProperties* table_properties = nullptr; + std::shared_ptr<const TableProperties> tp; + Status s = sst_reader.getStatus(); + + if (s.ok()) { + // Try to get table properties from the table reader of sst_reader + if (!sst_reader.ReadTableProperties(&tp).ok()) { + // Try to use table properites from the initialization of sst_reader + table_properties = sst_reader.GetInitTableProperties(); + } else { + table_properties = tp.get(); + if (table_properties != nullptr && rate_limiter != nullptr) { + // sizeof(*table_properties) is a sufficent but far-from-exact + // approximation of read bytes due to metaindex block, std::string + // properties and varint compression + LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter, + Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } + } + } else { + ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s", + file_path.c_str(), s.ToString().c_str()); + return s; + } + + if (table_properties != nullptr) { + if (db_id != nullptr) { + db_id->assign(table_properties->db_id); + } + if (db_session_id != nullptr) { + db_session_id->assign(table_properties->db_session_id); + if (db_session_id->empty()) { + s = Status::NotFound("DB session identity not found in " + file_path); + ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str()); + return s; + } + } + return Status::OK(); + } else { + s = Status::Corruption("Table properties missing in " + file_path); + ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str()); + return s; + } +} + +void BackupEngineImpl::LoopRateLimitRequestHelper( + const size_t total_bytes_to_request, RateLimiter* rate_limiter, + const Env::IOPriority pri, Statistics* stats, + const RateLimiter::OpType op_type) { + assert(rate_limiter != nullptr); + size_t remaining_bytes = total_bytes_to_request; + size_t request_bytes = 0; + while (remaining_bytes > 0) { + request_bytes = + std::min(static_cast<size_t>(rate_limiter->GetSingleBurstBytes()), + remaining_bytes); + rate_limiter->Request(request_bytes, pri, stats, op_type); + remaining_bytes -= request_bytes; + } +} + +void BackupEngineImpl::DeleteChildren(const std::string& dir, + uint32_t file_type_filter) const { + std::vector<std::string> children; + db_fs_->GetChildren(dir, io_options_, &children, nullptr) + .PermitUncheckedError(); // ignore errors + + for (const auto& f : children) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && (file_type_filter & (1 << type))) { + // don't delete this file + continue; + } + db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr) + .PermitUncheckedError(); // ignore errors + } +} + +IOStatus BackupEngineImpl::ReadChildFileCurrentSizes( + const std::string& dir, const std::shared_ptr<FileSystem>& fs, + std::unordered_map<std::string, uint64_t>* result) const { + assert(result != nullptr); + std::vector<Env::FileAttributes> files_attrs; + IOStatus io_status = fs->FileExists(dir, io_options_, nullptr); + if (io_status.ok()) { + io_status = + fs->GetChildrenFileAttributes(dir, io_options_, &files_attrs, nullptr); + } else if (io_status.IsNotFound()) { + // Insert no entries can be considered success + io_status = IOStatus::OK(); + } + const bool slash_needed = dir.empty() || dir.back() != '/'; + for (const auto& file_attrs : files_attrs) { + result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name, + file_attrs.size_bytes); + } + return io_status; +} + +IOStatus BackupEngineImpl::GarbageCollect() { + assert(!read_only_); + + // We will make a best effort to remove all garbage even in the presence + // of inconsistencies or I/O failures that inhibit finding garbage. + IOStatus overall_status = IOStatus::OK(); + // If all goes well, we don't need another auto-GC this session + might_need_garbage_collect_ = false; + + ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection"); + + // delete obsolete shared files + for (bool with_checksum : {false, true}) { + std::vector<std::string> shared_children; + { + std::string shared_path; + if (with_checksum) { + shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel()); + } else { + shared_path = GetAbsolutePath(GetSharedFileRel()); + } + IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr); + if (io_s.ok()) { + io_s = backup_fs_->GetChildren(shared_path, io_options_, + &shared_children, nullptr); + } else if (io_s.IsNotFound()) { + io_s = IOStatus::OK(); + } + if (!io_s.ok()) { + overall_status = io_s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : shared_children) { + std::string rel_fname; + if (with_checksum) { + rel_fname = GetSharedFileWithChecksumRel(child); + } else { + rel_fname = GetSharedFileRel(child); + } + auto child_itr = backuped_file_infos_.find(rel_fname); + // if it's not refcounted, delete it + if (child_itr == backuped_file_infos_.end() || + child_itr->second->refs == 0) { + // this might be a directory, but DeleteFile will just fail in that + // case, so we're good + IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(rel_fname), + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + rel_fname.c_str(), io_s.ToString().c_str()); + backuped_file_infos_.erase(rel_fname); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + } + + // delete obsolete private files + std::vector<std::string> private_children; + { + IOStatus io_s = + backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_, + &private_children, nullptr); + if (!io_s.ok()) { + overall_status = io_s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : private_children) { + BackupID backup_id = 0; + bool tmp_dir = child.find(".tmp") != std::string::npos; + sscanf(child.c_str(), "%u", &backup_id); + if (!tmp_dir && // if it's tmp_dir, delete it + (backup_id == 0 || backups_.find(backup_id) != backups_.end())) { + // it's either not a number or it's still alive. continue + continue; + } + // here we have to delete the dir and all its children + std::string full_private_path = + GetAbsolutePath(GetPrivateFileRel(backup_id)); + std::vector<std::string> subchildren; + if (backup_fs_ + ->GetChildren(full_private_path, io_options_, &subchildren, nullptr) + .ok()) { + for (auto& subchild : subchildren) { + IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild, + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + (full_private_path + subchild).c_str(), + io_s.ToString().c_str()); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + // finally delete the private dir + IOStatus io_s = + backup_fs_->DeleteDir(full_private_path, io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s", + full_private_path.c_str(), io_s.ToString().c_str()); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + + assert(overall_status.ok() || might_need_garbage_collect_); + return overall_status; +} + +// ------- BackupMeta class -------- + +IOStatus BackupEngineImpl::BackupMeta::AddFile( + std::shared_ptr<FileInfo> file_info) { + auto itr = file_infos_->find(file_info->filename); + if (itr == file_infos_->end()) { + auto ret = file_infos_->insert({file_info->filename, file_info}); + if (ret.second) { + itr = ret.first; + itr->second->refs = 1; + } else { + // if this happens, something is seriously wrong + return IOStatus::Corruption("In memory metadata insertion error"); + } + } else { + // Compare sizes, because we scanned that off the filesystem on both + // ends. This is like a check in VerifyBackup. + if (itr->second->size != file_info->size) { + std::string msg = "Size mismatch for existing backup file: "; + msg.append(file_info->filename); + msg.append(" Size in backup is " + std::to_string(itr->second->size) + + " while size in DB is " + std::to_string(file_info->size)); + msg.append( + " If this DB file checks as not corrupt, try deleting old" + " backups or backing up to a different backup directory."); + return IOStatus::Corruption(msg); + } + if (file_info->checksum_hex.empty()) { + // No checksum available to check + } else if (itr->second->checksum_hex.empty()) { + // Remember checksum if newly acquired + itr->second->checksum_hex = file_info->checksum_hex; + } else if (itr->second->checksum_hex != file_info->checksum_hex) { + // Note: to save I/O, these will be equal trivially on already backed + // up files that don't have the checksum in their name. And it should + // never fail for files that do have checksum in their name. + + // Should never reach here, but produce an appropriate corruption + // message in case we do in a release build. + assert(false); + std::string msg = "Checksum mismatch for existing backup file: "; + msg.append(file_info->filename); + msg.append(" Expected checksum is " + itr->second->checksum_hex + + " while computed checksum is " + file_info->checksum_hex); + msg.append( + " If this DB file checks as not corrupt, try deleting old" + " backups or backing up to a different backup directory."); + return IOStatus::Corruption(msg); + } + ++itr->second->refs; // increase refcount if already present + } + + size_ += file_info->size; + files_.push_back(itr->second); + + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) { + IOStatus io_s; + for (const auto& file : files_) { + --file->refs; // decrease refcount + } + files_.clear(); + // delete meta file + if (delete_meta) { + io_s = fs_->FileExists(meta_filename_, iooptions_, nullptr); + if (io_s.ok()) { + io_s = fs_->DeleteFile(meta_filename_, iooptions_, nullptr); + } else if (io_s.IsNotFound()) { + io_s = IOStatus::OK(); // nothing to delete + } + } + timestamp_ = 0; + return io_s; +} + +// Constants for backup meta file schema (see LoadFromFile) +const std::string kSchemaVersionPrefix{"schema_version "}; +const std::string kFooterMarker{"// FOOTER"}; + +const std::string kAppMetaDataFieldName{"metadata"}; + +// WART: The checksums are crc32c but named "crc32" +const std::string kFileCrc32cFieldName{"crc32"}; +const std::string kFileSizeFieldName{"size"}; +const std::string kTemperatureFieldName{"temp"}; + +// Marks a (future) field that should cause failure if not recognized. +// Other fields are assumed to be ignorable. For example, in the future +// we might add +// ni::file_name_escape uri_percent +// to indicate all file names have had spaces and special characters +// escaped using a URI percent encoding. +const std::string kNonIgnorableFieldPrefix{"ni::"}; + +// Each backup meta file is of the format (schema version 1): +//---------------------------------------------------------- +// <timestamp> +// <seq number> +// metadata <metadata> (optional) +// <number of files> +// <file1> crc32 <crc32c_as_unsigned_decimal> +// <file2> crc32 <crc32c_as_unsigned_decimal> +// ... +//---------------------------------------------------------- +// +// For schema version 2.x (not in public APIs, but +// forward-compatibility started): +//---------------------------------------------------------- +// schema_version <ver> +// <timestamp> +// <seq number> +// [<field name> <field data>] +// ... +// <number of files> +// <file1>( <field name> <field data no spaces>)* +// <file2>( <field name> <field data no spaces>)* +// ... +// [// FOOTER] +// [<field name> <field data>] +// ... +//---------------------------------------------------------- +// where +// <ver> ::= [0-9]+([.][0-9]+) +// <field name> ::= [A-Za-z_][A-Za-z_0-9.]+ +// <field data> is anything but newline +// <field data no spaces> is anything but space and newline +// Although "// FOOTER" wouldn't strictly be required as a delimiter +// given the number of files is included, it is there for parsing +// sanity in case of corruption. It is only required if followed +// by footer fields, such as a checksum of the meta file (so far). +// Unrecognized fields are ignored, to support schema evolution on +// non-critical features with forward compatibility. Update schema +// major version for breaking changes. Schema minor versions are indicated +// only for diagnostic/debugging purposes. +// +// Fields in schema version 2.0: +// * Top-level meta fields: +// * Only "metadata" as in schema version 1 +// * File meta fields: +// * "crc32" - a crc32c checksum as in schema version 1 +// * "size" - the size of the file (new) +// * Footer meta fields: +// * None yet (future use for meta file checksum anticipated) +// +IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( + const std::string& backup_dir, + const std::unordered_map<std::string, uint64_t>& abs_path_to_size, + RateLimiter* rate_limiter, Logger* info_log, + std::unordered_set<std::string>* reported_ignored_fields) { + assert(reported_ignored_fields); + assert(Empty()); + + std::unique_ptr<LineFileReader> backup_meta_reader; + { + IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(), + &backup_meta_reader, + nullptr /* dbg */, rate_limiter); + if (!io_s.ok()) { + return io_s; + } + } + + // If we don't read an explicit schema_version, that implies version 1, + // which is what we call the original backup meta schema. + int schema_major_version = 1; + + // Failures handled at the end + std::string line; + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { + if (StartsWith(line, kSchemaVersionPrefix)) { + std::string ver = line.substr(kSchemaVersionPrefix.size()); + if (ver == "2" || StartsWith(ver, "2.")) { + schema_major_version = 2; + } else { + return IOStatus::NotSupported( + "Unsupported/unrecognized schema version: " + ver); + } + line.clear(); + } else if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + } + if (!line.empty()) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } else if (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { + sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } + uint32_t num_files = UINT32_MAX; + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + // Number -> number of files -> exit loop reading optional meta fields + if (line[0] >= '0' && line[0] <= '9') { + num_files = static_cast<uint32_t>(strtoul(line.c_str(), nullptr, 10)); + break; + } + // else, must be a meta field assignment + auto space_pos = line.find_first_of(' '); + if (space_pos == std::string::npos) { + return IOStatus::Corruption("Expected number of files or meta field"); + } + std::string field_name = line.substr(0, space_pos); + std::string field_data = line.substr(space_pos + 1); + if (field_name == kAppMetaDataFieldName) { + // app metadata present + bool decode_success = Slice(field_data).DecodeHex(&app_metadata_); + if (!decode_success) { + return IOStatus::Corruption( + "Failed to decode stored hex encoded app metadata"); + } + } else if (schema_major_version < 2) { + return IOStatus::Corruption("Expected number of files or \"" + + kAppMetaDataFieldName + "\" field"); + } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable meta field " + + field_name + " (from future version?)"); + } else { + // Warn the first time we see any particular unrecognized meta field + if (reported_ignored_fields->insert("meta:" + field_name).second) { + ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta field %s", + field_name.c_str()); + } + } + } + std::vector<std::shared_ptr<FileInfo>> files; + bool footer_present = false; + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + std::vector<std::string> components = StringSplit(line, ' '); + + if (components.size() < 1) { + return IOStatus::Corruption("Empty line instead of file entry."); + } + if (schema_major_version >= 2 && components.size() == 2 && + line == kFooterMarker) { + footer_present = true; + break; + } + + const std::string& filename = components[0]; + + uint64_t actual_size; + const std::shared_ptr<FileInfo> file_info = GetFile(filename); + if (file_info) { + actual_size = file_info->size; + } else { + std::string abs_path = backup_dir + "/" + filename; + auto e = abs_path_to_size.find(abs_path); + if (e == abs_path_to_size.end()) { + return IOStatus::Corruption( + "Pathname in meta file not found on disk: " + abs_path); + } + actual_size = e->second; + } + + if (schema_major_version >= 2) { + if (components.size() % 2 != 1) { + return IOStatus::Corruption( + "Bad number of line components for file entry."); + } + } else { + // Check restricted original schema + if (components.size() < 3) { + return IOStatus::Corruption("File checksum is missing for " + filename + + " in " + meta_filename_); + } + if (components[1] != kFileCrc32cFieldName) { + return IOStatus::Corruption("Unknown checksum type for " + filename + + " in " + meta_filename_); + } + if (components.size() > 3) { + return IOStatus::Corruption("Extra data for entry " + filename + + " in " + meta_filename_); + } + } + + std::string checksum_hex; + Temperature temp = Temperature::kUnknown; + for (unsigned i = 1; i < components.size(); i += 2) { + const std::string& field_name = components[i]; + const std::string& field_data = components[i + 1]; + + if (field_name == kFileCrc32cFieldName) { + uint32_t checksum_value = + static_cast<uint32_t>(strtoul(field_data.c_str(), nullptr, 10)); + if (field_data != std::to_string(checksum_value)) { + return IOStatus::Corruption("Invalid checksum value for " + filename + + " in " + meta_filename_); + } + checksum_hex = ChecksumInt32ToHex(checksum_value); + } else if (field_name == kFileSizeFieldName) { + uint64_t ex_size = + std::strtoull(field_data.c_str(), nullptr, /*base*/ 10); + if (ex_size != actual_size) { + return IOStatus::Corruption( + "For file " + filename + " expected size " + + std::to_string(ex_size) + " but found size" + + std::to_string(actual_size)); + } + } else if (field_name == kTemperatureFieldName) { + auto iter = temperature_string_map.find(field_data); + if (iter != temperature_string_map.end()) { + temp = iter->second; + } else { + // Could report corruption, but in case of new temperatures added + // in future, letting those map to kUnknown which should generally + // be safe. + temp = Temperature::kUnknown; + } + } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable file field " + + field_name + " (from future version?)"); + } else { + // Warn the first time we see any particular unrecognized file field + if (reported_ignored_fields->insert("file:" + field_name).second) { + ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup file field %s", + field_name.c_str()); + } + } + } + + files.emplace_back(new FileInfo(filename, actual_size, checksum_hex, + /*id*/ "", /*sid*/ "", temp)); + } + + if (footer_present) { + assert(schema_major_version >= 2); + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + auto space_pos = line.find_first_of(' '); + if (space_pos == std::string::npos) { + return IOStatus::Corruption("Expected footer field"); + } + std::string field_name = line.substr(0, space_pos); + std::string field_data = line.substr(space_pos + 1); + if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable field " + + field_name + " (from future version?)"); + } else if (reported_ignored_fields->insert("footer:" + field_name) + .second) { + // Warn the first time we see any particular unrecognized footer field + ROCKS_LOG_WARN(info_log, + "Ignoring unrecognized backup meta footer field %s", + field_name.c_str()); + } + } + } + + { + IOStatus io_s = backup_meta_reader->GetStatus(); + if (!io_s.ok()) { + return io_s; + } + } + + if (num_files != files.size()) { + return IOStatus::Corruption( + "Inconsistent number of files or missing/incomplete header in " + + meta_filename_); + } + + files_.reserve(files.size()); + for (const auto& file_info : files) { + IOStatus io_s = AddFile(file_info); + if (!io_s.ok()) { + return io_s; + } + } + + return IOStatus::OK(); +} + +const std::vector<std::string> minor_version_strings{ + "", // invalid major version 0 + "", // implicit major version 1 + "2.0", +}; + +IOStatus BackupEngineImpl::BackupMeta::StoreToFile( + bool sync, int schema_version, + const TEST_BackupMetaSchemaOptions* schema_test_options) { + if (schema_version < 1) { + return IOStatus::InvalidArgument( + "BackupEngineOptions::schema_version must be >= 1"); + } + if (schema_version > static_cast<int>(minor_version_strings.size() - 1)) { + return IOStatus::NotSupported( + "Only BackupEngineOptions::schema_version <= " + + std::to_string(minor_version_strings.size() - 1) + " is supported"); + } + std::string ver = minor_version_strings[schema_version]; + + // Need schema_version >= 2 for TEST_BackupMetaSchemaOptions + assert(schema_version >= 2 || schema_test_options == nullptr); + + IOStatus io_s; + std::unique_ptr<FSWritableFile> backup_meta_file; + FileOptions file_options; + file_options.use_mmap_writes = false; + file_options.use_direct_writes = false; + io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options, + &backup_meta_file, nullptr); + if (!io_s.ok()) { + return io_s; + } + + std::ostringstream buf; + if (schema_test_options) { + // override for testing + ver = schema_test_options->version; + } + if (!ver.empty()) { + assert(schema_version >= 2); + buf << kSchemaVersionPrefix << ver << "\n"; + } + buf << static_cast<unsigned long long>(timestamp_) << "\n"; + buf << sequence_number_ << "\n"; + + if (!app_metadata_.empty()) { + std::string hex_encoded_metadata = + Slice(app_metadata_).ToString(/* hex */ true); + buf << kAppMetaDataFieldName << " " << hex_encoded_metadata << "\n"; + } + if (schema_test_options) { + for (auto& e : schema_test_options->meta_fields) { + buf << e.first << " " << e.second << "\n"; + } + } + buf << files_.size() << "\n"; + + for (const auto& file : files_) { + buf << file->filename; + if (schema_test_options == nullptr || + schema_test_options->crc32c_checksums) { + // use crc32c for now, switch to something else if needed + buf << " " << kFileCrc32cFieldName << " " + << ChecksumHexToInt32(file->checksum_hex); + } + if (schema_version >= 2 && file->temp != Temperature::kUnknown) { + buf << " " << kTemperatureFieldName << " " + << temperature_to_string[file->temp]; + } + if (schema_test_options && schema_test_options->file_sizes) { + buf << " " << kFileSizeFieldName << " " << std::to_string(file->size); + } + if (schema_test_options) { + for (auto& e : schema_test_options->file_fields) { + buf << " " << e.first << " " << e.second; + } + } + buf << "\n"; + } + + if (schema_test_options && !schema_test_options->footer_fields.empty()) { + buf << kFooterMarker << "\n"; + for (auto& e : schema_test_options->footer_fields) { + buf << e.first << " " << e.second << "\n"; + } + } + + io_s = backup_meta_file->Append(Slice(buf.str()), iooptions_, nullptr); + IOSTATS_ADD(bytes_written, buf.str().size()); + if (io_s.ok() && sync) { + io_s = backup_meta_file->Sync(iooptions_, nullptr); + } + if (io_s.ok()) { + io_s = backup_meta_file->Close(iooptions_, nullptr); + } + if (io_s.ok()) { + io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_, + nullptr); + } + return io_s; +} +} // namespace + +IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options, + Env* env, + BackupEngineReadOnly** backup_engine_ptr) { + if (options.destroy_old_data) { + return IOStatus::InvalidArgument( + "Can't destroy old data with ReadOnly BackupEngine"); + } + std::unique_ptr<BackupEngineImplThreadSafe> backup_engine( + new BackupEngineImplThreadSafe(options, env, true /*read_only*/)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return IOStatus::OK(); +} + +void TEST_SetBackupMetaSchemaOptions( + BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options) { + BackupEngineImplThreadSafe* impl = + static_cast_with_check<BackupEngineImplThreadSafe>(engine); + impl->TEST_SetBackupMetaSchemaOptions(options); +} + +void TEST_SetDefaultRateLimitersClock( + BackupEngine* engine, + const std::shared_ptr<SystemClock>& backup_rate_limiter_clock, + const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) { + BackupEngineImplThreadSafe* impl = + static_cast_with_check<BackupEngineImplThreadSafe>(engine); + impl->TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, + restore_rate_limiter_clock); +} +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/backup/backup_engine_impl.h b/src/rocksdb/utilities/backup/backup_engine_impl.h new file mode 100644 index 000000000..398f47f27 --- /dev/null +++ b/src/rocksdb/utilities/backup/backup_engine_impl.h @@ -0,0 +1,36 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/backup_engine.h" + +namespace ROCKSDB_NAMESPACE { + +struct TEST_BackupMetaSchemaOptions { + std::string version = "2"; + bool crc32c_checksums = false; + bool file_sizes = true; + std::map<std::string, std::string> meta_fields; + std::map<std::string, std::string> file_fields; + std::map<std::string, std::string> footer_fields; +}; + +// Modifies the BackupEngine(Impl) to write backup meta files using the +// unpublished schema version 2, for the life of this object (not backup_dir). +// TEST_BackupMetaSchemaOptions offers some customization for testing. +void TEST_SetBackupMetaSchemaOptions( + BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options); + +// Modifies the BackupEngine(Impl) to use specified clocks for backup and +// restore rate limiters created by default if not specified by users for +// test speedup. +void TEST_SetDefaultRateLimitersClock( + BackupEngine* engine, + const std::shared_ptr<SystemClock>& backup_rate_limiter_clock = nullptr, + const std::shared_ptr<SystemClock>& restore_rate_limiter_clock = nullptr); +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/backup/backup_engine_test.cc b/src/rocksdb/utilities/backup/backup_engine_test.cc new file mode 100644 index 000000000..d1f74f769 --- /dev/null +++ b/src/rocksdb/utilities/backup/backup_engine_test.cc @@ -0,0 +1,4219 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) + +#include "rocksdb/utilities/backup_engine.h" + +#include <algorithm> +#include <array> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <limits> +#include <memory> +#include <random> +#include <string> +#include <utility> + +#include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" +#include "env/composite_env_wrapper.h" +#include "env/env_chroot.h" +#include "file/filename.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/advanced_options.h" +#include "rocksdb/env.h" +#include "rocksdb/file_checksum.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/options_util.h" +#include "rocksdb/utilities/stackable_db.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/cast_util.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/rate_limiter.h" +#include "util/stderr_logger.h" +#include "util/string_util.h" +#include "utilities/backup/backup_engine_impl.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming; +const auto kLegacyCrc32cAndFileSize = + BackupEngineOptions::kLegacyCrc32cAndFileSize; +const auto kUseDbSessionId = BackupEngineOptions::kUseDbSessionId; +const auto kFlagIncludeFileSize = BackupEngineOptions::kFlagIncludeFileSize; +const auto kNamingDefault = kUseDbSessionId | kFlagIncludeFileSize; + +class DummyDB : public StackableDB { + public: + /* implicit */ + DummyDB(const Options& options, const std::string& dbname) + : StackableDB(nullptr), + options_(options), + dbname_(dbname), + deletions_enabled_(true), + sequence_number_(0) {} + + SequenceNumber GetLatestSequenceNumber() const override { + return ++sequence_number_; + } + + const std::string& GetName() const override { return dbname_; } + + Env* GetEnv() const override { return options_.env; } + + using DB::GetOptions; + Options GetOptions(ColumnFamilyHandle* /*column_family*/) const override { + return options_; + } + + DBOptions GetDBOptions() const override { return DBOptions(options_); } + + Status EnableFileDeletions(bool /*force*/) override { + EXPECT_TRUE(!deletions_enabled_); + deletions_enabled_ = true; + return Status::OK(); + } + + Status DisableFileDeletions() override { + EXPECT_TRUE(deletions_enabled_); + deletions_enabled_ = false; + return Status::OK(); + } + + ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; } + + Status GetLiveFilesStorageInfo( + const LiveFilesStorageInfoOptions& opts, + std::vector<LiveFileStorageInfo>* files) override { + uint64_t number; + FileType type; + files->clear(); + for (auto& f : live_files_) { + bool success = ParseFileName(f, &number, &type); + if (!success) { + return Status::InvalidArgument("Bad file name: " + f); + } + files->emplace_back(); + LiveFileStorageInfo& info = files->back(); + info.relative_filename = f; + info.directory = dbname_; + info.file_number = number; + info.file_type = type; + if (type == kDescriptorFile) { + info.size = 100; // See TestFs::GetChildrenFileAttributes below + info.trim_to_size = true; + } else if (type == kCurrentFile) { + info.size = 0; + info.trim_to_size = true; + } else { + info.size = 200; // See TestFs::GetChildrenFileAttributes below + } + if (opts.include_checksum_info) { + info.file_checksum = kUnknownFileChecksum; + info.file_checksum_func_name = kUnknownFileChecksumFuncName; + } + } + return Status::OK(); + } + + // To avoid FlushWAL called on stacked db which is nullptr + Status FlushWAL(bool /*sync*/) override { return Status::OK(); } + + std::vector<std::string> live_files_; + + private: + Options options_; + std::string dbname_; + bool deletions_enabled_; + mutable SequenceNumber sequence_number_; +}; // DummyDB + +class TestFs : public FileSystemWrapper { + public: + explicit TestFs(const std::shared_ptr<FileSystem>& t) + : FileSystemWrapper(t) {} + const char* Name() const override { return "TestFs"; } + + class DummySequentialFile : public FSSequentialFile { + public: + explicit DummySequentialFile(bool fail_reads) + : FSSequentialFile(), rnd_(5), fail_reads_(fail_reads) {} + IOStatus Read(size_t n, const IOOptions&, Slice* result, char* scratch, + IODebugContext*) override { + if (fail_reads_) { + return IOStatus::IOError(); + } + size_t read_size = (n > size_left) ? size_left : n; + for (size_t i = 0; i < read_size; ++i) { + scratch[i] = rnd_.Next() & 255; + } + *result = Slice(scratch, read_size); + size_left -= read_size; + return IOStatus::OK(); + } + + IOStatus Skip(uint64_t n) override { + size_left = (n > size_left) ? size_left - n : 0; + return IOStatus::OK(); + } + + private: + size_t size_left = 200; + Random rnd_; + bool fail_reads_; + }; + + IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts, + std::unique_ptr<FSSequentialFile>* r, + IODebugContext* dbg) override { + MutexLock l(&mutex_); + if (dummy_sequential_file_) { + r->reset( + new TestFs::DummySequentialFile(dummy_sequential_file_fail_reads_)); + return IOStatus::OK(); + } else { + IOStatus s = FileSystemWrapper::NewSequentialFile(f, file_opts, r, dbg); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_seq_readers_; + } + ++num_seq_readers_; + } + return s; + } + } + + IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts, + std::unique_ptr<FSWritableFile>* r, + IODebugContext* dbg) override { + MutexLock l(&mutex_); + written_files_.push_back(f); + if (limit_written_files_ == 0) { + return IOStatus::NotSupported("Limit on written files reached"); + } + limit_written_files_--; + IOStatus s = FileSystemWrapper::NewWritableFile(f, file_opts, r, dbg); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_writers_; + } + ++num_writers_; + } + return s; + } + + IOStatus NewRandomAccessFile(const std::string& f, + const FileOptions& file_opts, + std::unique_ptr<FSRandomAccessFile>* r, + IODebugContext* dbg) override { + MutexLock l(&mutex_); + IOStatus s = FileSystemWrapper::NewRandomAccessFile(f, file_opts, r, dbg); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_rand_readers_; + } + ++num_rand_readers_; + } + return s; + } + + IOStatus DeleteFile(const std::string& f, const IOOptions& options, + IODebugContext* dbg) override { + MutexLock l(&mutex_); + if (fail_delete_files_) { + return IOStatus::IOError(); + } + EXPECT_GT(limit_delete_files_, 0U); + limit_delete_files_--; + return FileSystemWrapper::DeleteFile(f, options, dbg); + } + + IOStatus DeleteDir(const std::string& d, const IOOptions& options, + IODebugContext* dbg) override { + MutexLock l(&mutex_); + if (fail_delete_files_) { + return IOStatus::IOError(); + } + return FileSystemWrapper::DeleteDir(d, options, dbg); + } + + void AssertWrittenFiles(std::vector<std::string>& should_have_written) { + MutexLock l(&mutex_); + std::sort(should_have_written.begin(), should_have_written.end()); + std::sort(written_files_.begin(), written_files_.end()); + + ASSERT_EQ(should_have_written, written_files_); + } + + void ClearWrittenFiles() { + MutexLock l(&mutex_); + written_files_.clear(); + } + + void SetLimitWrittenFiles(uint64_t limit) { + MutexLock l(&mutex_); + limit_written_files_ = limit; + } + + void SetLimitDeleteFiles(uint64_t limit) { + MutexLock l(&mutex_); + limit_delete_files_ = limit; + } + + void SetDeleteFileFailure(bool fail) { + MutexLock l(&mutex_); + fail_delete_files_ = fail; + } + + void SetDummySequentialFile(bool dummy_sequential_file) { + MutexLock l(&mutex_); + dummy_sequential_file_ = dummy_sequential_file; + } + void SetDummySequentialFileFailReads(bool dummy_sequential_file_fail_reads) { + MutexLock l(&mutex_); + dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads; + } + + void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; } + IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts, + std::vector<std::string>* r, + IODebugContext* dbg) override { + if (get_children_failure_) { + return IOStatus::IOError("SimulatedFailure"); + } + return FileSystemWrapper::GetChildren(dir, io_opts, r, dbg); + } + + // Some test cases do not actually create the test files (e.g., see + // DummyDB::live_files_) - for those cases, we mock those files' attributes + // so CreateNewBackup() can get their attributes. + void SetFilenamesForMockedAttrs(const std::vector<std::string>& filenames) { + filenames_for_mocked_attrs_ = filenames; + } + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector<FileAttributes>* result, + IODebugContext* dbg) override { + if (filenames_for_mocked_attrs_.size() > 0) { + for (const auto& filename : filenames_for_mocked_attrs_) { + uint64_t size_bytes = 200; // Match TestFs + if (filename.find("MANIFEST") == 0) { + size_bytes = 100; // Match DummyDB::GetLiveFiles + } + result->push_back({dir + "/" + filename, size_bytes}); + } + return IOStatus::OK(); + } + return FileSystemWrapper::GetChildrenFileAttributes(dir, options, result, + dbg); + } + + IOStatus GetFileSize(const std::string& f, const IOOptions& options, + uint64_t* s, IODebugContext* dbg) override { + if (filenames_for_mocked_attrs_.size() > 0) { + auto fname = f.substr(f.find_last_of('/') + 1); + auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(), + filenames_for_mocked_attrs_.end(), fname); + if (filename_iter != filenames_for_mocked_attrs_.end()) { + *s = 200; // Match TestFs + if (fname.find("MANIFEST") == 0) { + *s = 100; // Match DummyDB::GetLiveFiles + } + return IOStatus::OK(); + } + return IOStatus::NotFound(fname); + } + return FileSystemWrapper::GetFileSize(f, options, s, dbg); + } + + void SetCreateDirIfMissingFailure(bool fail) { + create_dir_if_missing_failure_ = fail; + } + IOStatus CreateDirIfMissing(const std::string& d, const IOOptions& options, + IODebugContext* dbg) override { + if (create_dir_if_missing_failure_) { + return IOStatus::IOError("SimulatedFailure"); + } + return FileSystemWrapper::CreateDirIfMissing(d, options, dbg); + } + + void SetNewDirectoryFailure(bool fail) { new_directory_failure_ = fail; } + IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts, + std::unique_ptr<FSDirectory>* result, + IODebugContext* dbg) override { + if (new_directory_failure_) { + return IOStatus::IOError("SimulatedFailure"); + } + return FileSystemWrapper::NewDirectory(name, io_opts, result, dbg); + } + + void ClearFileOpenCounters() { + MutexLock l(&mutex_); + num_rand_readers_ = 0; + num_direct_rand_readers_ = 0; + num_seq_readers_ = 0; + num_direct_seq_readers_ = 0; + num_writers_ = 0; + num_direct_writers_ = 0; + } + + int num_rand_readers() { return num_rand_readers_; } + int num_direct_rand_readers() { return num_direct_rand_readers_; } + int num_seq_readers() { return num_seq_readers_; } + int num_direct_seq_readers() { return num_direct_seq_readers_; } + int num_writers() { return num_writers_; } + // FIXME(?): unused + int num_direct_writers() { return num_direct_writers_; } + + private: + port::Mutex mutex_; + bool dummy_sequential_file_ = false; + bool dummy_sequential_file_fail_reads_ = false; + std::vector<std::string> written_files_; + std::vector<std::string> filenames_for_mocked_attrs_; + uint64_t limit_written_files_ = 1000000; + uint64_t limit_delete_files_ = 1000000; + bool fail_delete_files_ = false; + + bool get_children_failure_ = false; + bool create_dir_if_missing_failure_ = false; + bool new_directory_failure_ = false; + + // Keeps track of how many files of each type were successfully opened, and + // out of those, how many were opened with direct I/O. + std::atomic<int> num_rand_readers_{}; + std::atomic<int> num_direct_rand_readers_{}; + std::atomic<int> num_seq_readers_{}; + std::atomic<int> num_direct_seq_readers_{}; + std::atomic<int> num_writers_{}; + std::atomic<int> num_direct_writers_{}; +}; // TestFs + +class FileManager : public EnvWrapper { + public: + explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {} + const char* Name() const override { return "FileManager"; } + + Status GetRandomFileInDir(const std::string& dir, std::string* fname, + uint64_t* fsize) { + std::vector<FileAttributes> children; + auto s = GetChildrenFileAttributes(dir, &children); + if (!s.ok()) { + return s; + } else if (children.size() <= 2) { // . and .. + return Status::NotFound("Empty directory: " + dir); + } + assert(fname != nullptr); + while (true) { + int i = rnd_.Next() % children.size(); + fname->assign(dir + "/" + children[i].name); + *fsize = children[i].size_bytes; + return Status::OK(); + } + // should never get here + assert(false); + return Status::NotFound(""); + } + + Status DeleteRandomFileInDir(const std::string& dir) { + std::vector<std::string> children; + Status s = GetChildren(dir, &children); + if (!s.ok()) { + return s; + } + while (true) { + int i = rnd_.Next() % children.size(); + return DeleteFile(dir + "/" + children[i]); + } + // should never get here + assert(false); + return Status::NotFound(""); + } + + Status AppendToRandomFileInDir(const std::string& dir, + const std::string& data) { + std::vector<std::string> children; + Status s = GetChildren(dir, &children); + if (!s.ok()) { + return s; + } + while (true) { + int i = rnd_.Next() % children.size(); + return WriteToFile(dir + "/" + children[i], data); + } + // should never get here + assert(false); + return Status::NotFound(""); + } + + Status CorruptFile(const std::string& fname, uint64_t bytes_to_corrupt) { + std::string file_contents; + Status s = ReadFileToString(this, fname, &file_contents); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + for (uint64_t i = 0; i < bytes_to_corrupt; ++i) { + std::string tmp = rnd_.RandomString(1); + file_contents[rnd_.Next() % file_contents.size()] = tmp[0]; + } + return WriteToFile(fname, file_contents); + } + + Status CorruptFileStart(const std::string& fname) { + std::string to_xor = "blah"; + std::string file_contents; + Status s = ReadFileToString(this, fname, &file_contents); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + for (size_t i = 0; i < to_xor.size(); ++i) { + file_contents[i] ^= to_xor[i]; + } + return WriteToFile(fname, file_contents); + } + + Status CorruptChecksum(const std::string& fname, bool appear_valid) { + std::string metadata; + Status s = ReadFileToString(this, fname, &metadata); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + auto pos = metadata.find("private"); + if (pos == std::string::npos) { + return Status::Corruption("private file is expected"); + } + pos = metadata.find(" crc32 ", pos + 6); + if (pos == std::string::npos) { + return Status::Corruption("checksum not found"); + } + + if (metadata.size() < pos + 7) { + return Status::Corruption("bad CRC32 checksum value"); + } + + if (appear_valid) { + if (metadata[pos + 8] == '\n') { + // single digit value, safe to insert one more digit + metadata.insert(pos + 8, 1, '0'); + } else { + metadata.erase(pos + 8, 1); + } + } else { + metadata[pos + 7] = 'a'; + } + + return WriteToFile(fname, metadata); + } + + Status WriteToFile(const std::string& fname, const std::string& data) { + std::unique_ptr<WritableFile> file; + EnvOptions env_options; + env_options.use_mmap_writes = false; + Status s = EnvWrapper::NewWritableFile(fname, &file, env_options); + if (!s.ok()) { + return s; + } + return file->Append(Slice(data)); + } + + private: + Random rnd_; +}; // FileManager + +// utility functions +namespace { + +enum FillDBFlushAction { + kFlushMost, + kFlushAll, + kAutoFlushOnly, +}; + +// Many tests in this file expect FillDB to write at least one sst file, +// so the default behavior (if not kAutoFlushOnly) of FillDB is to force +// a flush. But to ensure coverage of the WAL file case, we also (by default) +// do one Put after the Flush (kFlushMost). +size_t FillDB(DB* db, int from, int to, + FillDBFlushAction flush_action = kFlushMost) { + size_t bytes_written = 0; + for (int i = from; i < to; ++i) { + std::string key = "testkey" + std::to_string(i); + std::string value = "testvalue" + std::to_string(i); + bytes_written += key.size() + value.size(); + + EXPECT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); + + if (flush_action == kFlushMost && i == to - 2) { + EXPECT_OK(db->Flush(FlushOptions())); + } + } + if (flush_action == kFlushAll) { + EXPECT_OK(db->Flush(FlushOptions())); + } + return bytes_written; +} + +void AssertExists(DB* db, int from, int to) { + for (int i = from; i < to; ++i) { + std::string key = "testkey" + std::to_string(i); + std::string value; + Status s = db->Get(ReadOptions(), Slice(key), &value); + ASSERT_EQ(value, "testvalue" + std::to_string(i)); + } +} + +void AssertEmpty(DB* db, int from, int to) { + for (int i = from; i < to; ++i) { + std::string key = "testkey" + std::to_string(i); + std::string value = "testvalue" + std::to_string(i); + + Status s = db->Get(ReadOptions(), Slice(key), &value); + ASSERT_TRUE(s.IsNotFound()); + } +} +} // namespace + +class BackupEngineTest : public testing::Test { + public: + enum ShareOption { + kNoShare, + kShareNoChecksum, + kShareWithChecksum, + }; + + const std::vector<ShareOption> kAllShareOptions = {kNoShare, kShareNoChecksum, + kShareWithChecksum}; + + BackupEngineTest() { + // set up files + std::string db_chroot = test::PerThreadDBPath("db_for_backup"); + std::string backup_chroot = test::PerThreadDBPath("db_backups"); + EXPECT_OK(Env::Default()->CreateDirIfMissing(db_chroot)); + EXPECT_OK(Env::Default()->CreateDirIfMissing(backup_chroot)); + dbname_ = "/tempdb"; + backupdir_ = "/tempbk"; + latest_backup_ = backupdir_ + "/LATEST_BACKUP"; + + // set up FileSystem & Envs + db_chroot_fs_ = NewChrootFileSystem(FileSystem::Default(), db_chroot); + backup_chroot_fs_ = + NewChrootFileSystem(FileSystem::Default(), backup_chroot); + test_db_fs_ = std::make_shared<TestFs>(db_chroot_fs_); + test_backup_fs_ = std::make_shared<TestFs>(backup_chroot_fs_); + SetEnvsFromFileSystems(); + + // set up db options + options_.create_if_missing = true; + options_.paranoid_checks = true; + options_.write_buffer_size = 1 << 17; // 128KB + options_.wal_dir = dbname_; + options_.enable_blob_files = true; + + // The sync option is not easily testable in unit tests, but should be + // smoke tested across all the other backup tests. However, it is + // certainly not worth doubling the runtime of backup tests for it. + // Thus, we can enable sync for one of our alternate testing + // configurations. + constexpr bool kUseSync = +#ifdef ROCKSDB_MODIFY_NPHASH + true; +#else + false; +#endif // ROCKSDB_MODIFY_NPHASH + + // set up backup db options + engine_options_.reset(new BackupEngineOptions( + backupdir_, test_backup_env_.get(), /*share_table_files*/ true, + logger_.get(), kUseSync)); + + // most tests will use multi-threaded backups + engine_options_->max_background_operations = 7; + + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + + // delete old LATEST_BACKUP file, which some tests create for compatibility + // testing. + backup_chroot_env_->DeleteFile(latest_backup_).PermitUncheckedError(); + } + + void SetEnvsFromFileSystems() { + db_chroot_env_.reset( + new CompositeEnvWrapper(Env::Default(), db_chroot_fs_)); + backup_chroot_env_.reset( + new CompositeEnvWrapper(Env::Default(), backup_chroot_fs_)); + test_db_env_.reset(new CompositeEnvWrapper(Env::Default(), test_db_fs_)); + options_.env = test_db_env_.get(); + test_backup_env_.reset( + new CompositeEnvWrapper(Env::Default(), test_backup_fs_)); + if (engine_options_) { + engine_options_->backup_env = test_backup_env_.get(); + } + file_manager_.reset(new FileManager(backup_chroot_env_.get())); + db_file_manager_.reset(new FileManager(db_chroot_env_.get())); + + // Create logger + DBOptions logger_options; + logger_options.env = db_chroot_env_.get(); + ASSERT_OK(CreateLoggerFromOptions(dbname_, logger_options, &logger_)); + } + + DB* OpenDB() { + DB* db; + EXPECT_OK(DB::Open(options_, dbname_, &db)); + return db; + } + + void CloseAndReopenDB(bool read_only = false) { + // Close DB + db_.reset(); + + // Open DB + test_db_fs_->SetLimitWrittenFiles(1000000); + DB* db; + if (read_only) { + ASSERT_OK(DB::OpenForReadOnly(options_, dbname_, &db)); + } else { + ASSERT_OK(DB::Open(options_, dbname_, &db)); + } + db_.reset(db); + } + + void InitializeDBAndBackupEngine(bool dummy = false) { + // reset all the db env defaults + test_db_fs_->SetLimitWrittenFiles(1000000); + test_db_fs_->SetDummySequentialFile(dummy); + + DB* db; + if (dummy) { + dummy_db_ = new DummyDB(options_, dbname_); + db = dummy_db_; + } else { + ASSERT_OK(DB::Open(options_, dbname_, &db)); + } + db_.reset(db); + } + + virtual void OpenDBAndBackupEngine( + bool destroy_old_data = false, bool dummy = false, + ShareOption shared_option = kShareNoChecksum) { + InitializeDBAndBackupEngine(dummy); + // reset backup env defaults + test_backup_fs_->SetLimitWrittenFiles(1000000); + engine_options_->destroy_old_data = destroy_old_data; + engine_options_->share_table_files = shared_option != kNoShare; + engine_options_->share_files_with_checksum = + shared_option == kShareWithChecksum; + OpenBackupEngine(destroy_old_data); + } + + void CloseDBAndBackupEngine() { + db_.reset(); + backup_engine_.reset(); + } + + void OpenBackupEngine(bool destroy_old_data = false) { + engine_options_->destroy_old_data = destroy_old_data; + engine_options_->info_log = logger_.get(); + BackupEngine* backup_engine; + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + backup_engine_.reset(backup_engine); + } + + void CloseBackupEngine() { backup_engine_.reset(nullptr); } + + // cross-cutting test of GetBackupInfo + void AssertBackupInfoConsistency() { + std::vector<BackupInfo> backup_info; + backup_engine_->GetBackupInfo(&backup_info, /*with file details*/ true); + std::map<std::string, uint64_t> file_sizes; + + // Find the files that are supposed to be there + for (auto& backup : backup_info) { + uint64_t sum_for_backup = 0; + for (auto& file : backup.file_details) { + auto e = file_sizes.find(file.relative_filename); + if (e == file_sizes.end()) { + // fprintf(stderr, "Adding %s -> %u\n", + // file.relative_filename.c_str(), (unsigned)file.size); + file_sizes[file.relative_filename] = file.size; + } else { + ASSERT_EQ(file_sizes[file.relative_filename], file.size); + } + sum_for_backup += file.size; + } + ASSERT_EQ(backup.size, sum_for_backup); + } + + std::vector<BackupID> corrupt_backup_ids; + backup_engine_->GetCorruptedBackups(&corrupt_backup_ids); + bool has_corrupt = corrupt_backup_ids.size() > 0; + + // Compare with what's in backup dir + std::vector<std::string> child_dirs; + ASSERT_OK( + test_backup_env_->GetChildren(backupdir_ + "/private", &child_dirs)); + for (auto& dir : child_dirs) { + dir = "private/" + dir; + } + child_dirs.push_back("shared"); // might not exist + child_dirs.push_back("shared_checksum"); // might not exist + for (auto& dir : child_dirs) { + std::vector<std::string> children; + test_backup_env_->GetChildren(backupdir_ + "/" + dir, &children) + .PermitUncheckedError(); + // fprintf(stderr, "ls %s\n", (backupdir_ + "/" + dir).c_str()); + for (auto& file : children) { + uint64_t size; + size = UINT64_MAX; // appease clang-analyze + std::string rel_file = dir + "/" + file; + // fprintf(stderr, "stat %s\n", (backupdir_ + "/" + rel_file).c_str()); + ASSERT_OK( + test_backup_env_->GetFileSize(backupdir_ + "/" + rel_file, &size)); + auto e = file_sizes.find(rel_file); + if (e == file_sizes.end()) { + // The only case in which we should find files not reported + ASSERT_TRUE(has_corrupt); + } else { + ASSERT_EQ(e->second, size); + file_sizes.erase(e); + } + } + } + + // Everything should have been matched + ASSERT_EQ(file_sizes.size(), 0); + } + + // restores backup backup_id and asserts the existence of + // [start_exist, end_exist> and not-existence of + // [end_exist, end> + // + // if backup_id == 0, it means restore from latest + // if end == 0, don't check AssertEmpty + void AssertBackupConsistency(BackupID backup_id, uint32_t start_exist, + uint32_t end_exist, uint32_t end = 0, + bool keep_log_files = false) { + RestoreOptions restore_options(keep_log_files); + bool opened_backup_engine = false; + if (backup_engine_.get() == nullptr) { + opened_backup_engine = true; + OpenBackupEngine(); + } + AssertBackupInfoConsistency(); + + // Now perform restore + if (backup_id > 0) { + ASSERT_OK(backup_engine_->RestoreDBFromBackup(backup_id, dbname_, dbname_, + restore_options)); + } else { + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_, + restore_options)); + } + DB* db = OpenDB(); + // Check DB contents + AssertExists(db, start_exist, end_exist); + if (end != 0) { + AssertEmpty(db, end_exist, end); + } + delete db; + if (opened_backup_engine) { + CloseBackupEngine(); + } + } + + void DeleteLogFiles() { + std::vector<std::string> delete_logs; + ASSERT_OK(db_chroot_env_->GetChildren(dbname_, &delete_logs)); + for (auto f : delete_logs) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && type == kWalFile) { + ASSERT_OK(db_chroot_env_->DeleteFile(dbname_ + "/" + f)); + } + } + } + + Status GetDataFilesInDB(const FileType& file_type, + std::vector<FileAttributes>* files) { + std::vector<std::string> live; + uint64_t ignore_manifest_size; + Status s = db_->GetLiveFiles(live, &ignore_manifest_size, /*flush*/ false); + if (!s.ok()) { + return s; + } + std::vector<FileAttributes> children; + s = test_db_env_->GetChildrenFileAttributes(dbname_, &children); + for (const auto& child : children) { + FileType type; + uint64_t number = 0; + if (ParseFileName(child.name, &number, &type) && type == file_type && + std::find(live.begin(), live.end(), "/" + child.name) != live.end()) { + files->push_back(child); + } + } + return s; + } + + Status GetRandomDataFileInDB(const FileType& file_type, + std::string* fname_out, + uint64_t* fsize_out = nullptr) { + Random rnd(6); // NB: hardly "random" + std::vector<FileAttributes> files; + Status s = GetDataFilesInDB(file_type, &files); + if (!s.ok()) { + return s; + } + if (files.empty()) { + return Status::NotFound(""); + } + size_t i = rnd.Uniform(static_cast<int>(files.size())); + *fname_out = dbname_ + "/" + files[i].name; + if (fsize_out) { + *fsize_out = files[i].size_bytes; + } + return Status::OK(); + } + + Status CorruptRandomDataFileInDB(const FileType& file_type) { + std::string fname; + uint64_t fsize = 0; + Status s = GetRandomDataFileInDB(file_type, &fname, &fsize); + if (!s.ok()) { + return s; + } + + std::string file_contents; + s = ReadFileToString(test_db_env_.get(), fname, &file_contents); + if (!s.ok()) { + return s; + } + s = test_db_env_->DeleteFile(fname); + if (!s.ok()) { + return s; + } + + file_contents[0] = (file_contents[0] + 257) % 256; + return WriteStringToFile(test_db_env_.get(), file_contents, fname); + } + + void AssertDirectoryFilesMatchRegex(const std::string& dir, + const TestRegex& pattern, + const std::string& file_type, + int minimum_count) { + std::vector<FileAttributes> children; + ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children)); + int found_count = 0; + for (const auto& child : children) { + if (EndsWith(child.name, file_type)) { + ASSERT_MATCHES_REGEX(child.name, pattern); + ++found_count; + } + } + ASSERT_GE(found_count, minimum_count); + } + + void AssertDirectoryFilesSizeIndicators(const std::string& dir, + int minimum_count) { + std::vector<FileAttributes> children; + ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children)); + int found_count = 0; + for (const auto& child : children) { + auto last_underscore = child.name.find_last_of('_'); + auto last_dot = child.name.find_last_of('.'); + ASSERT_NE(child.name, child.name.substr(0, last_underscore)); + ASSERT_NE(child.name, child.name.substr(0, last_dot)); + ASSERT_LT(last_underscore, last_dot); + std::string s = child.name.substr(last_underscore + 1, + last_dot - (last_underscore + 1)); + ASSERT_EQ(s, std::to_string(child.size_bytes)); + ++found_count; + } + ASSERT_GE(found_count, minimum_count); + } + + // files + std::string dbname_; + std::string backupdir_; + std::string latest_backup_; + + // logger_ must be above backup_engine_ such that the engine's destructor, + // which uses a raw pointer to the logger, executes first. + std::shared_ptr<Logger> logger_; + + // FileSystems + std::shared_ptr<FileSystem> db_chroot_fs_; + std::shared_ptr<FileSystem> backup_chroot_fs_; + std::shared_ptr<TestFs> test_db_fs_; + std::shared_ptr<TestFs> test_backup_fs_; + + // Env wrappers + std::unique_ptr<Env> db_chroot_env_; + std::unique_ptr<Env> backup_chroot_env_; + std::unique_ptr<Env> test_db_env_; + std::unique_ptr<Env> test_backup_env_; + std::unique_ptr<FileManager> file_manager_; + std::unique_ptr<FileManager> db_file_manager_; + + // all the dbs! + DummyDB* dummy_db_; // owned as db_ when present + std::unique_ptr<DB> db_; + std::unique_ptr<BackupEngine> backup_engine_; + + // options + Options options_; + + protected: + void DestroyDBWithoutCheck(const std::string& dbname, + const Options& options) { + // DestroyDB may fail because the db might not be existed for some tests + DestroyDB(dbname, options).PermitUncheckedError(); + } + + std::unique_ptr<BackupEngineOptions> engine_options_; +}; // BackupEngineTest + +void AppendPath(const std::string& path, std::vector<std::string>& v) { + for (auto& f : v) { + f = path + f; + } +} + +class BackupEngineTestWithParam : public BackupEngineTest, + public testing::WithParamInterface<bool> { + public: + BackupEngineTestWithParam() { + engine_options_->share_files_with_checksum = GetParam(); + } + void OpenDBAndBackupEngine( + bool destroy_old_data = false, bool dummy = false, + ShareOption shared_option = kShareNoChecksum) override { + BackupEngineTest::InitializeDBAndBackupEngine(dummy); + // reset backup env defaults + test_backup_fs_->SetLimitWrittenFiles(1000000); + engine_options_->destroy_old_data = destroy_old_data; + engine_options_->share_table_files = shared_option != kNoShare; + // NOTE: keep share_files_with_checksum setting from constructor + OpenBackupEngine(destroy_old_data); + } +}; + +TEST_F(BackupEngineTest, FileCollision) { + const int keys_iteration = 100; + for (const auto& sopt : kAllShareOptions) { + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt); + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + FillDB(db_.get(), keys_iteration, keys_iteration * 2); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // If the db directory has been cleaned up, it is sensitive to file + // collision. + DestroyDBWithoutCheck(dbname_, options_); + + // open fresh DB, but old backups present + OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */, + sopt); + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(db_->Flush(FlushOptions())); // like backup would do + FillDB(db_.get(), keys_iteration, keys_iteration * 2); + if (sopt != kShareNoChecksum) { + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + } else { + // The new table files created in FillDB() will clash with the old + // backup and sharing tables with no checksum will have the file + // collision problem. + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + ASSERT_OK(backup_engine_->PurgeOldBackups(0)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + } + CloseDBAndBackupEngine(); + + // delete old data + DestroyDBWithoutCheck(dbname_, options_); + } +} + +// This test verifies that the verifyBackup method correctly identifies +// invalid backups +TEST_P(BackupEngineTestWithParam, VerifyBackup) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + // ---------- case 1. - valid backup ----------- + ASSERT_TRUE(backup_engine_->VerifyBackup(1).ok()); + + // ---------- case 2. - delete a file -----------i + ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/1")); + ASSERT_TRUE(backup_engine_->VerifyBackup(1).IsNotFound()); + + // ---------- case 3. - corrupt a file ----------- + std::string append_data = "Corrupting a random file"; + ASSERT_OK(file_manager_->AppendToRandomFileInDir(backupdir_ + "/private/2", + append_data)); + ASSERT_TRUE(backup_engine_->VerifyBackup(2).IsCorruption()); + + // ---------- case 4. - invalid backup ----------- + ASSERT_TRUE(backup_engine_->VerifyBackup(6).IsNotFound()); + CloseDBAndBackupEngine(); +} + +#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +// open DB, write, close DB, backup, restore, repeat +TEST_P(BackupEngineTestWithParam, OfflineIntegrationTest) { + // has to be a big number, so that it triggers the memtable flush + const int keys_iteration = 5000; + const int max_key = keys_iteration * 4 + 10; + // first iter -- flush before backup + // second iter -- don't flush before backup + for (int iter = 0; iter < 2; ++iter) { + // delete old data + DestroyDBWithoutCheck(dbname_, options_); + bool destroy_data = true; + + // every iteration -- + // 1. insert new data in the DB + // 2. backup the DB + // 3. destroy the db + // 4. restore the db, check everything is still there + for (int i = 0; i < 5; ++i) { + // in last iteration, put smaller amount of data, + int fill_up_to = std::min(keys_iteration * (i + 1), max_key); + // ---- insert new data and back up ---- + OpenDBAndBackupEngine(destroy_data); + destroy_data = false; + // kAutoFlushOnly to preserve legacy test behavior (consider updating) + FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0)) + << "iter: " << iter << ", idx: " << i; + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + + // ---- make sure it's empty ---- + DB* db = OpenDB(); + AssertEmpty(db, 0, fill_up_to); + delete db; + + // ---- restore the DB ---- + OpenBackupEngine(); + if (i >= 3) { // test purge old backups + // when i == 4, purge to only 1 backup + // when i == 3, purge to 2 backups + ASSERT_OK(backup_engine_->PurgeOldBackups(5 - i)); + } + // ---- make sure the data is there --- + AssertBackupConsistency(0, 0, fill_up_to, max_key); + CloseBackupEngine(); + } + } +} + +// open DB, write, backup, write, backup, close, restore +TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) { + // has to be a big number, so that it triggers the memtable flush + const int keys_iteration = 5000; + const int max_key = keys_iteration * 4 + 10; + Random rnd(7); + // delete old data + DestroyDBWithoutCheck(dbname_, options_); + + // TODO: Implement & test db_paths support in backup (not supported in + // restore) + // options_.db_paths.emplace_back(dbname_, 500 * 1024); + // options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024); + + OpenDBAndBackupEngine(true); + // write some data, backup, repeat + for (int i = 0; i < 5; ++i) { + if (i == 4) { + // delete backup number 2, online delete! + ASSERT_OK(backup_engine_->DeleteBackup(2)); + } + // in last iteration, put smaller amount of data, + // so that backups can share sst files + int fill_up_to = std::min(keys_iteration * (i + 1), max_key); + // kAutoFlushOnly to preserve legacy test behavior (consider updating) + FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly); + // we should get consistent results with flush_before_backup + // set to both true and false + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + // close and destroy + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + + // ---- make sure it's empty ---- + DB* db = OpenDB(); + AssertEmpty(db, 0, max_key); + delete db; + + // ---- restore every backup and verify all the data is there ---- + OpenBackupEngine(); + for (int i = 1; i <= 5; ++i) { + if (i == 2) { + // we deleted backup 2 + Status s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + } else { + int fill_up_to = std::min(keys_iteration * i, max_key); + AssertBackupConsistency(i, 0, fill_up_to, max_key); + } + } + + // delete some backups -- this should leave only backups 3 and 5 alive + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->PurgeOldBackups(2)); + + std::vector<BackupInfo> backup_info; + backup_engine_->GetBackupInfo(&backup_info); + ASSERT_EQ(2UL, backup_info.size()); + + // check backup 3 + AssertBackupConsistency(3, 0, 3 * keys_iteration, max_key); + // check backup 5 + AssertBackupConsistency(5, 0, max_key); + + CloseBackupEngine(); +} +#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) + +INSTANTIATE_TEST_CASE_P(BackupEngineTestWithParam, BackupEngineTestWithParam, + ::testing::Bool()); + +// this will make sure that backup does not copy the same file twice +TEST_F(BackupEngineTest, NoDoubleCopy_And_AutoGC) { + OpenDBAndBackupEngine(true, true); + + // should write 5 DB files + one meta file + test_backup_fs_->SetLimitWrittenFiles(7); + test_backup_fs_->ClearWrittenFiles(); + test_db_fs_->SetLimitWrittenFiles(0); + dummy_db_->live_files_ = {"00010.sst", "00011.sst", "CURRENT", "MANIFEST-01", + "00011.log"}; + test_db_fs_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + std::vector<std::string> should_have_written = { + "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", "/private/1/CURRENT", + "/private/1/MANIFEST-01", "/private/1/00011.log", "/meta/.1.tmp"}; + AppendPath(backupdir_, should_have_written); + test_backup_fs_->AssertWrittenFiles(should_have_written); + + char db_number = '1'; + + for (std::string other_sst : {"00015.sst", "00017.sst", "00019.sst"}) { + // should write 4 new DB files + one meta file + // should not write/copy 00010.sst, since it's already there! + test_backup_fs_->SetLimitWrittenFiles(6); + test_backup_fs_->ClearWrittenFiles(); + + dummy_db_->live_files_ = {"00010.sst", other_sst, "CURRENT", "MANIFEST-01", + "00011.log"}; + test_db_fs_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + // should not open 00010.sst - it's already there + + ++db_number; + std::string private_dir = std::string("/private/") + db_number; + should_have_written = { + "/shared/." + other_sst + ".tmp", private_dir + "/CURRENT", + private_dir + "/MANIFEST-01", private_dir + "/00011.log", + std::string("/meta/.") + db_number + ".tmp"}; + AppendPath(backupdir_, should_have_written); + test_backup_fs_->AssertWrittenFiles(should_have_written); + } + + ASSERT_OK(backup_engine_->DeleteBackup(1)); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst")); + + // 00011.sst was only in backup 1, should be deleted + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00011.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst")); + + // MANIFEST file size should be only 100 + uint64_t size = 0; + ASSERT_OK(test_backup_env_->GetFileSize(backupdir_ + "/private/2/MANIFEST-01", + &size)); + ASSERT_EQ(100UL, size); + ASSERT_OK( + test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size)); + ASSERT_EQ(200UL, size); + + CloseBackupEngine(); + + // + // Now simulate incomplete delete by removing just meta + // + ASSERT_OK(test_backup_env_->DeleteFile(backupdir_ + "/meta/2")); + + OpenBackupEngine(); + + // 1 appears to be removed, so + // 2 non-corrupt and 0 corrupt seen + std::vector<BackupInfo> backup_info; + std::vector<BackupID> corrupt_backup_ids; + backup_engine_->GetBackupInfo(&backup_info); + backup_engine_->GetCorruptedBackups(&corrupt_backup_ids); + ASSERT_EQ(2UL, backup_info.size()); + ASSERT_EQ(0UL, corrupt_backup_ids.size()); + + // Keep the two we see, but this should suffice to purge unreferenced + // shared files from incomplete delete. + ASSERT_OK(backup_engine_->PurgeOldBackups(2)); + + // Make sure dangling sst file has been removed (somewhere along this + // process). GarbageCollect should not be needed. + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst")); + + // Now actually purge a good one + ASSERT_OK(backup_engine_->PurgeOldBackups(1)); + + ASSERT_EQ(Status::NotFound(), + test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst")); + ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst")); + + CloseDBAndBackupEngine(); +} + +// test various kind of corruptions that may happen: +// 1. Not able to write a file for backup - that backup should fail, +// everything else should work +// 2. Corrupted backup meta file or missing backuped file - we should +// not be able to open that backup, but all other backups should be +// fine +// 3. Corrupted checksum value - if the checksum is not a valid uint32_t, +// db open should fail, otherwise, it aborts during the restore process. +TEST_F(BackupEngineTest, CorruptionsTest) { + const int keys_iteration = 5000; + Random rnd(6); + Status s; + + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + + // ---------- case 1. - fail a write ----------- + // try creating backup 6, but fail a write + FillDB(db_.get(), keys_iteration * 5, keys_iteration * 6); + test_backup_fs_->SetLimitWrittenFiles(2); + // should fail + s = backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)); + ASSERT_NOK(s); + test_backup_fs_->SetLimitWrittenFiles(1000000); + // latest backup should have all the keys + CloseDBAndBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration * 5, keys_iteration * 6); + + // --------- case 2. corrupted backup meta or missing backuped file ---- + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/5", 3)); + // since 5 meta is now corrupted, latest backup should be 4 + AssertBackupConsistency(0, 0, keys_iteration * 4, keys_iteration * 5); + OpenBackupEngine(); + s = backup_engine_->RestoreDBFromBackup(5, dbname_, dbname_); + ASSERT_NOK(s); + CloseBackupEngine(); + ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/4")); + // 4 is corrupted, 3 is the latest backup now + AssertBackupConsistency(0, 0, keys_iteration * 3, keys_iteration * 5); + OpenBackupEngine(); + s = backup_engine_->RestoreDBFromBackup(4, dbname_, dbname_); + CloseBackupEngine(); + ASSERT_NOK(s); + + // --------- case 3. corrupted checksum value ---- + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false)); + // checksum of backup 3 is an invalid value, this can be detected at + // db open time, and it reverts to the previous backup automatically + AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5); + // checksum of the backup 2 appears to be valid, this can cause checksum + // mismatch and abort restore process + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true)); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + OpenBackupEngine(); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_NOK(s); + + // make sure that no corrupt backups have actually been deleted! + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/1")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/3")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/4")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/1")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/2")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/3")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/4")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5")); + + // delete the corrupt backups and then make sure they're actually deleted + ASSERT_OK(backup_engine_->DeleteBackup(5)); + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->DeleteBackup(3)); + ASSERT_OK(backup_engine_->DeleteBackup(2)); + // Should not be needed anymore with auto-GC on DeleteBackup + //(void)backup_engine_->GarbageCollect(); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/5")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/4")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/4")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/3")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/3")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/meta/2")); + ASSERT_EQ(Status::NotFound(), + file_manager_->FileExists(backupdir_ + "/private/2")); + CloseBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5); + + // new backup should be 2! + OpenDBAndBackupEngine(); + FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + CloseDBAndBackupEngine(); + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5); +} + +// Corrupt a file but maintain its size +TEST_F(BackupEngineTest, CorruptFileMaintainSize) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true); + // create a backup + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + // verify with file size + ASSERT_OK(backup_engine_->VerifyBackup(1, false)); + // verify with file checksum + ASSERT_OK(backup_engine_->VerifyBackup(1, true)); + + std::string file_to_corrupt; + uint64_t file_size = 0; + // under normal circumstance, there should be at least one nonempty file + while (file_size == 0) { + // get a random file in /private/1 + assert(file_manager_ + ->GetRandomFileInDir(backupdir_ + "/private/1", &file_to_corrupt, + &file_size) + .ok()); + // corrupt the file by replacing its content by file_size random bytes + ASSERT_OK(file_manager_->CorruptFile(file_to_corrupt, file_size)); + } + // file sizes match + ASSERT_OK(backup_engine_->VerifyBackup(1, false)); + // file checksums mismatch + ASSERT_NOK(backup_engine_->VerifyBackup(1, true)); + // sanity check, use default second argument + ASSERT_OK(backup_engine_->VerifyBackup(1)); + CloseDBAndBackupEngine(); + + // an extra challenge + // set share_files_with_checksum to true and do two more backups + // corrupt all the table files in shared_checksum but maintain their sizes + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum); + // creat two backups + for (int i = 1; i < 3; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + std::vector<FileAttributes> children; + const std::string dir = backupdir_ + "/shared_checksum"; + ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children)); + for (const auto& child : children) { + if (child.size_bytes == 0) { + continue; + } + // corrupt the file by replacing its content by file_size random bytes + ASSERT_OK( + file_manager_->CorruptFile(dir + "/" + child.name, child.size_bytes)); + } + // file sizes match + ASSERT_OK(backup_engine_->VerifyBackup(1, false)); + ASSERT_OK(backup_engine_->VerifyBackup(2, false)); + // file checksums mismatch + ASSERT_NOK(backup_engine_->VerifyBackup(1, true)); + ASSERT_NOK(backup_engine_->VerifyBackup(2, true)); + CloseDBAndBackupEngine(); +} + +// Corrupt a blob file but maintain its size +TEST_P(BackupEngineTestWithParam, CorruptBlobFileMaintainSize) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true); + // create a backup + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + // verify with file size + ASSERT_OK(backup_engine_->VerifyBackup(1, false)); + // verify with file checksum + ASSERT_OK(backup_engine_->VerifyBackup(1, true)); + + std::string file_to_corrupt; + std::vector<FileAttributes> children; + + std::string dir = backupdir_; + if (engine_options_->share_files_with_checksum) { + dir += "/shared_checksum"; + } else { + dir += "/shared"; + } + + ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children)); + + for (const auto& child : children) { + if (EndsWith(child.name, ".blob") && child.size_bytes != 0) { + // corrupt the blob files by replacing its content by file_size random + // bytes + ASSERT_OK( + file_manager_->CorruptFile(dir + "/" + child.name, child.size_bytes)); + } + } + + // file sizes match + ASSERT_OK(backup_engine_->VerifyBackup(1, false)); + // file checksums mismatch + ASSERT_NOK(backup_engine_->VerifyBackup(1, true)); + // sanity check, use default second argument + ASSERT_OK(backup_engine_->VerifyBackup(1)); + CloseDBAndBackupEngine(); +} + +// Test if BackupEngine will fail to create new backup if some table has been +// corrupted and the table file checksum is stored in the DB manifest +TEST_F(BackupEngineTest, TableFileCorruptedBeforeBackup) { + const int keys_iteration = 50000; + + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kNoShare); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random table file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kTableFile)); + // file_checksum_gen_factory is null, and thus table checksum is not + // verified for creating a new backup; no correction is detected + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + + // Enable table file checksum in DB manifest + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kNoShare); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random table file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kTableFile)); + // table file checksum is enabled so we should be able to detect any + // corruption + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); +} + +// Test if BackupEngine will fail to create new backup if some blob files has +// been corrupted and the blob file checksum is stored in the DB manifest +TEST_F(BackupEngineTest, BlobFileCorruptedBeforeBackup) { + const int keys_iteration = 50000; + + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kNoShare); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random blob file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile)); + // file_checksum_gen_factory is null, and thus blob checksum is not + // verified for creating a new backup; no correction is detected + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + + // Enable file checksum in DB manifest + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kNoShare); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random blob file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile)); + + // file checksum is enabled so we should be able to detect any + // corruption + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); +} + +#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +// Test if BackupEngine will fail to create new backup if some table has been +// corrupted and the table file checksum is stored in the DB manifest for the +// case when backup table files will be stored in a shared directory +TEST_P(BackupEngineTestWithParam, TableFileCorruptedBeforeBackup) { + const int keys_iteration = 50000; + + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random table file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kTableFile)); + // cannot detect corruption since DB manifest has no table checksums + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + + // Enable table checksums in DB manifest + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random table file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kTableFile)); + // corruption is detected + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); +} + +// Test if BackupEngine will fail to create new backup if some blob files have +// been corrupted and the blob file checksum is stored in the DB manifest for +// the case when backup blob files will be stored in a shared directory +TEST_P(BackupEngineTestWithParam, BlobFileCorruptedBeforeBackup) { + const int keys_iteration = 50000; + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random blob file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile)); + // cannot detect corruption since DB manifest has no blob file checksums + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + + // Enable blob file checksums in DB manifest + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + CloseAndReopenDB(/*read_only*/ true); + // corrupt a random blob file in the DB directory + ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile)); + // corruption is detected + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); +} +#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) + +TEST_F(BackupEngineTest, TableFileWithoutDbChecksumCorruptedDuringBackup) { + const int keys_iteration = 50000; + engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize; + // When share_files_with_checksum is on, we calculate checksums of table + // files before and after copying. So we can test whether a corruption has + // happened during the file is copied to backup directory. + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum); + + FillDB(db_.get(), 0, keys_iteration); + std::atomic<bool> corrupted{false}; + // corrupt files when copying to the backup directory + SyncPoint::GetInstance()->SetCallBack( + "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup", + [&](void* data) { + if (data != nullptr) { + Slice* d = reinterpret_cast<Slice*>(data); + if (!d->empty()) { + d->remove_suffix(1); + corrupted = true; + } + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = backup_engine_->CreateNewBackup(db_.get()); + if (corrupted) { + ASSERT_NOK(s); + } else { + // should not in this path in normal cases + ASSERT_OK(s); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + CloseDBAndBackupEngine(); + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, TableFileWithDbChecksumCorruptedDuringBackup) { + const int keys_iteration = 50000; + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + for (auto& sopt : kAllShareOptions) { + // Since the default DB table file checksum is on, we obtain checksums of + // table files from the DB manifest before copying and verify it with the + // one calculated during copying. + // Therefore, we can test whether a corruption has happened during the file + // being copied to backup directory. + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt); + + FillDB(db_.get(), 0, keys_iteration); + + // corrupt files when copying to the backup directory + SyncPoint::GetInstance()->SetCallBack( + "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup", + [&](void* data) { + if (data != nullptr) { + Slice* d = reinterpret_cast<Slice*>(data); + if (!d->empty()) { + d->remove_suffix(1); + } + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + // The only case that we can't detect a corruption is when the file + // being backed up is empty. But as keys_iteration is large, such + // a case shouldn't have happened and we should be able to detect + // the corruption. + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get())); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + CloseDBAndBackupEngine(); + // delete old files in db + DestroyDBWithoutCheck(dbname_, options_); + } +} + +TEST_F(BackupEngineTest, InterruptCreationTest) { + // Interrupt backup creation by failing new writes and failing cleanup of the + // partial state. Then verify a subsequent backup can still succeed. + const int keys_iteration = 5000; + Random rnd(6); + + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, keys_iteration); + test_backup_fs_->SetLimitWrittenFiles(2); + test_backup_fs_->SetDeleteFileFailure(true); + // should fail creation + ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + CloseDBAndBackupEngine(); + // should also fail cleanup so the tmp directory stays behind + ASSERT_OK(backup_chroot_env_->FileExists(backupdir_ + "/private/1/")); + + OpenDBAndBackupEngine(false /* destroy_old_data */); + test_backup_fs_->SetLimitWrittenFiles(1000000); + test_backup_fs_->SetDeleteFileFailure(false); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + // latest backup should have all the keys + CloseDBAndBackupEngine(); + AssertBackupConsistency(0, 0, keys_iteration); +} + +TEST_F(BackupEngineTest, FlushCompactDuringBackupCheckpoint) { + const int keys_iteration = 5000; + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + for (const auto& sopt : kAllShareOptions) { + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt); + FillDB(db_.get(), 0, keys_iteration); + // That FillDB leaves a mix of flushed and unflushed data + SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1", + "BackupEngineTest::FlushCompactDuringBackupCheckpoint:Before"}, + {"BackupEngineTest::FlushCompactDuringBackupCheckpoint:After", + "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread flush_thread{[this]() { + TEST_SYNC_POINT( + "BackupEngineTest::FlushCompactDuringBackupCheckpoint:Before"); + FillDB(db_.get(), keys_iteration, 2 * keys_iteration); + ASSERT_OK(db_->Flush(FlushOptions())); + DBImpl* dbi = static_cast<DBImpl*>(db_.get()); + ASSERT_OK(dbi->TEST_WaitForFlushMemTable()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbi->TEST_WaitForCompact()); + TEST_SYNC_POINT( + "BackupEngineTest::FlushCompactDuringBackupCheckpoint:After"); + }}; + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + flush_thread.join(); + CloseDBAndBackupEngine(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + /* FIXME(peterd): reinstate with option for checksum in file names + if (sopt == kShareWithChecksum) { + // Ensure we actually got DB manifest checksums by inspecting + // shared_checksum file names for hex checksum component + TestRegex expected("[^_]+_[0-9A-F]{8}_[^_]+.sst"); + std::vector<FileAttributes> children; + const std::string dir = backupdir_ + "/shared_checksum"; + ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children)); + for (const auto& child : children) { + if (child.size_bytes == 0) { + continue; + } + EXPECT_MATCHES_REGEX(child.name, expected); + } + } + */ + AssertBackupConsistency(0, 0, keys_iteration); + } +} + +inline std::string OptionsPath(std::string ret, int backupID) { + ret += "/private/"; + ret += std::to_string(backupID); + ret += "/"; + return ret; +} + +// Backup the LATEST options file to +// "<backup_dir>/private/<backup_id>/OPTIONS<number>" + +TEST_F(BackupEngineTest, BackupOptions) { + OpenDBAndBackupEngine(true); + for (int i = 1; i < 5; i++) { + std::string name; + std::vector<std::string> filenames; + // Must reset() before reset(OpenDB()) again. + // Calling OpenDB() while *db_ is existing will cause LOCK issue + db_.reset(); + db_.reset(OpenDB()); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + ASSERT_OK(ROCKSDB_NAMESPACE::GetLatestOptionsFileName(db_->GetName(), + options_.env, &name)); + ASSERT_OK(file_manager_->FileExists(OptionsPath(backupdir_, i) + name)); + ASSERT_OK(backup_chroot_env_->GetChildren(OptionsPath(backupdir_, i), + &filenames)); + for (auto fn : filenames) { + if (fn.compare(0, 7, "OPTIONS") == 0) { + ASSERT_EQ(name, fn); + } + } + } + + CloseDBAndBackupEngine(); +} + +TEST_F(BackupEngineTest, SetOptionsBackupRaceCondition) { + OpenDBAndBackupEngine(true); + SyncPoint::GetInstance()->LoadDependency( + {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "BackupEngineTest::SetOptionsBackupRaceCondition:BeforeSetOptions"}, + {"BackupEngineTest::SetOptionsBackupRaceCondition:AfterSetOptions", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread setoptions_thread{[this]() { + TEST_SYNC_POINT( + "BackupEngineTest::SetOptionsBackupRaceCondition:BeforeSetOptions"); + DBImpl* dbi = static_cast<DBImpl*>(db_.get()); + // Change arbitrary option to trigger OPTIONS file deletion + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "false"}})); + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "true"}})); + ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(), + {{"paranoid_file_checks", "false"}})); + TEST_SYNC_POINT( + "BackupEngineTest::SetOptionsBackupRaceCondition:AfterSetOptions"); + }}; + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + setoptions_thread.join(); + CloseDBAndBackupEngine(); +} + +// This test verifies we don't delete the latest backup when read-only option is +// set +TEST_F(BackupEngineTest, NoDeleteWithReadOnly) { + const int keys_iteration = 5000; + Random rnd(6); + + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2))); + } + CloseDBAndBackupEngine(); + ASSERT_OK(file_manager_->WriteToFile(latest_backup_, "4")); + + engine_options_->destroy_old_data = false; + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open( + backup_chroot_env_.get(), *engine_options_, &read_only_backup_engine)); + + // assert that data from backup 5 is still here (even though LATEST_BACKUP + // says 4 is latest) + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5")); + ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5")); + + // Behavior change: We now ignore LATEST_BACKUP contents. This means that + // we should have 5 backups, even if LATEST_BACKUP says 4. + std::vector<BackupInfo> backup_info; + read_only_backup_engine->GetBackupInfo(&backup_info); + ASSERT_EQ(5UL, backup_info.size()); + delete read_only_backup_engine; +} + +TEST_F(BackupEngineTest, FailOverwritingBackups) { + options_.write_buffer_size = 1024 * 1024 * 1024; // 1GB + options_.disable_auto_compactions = true; + + // create backups 1, 2, 3, 4, 5 + OpenDBAndBackupEngine(true); + for (int i = 0; i < 5; ++i) { + CloseDBAndBackupEngine(); + DeleteLogFiles(); + OpenDBAndBackupEngine(false); + FillDB(db_.get(), 100 * i, 100 * (i + 1), kFlushAll); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + } + CloseDBAndBackupEngine(); + + // restore 3 + OpenBackupEngine(); + ASSERT_OK(backup_engine_->RestoreDBFromBackup(3, dbname_, dbname_)); + CloseBackupEngine(); + + OpenDBAndBackupEngine(false); + // More data, bigger SST + FillDB(db_.get(), 1000, 1300, kFlushAll); + Status s = backup_engine_->CreateNewBackup(db_.get()); + // the new backup fails because new table files + // clash with old table files from backups 4 and 5 + // (since write_buffer_size is huge, we can be sure that + // each backup will generate only one sst file and that + // a file generated here would have the same name as an + // sst file generated by backup 4, and will be bigger) + ASSERT_TRUE(s.IsCorruption()); + ASSERT_OK(backup_engine_->DeleteBackup(4)); + ASSERT_OK(backup_engine_->DeleteBackup(5)); + // now, the backup can succeed + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); +} + +TEST_F(BackupEngineTest, NoShareTableFiles) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true, false, kNoShare); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2))); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +// Verify that you can backup and restore with share_files_with_checksum on +TEST_F(BackupEngineTest, ShareTableFilesWithChecksums) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(i % 2))); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +// Verify that you can backup and restore using share_files_with_checksum set to +// false and then transition this option to true +TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsTransition) { + const int keys_iteration = 5000; + // set share_files_with_checksum to false + OpenDBAndBackupEngine(true, false, kShareNoChecksum); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } + + // set share_files_with_checksum to true and do some more backups + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + for (int i = 5; i < 10; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 11); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + ASSERT_OK(backup_engine_->DeleteBackup(1)); + ASSERT_OK(backup_engine_->GarbageCollect()); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 1; i < 10; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 11); + } +} + +// Verify backup and restore with various naming options, check names +TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNaming) { + ASSERT_TRUE(engine_options_->share_files_with_checksum_naming == + kNamingDefault); + + const int keys_iteration = 5000; + + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + FillDB(db_.get(), 0, keys_iteration); + CloseDBAndBackupEngine(); + + static const std::map<ShareFilesNaming, TestRegex> option_to_expected = { + {kLegacyCrc32cAndFileSize, "[0-9]+_[0-9]+_[0-9]+[.]sst"}, + // kFlagIncludeFileSize redundant here + {kLegacyCrc32cAndFileSize | kFlagIncludeFileSize, + "[0-9]+_[0-9]+_[0-9]+[.]sst"}, + {kUseDbSessionId, "[0-9]+_s[0-9A-Z]{20}[.]sst"}, + {kUseDbSessionId | kFlagIncludeFileSize, + "[0-9]+_s[0-9A-Z]{20}_[0-9]+[.]sst"}, + }; + + const TestRegex blobfile_pattern = "[0-9]+_[0-9]+_[0-9]+[.]blob"; + + for (const auto& pair : option_to_expected) { + CloseAndReopenDB(); + engine_options_->share_files_with_checksum_naming = pair.first; + OpenBackupEngine(true /*destroy_old_data*/); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 2); + AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum", pair.second, + ".sst", 1 /* minimum_count */); + if (std::string::npos != pair.second.GetPattern().find("_[0-9]+[.]sst")) { + AssertDirectoryFilesSizeIndicators(backupdir_ + "/shared_checksum", + 1 /* minimum_count */); + } + + AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum", + blobfile_pattern, ".blob", + 1 /* minimum_count */); + } +} + +// Mimic SST file generated by pre-6.12 releases and verify that +// old names are always used regardless of naming option. +TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsOldFileNaming) { + const int keys_iteration = 5000; + + // Pre-6.12 release did not include db id and db session id properties. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) { + auto props = static_cast<TableProperties*>(props_vs); + props->db_id = ""; + props->db_session_id = ""; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Corrupting the table properties corrupts the unique id. + // Ignore the unique id recorded in the manifest. + options_.verify_sst_unique_id_in_manifest = false; + + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + FillDB(db_.get(), 0, keys_iteration); + CloseDBAndBackupEngine(); + + // Old names should always be used on old files + const TestRegex sstfile_pattern("[0-9]+_[0-9]+_[0-9]+[.]sst"); + + const TestRegex blobfile_pattern = "[0-9]+_[0-9]+_[0-9]+[.]blob"; + + for (ShareFilesNaming option : {kNamingDefault, kUseDbSessionId}) { + CloseAndReopenDB(); + engine_options_->share_files_with_checksum_naming = option; + OpenBackupEngine(true /*destroy_old_data*/); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * 2); + AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum", + sstfile_pattern, ".sst", + 1 /* minimum_count */); + AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum", + blobfile_pattern, ".blob", + 1 /* minimum_count */); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +// Test how naming options interact with detecting DB corruption +// between incremental backups +TEST_F(BackupEngineTest, TableFileCorruptionBeforeIncremental) { + const auto share_no_checksum = static_cast<ShareFilesNaming>(0); + + for (bool corrupt_before_first_backup : {false, true}) { + for (ShareFilesNaming option : + {share_no_checksum, kLegacyCrc32cAndFileSize, kNamingDefault}) { + auto share = + option == share_no_checksum ? kShareNoChecksum : kShareWithChecksum; + if (option != share_no_checksum) { + engine_options_->share_files_with_checksum_naming = option; + } + OpenDBAndBackupEngine(true, false, share); + DBImpl* dbi = static_cast<DBImpl*>(db_.get()); + // A small SST file + ASSERT_OK(dbi->Put(WriteOptions(), "x", "y")); + ASSERT_OK(dbi->Flush(FlushOptions())); + // And a bigger one + ASSERT_OK(dbi->Put(WriteOptions(), "y", Random(42).RandomString(500))); + ASSERT_OK(dbi->Flush(FlushOptions())); + ASSERT_OK(dbi->TEST_WaitForFlushMemTable()); + CloseAndReopenDB(/*read_only*/ true); + + std::vector<FileAttributes> table_files; + ASSERT_OK(GetDataFilesInDB(kTableFile, &table_files)); + ASSERT_EQ(table_files.size(), 2); + std::string tf0 = dbname_ + "/" + table_files[0].name; + std::string tf1 = dbname_ + "/" + table_files[1].name; + + CloseDBAndBackupEngine(); + + if (corrupt_before_first_backup) { + // This corrupts a data block, which does not cause DB open + // failure, only failure on accessing the block. + ASSERT_OK(db_file_manager_->CorruptFileStart(tf0)); + } + + OpenDBAndBackupEngine(false, false, share); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + CloseDBAndBackupEngine(); + + // if corrupt_before_first_backup, this undoes the initial corruption + ASSERT_OK(db_file_manager_->CorruptFileStart(tf0)); + + OpenDBAndBackupEngine(false, false, share); + Status s = backup_engine_->CreateNewBackup(db_.get()); + + // Even though none of the naming options catch the inconsistency + // between the first and second time backing up fname, in the case + // of kUseDbSessionId (kNamingDefault), this is an intentional + // trade-off to avoid full scan of files from the DB that are + // already backed up. If we did the scan, kUseDbSessionId could catch + // the corruption. kLegacyCrc32cAndFileSize does the scan (to + // compute checksum for name) without catching the corruption, + // because the corruption means the names don't merge. + EXPECT_OK(s); + + // VerifyBackup doesn't check DB integrity or table file internal + // checksums + EXPECT_OK(backup_engine_->VerifyBackup(1, true)); + EXPECT_OK(backup_engine_->VerifyBackup(2, true)); + + db_.reset(); + ASSERT_OK(backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_)); + { + DB* db = OpenDB(); + s = db->VerifyChecksum(); + delete db; + } + if (option != kLegacyCrc32cAndFileSize && !corrupt_before_first_backup) { + // Second backup is OK because it used (uncorrupt) file from first + // backup instead of (corrupt) file from DB. + // This is arguably a good trade-off vs. treating the file as distinct + // from the old version, because a file should be more likely to be + // corrupt as it ages. Although the backed-up file might also corrupt + // with age, the alternative approach (checksum in file name computed + // from current DB file contents) wouldn't detect that case at backup + // time either. Although you would have both copies of the file with + // the alternative approach, that would only last until the older + // backup is deleted. + ASSERT_OK(s); + } else if (option == kLegacyCrc32cAndFileSize && + corrupt_before_first_backup) { + // Second backup is OK because it saved the updated (uncorrupt) + // file from DB, instead of the sharing with first backup. + // Recall: if corrupt_before_first_backup, [second CorruptFileStart] + // undoes the initial corruption. + // This is arguably a bad trade-off vs. sharing the old version of the + // file because a file should be more likely to corrupt as it ages. + // (Not likely that the previously backed-up version was already + // corrupt and the new version is non-corrupt. This approach doesn't + // help if backed-up version is corrupted after taking the backup.) + ASSERT_OK(s); + } else { + // Something is legitimately corrupted, but we can't be sure what + // with information available (TODO? unless one passes block checksum + // test and other doesn't. Probably better to use end-to-end full file + // checksum anyway.) + ASSERT_TRUE(s.IsCorruption()); + } + + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + } + } +} + +// Test how naming options interact with detecting file size corruption +// between incremental backups +TEST_F(BackupEngineTest, FileSizeForIncremental) { + const auto share_no_checksum = static_cast<ShareFilesNaming>(0); + // TODO: enable blob files once Integrated BlobDB supports DB session id. + options_.enable_blob_files = false; + + for (ShareFilesNaming option : {share_no_checksum, kLegacyCrc32cAndFileSize, + kNamingDefault, kUseDbSessionId}) { + auto share = + option == share_no_checksum ? kShareNoChecksum : kShareWithChecksum; + if (option != share_no_checksum) { + engine_options_->share_files_with_checksum_naming = option; + } + OpenDBAndBackupEngine(true, false, share); + + std::vector<FileAttributes> children; + const std::string shared_dir = + backupdir_ + + (option == share_no_checksum ? "/shared" : "/shared_checksum"); + + // A single small SST file + ASSERT_OK(db_->Put(WriteOptions(), "x", "y")); + + // First, test that we always detect file size corruption on the shared + // backup side on incremental. (Since sizes aren't really part of backup + // meta file, this works by querying the filesystem for the sizes.) + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true /*flush*/)); + CloseDBAndBackupEngine(); + + // Corrupt backup SST file + ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children)); + ASSERT_EQ(children.size(), 1U); // one sst + for (const auto& child : children) { + if (child.name.size() > 4 && child.size_bytes > 0) { + ASSERT_OK( + file_manager_->WriteToFile(shared_dir + "/" + child.name, "asdf")); + break; + } + } + + OpenDBAndBackupEngine(false, false, share); + Status s = backup_engine_->CreateNewBackup(db_.get()); + EXPECT_TRUE(s.IsCorruption()); + + ASSERT_OK(backup_engine_->PurgeOldBackups(0)); + CloseDBAndBackupEngine(); + + // Second, test that a hypothetical db session id collision would likely + // not suffice to corrupt a backup, because there's a good chance of + // file size difference (in this test, guaranteed) so either no name + // collision or detected collision. + + // Create backup 1 + OpenDBAndBackupEngine(false, false, share); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + // Even though we have "the same" DB state as backup 1, we need + // to restore to recreate the same conditions as later restore. + db_.reset(); + DestroyDBWithoutCheck(dbname_, options_); + ASSERT_OK(backup_engine_->RestoreDBFromBackup(1, dbname_, dbname_)); + CloseDBAndBackupEngine(); + + // Forge session id + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SetDbSessionId", [](void* sid_void_star) { + std::string* sid = static_cast<std::string*>(sid_void_star); + *sid = "01234567890123456789"; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Create another SST file + OpenDBAndBackupEngine(false, false, share); + ASSERT_OK(db_->Put(WriteOptions(), "y", "x")); + + // Create backup 2 + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true /*flush*/)); + + // Restore backup 1 (again) + db_.reset(); + DestroyDBWithoutCheck(dbname_, options_); + ASSERT_OK(backup_engine_->RestoreDBFromBackup(1, dbname_, dbname_)); + CloseDBAndBackupEngine(); + + // Create another SST file with same number and db session id, only bigger + OpenDBAndBackupEngine(false, false, share); + ASSERT_OK(db_->Put(WriteOptions(), "y", Random(42).RandomString(500))); + + // Count backup SSTs files. + children.clear(); + ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children)); + ASSERT_EQ(children.size(), 2U); // two sst files + + // Try create backup 3 + s = backup_engine_->CreateNewBackup(db_.get(), true /*flush*/); + + // Re-count backup SSTs + children.clear(); + ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children)); + + if (option == kUseDbSessionId) { + // Acceptable to call it corruption if size is not in name and + // db session id collision is practically impossible. + EXPECT_TRUE(s.IsCorruption()); + EXPECT_EQ(children.size(), 2U); // no SST file added + } else if (option == share_no_checksum) { + // Good to call it corruption if both backups cannot be + // accommodated. + EXPECT_TRUE(s.IsCorruption()); + EXPECT_EQ(children.size(), 2U); // no SST file added + } else { + // Since opening a DB seems sufficient for detecting size corruption + // on the DB side, this should be a good thing, ... + EXPECT_OK(s); + // ... as long as we did actually treat it as a distinct SST file. + EXPECT_EQ(children.size(), 3U); // Another SST added + } + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + } +} + +// Verify backup and restore with share_files_with_checksum off and then +// transition this option to on and share_files_with_checksum_naming to be +// based on kUseDbSessionId +TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNamingTransition) { + const int keys_iteration = 5000; + // We may set share_files_with_checksum_naming to kLegacyCrc32cAndFileSize + // here but even if we don't, it should have no effect when + // share_files_with_checksum is false + ASSERT_TRUE(engine_options_->share_files_with_checksum_naming == + kNamingDefault); + // set share_files_with_checksum to false + OpenDBAndBackupEngine(true, false, kShareNoChecksum); + int j = 3; + for (int i = 0; i < j; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < j; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * (j + 1)); + } + + // set share_files_with_checksum to true and do some more backups + // and use session id in the name of SST file backup + ASSERT_TRUE(engine_options_->share_files_with_checksum_naming == + kNamingDefault); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + FillDB(db_.get(), keys_iteration * j, keys_iteration * (j + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + // Use checksum in the name as well + ++j; + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + FillDB(db_.get(), keys_iteration * j, keys_iteration * (j + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * (j + 1)); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files but with + // share_files_with_checksum_naming based on kUseDbSessionId + ASSERT_TRUE(engine_options_->share_files_with_checksum_naming == + kNamingDefault); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + ASSERT_OK(backup_engine_->DeleteBackup(1)); + ASSERT_OK(backup_engine_->GarbageCollect()); + CloseDBAndBackupEngine(); + + // Verify second (about to delete) + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * (j + 1)); + + // Use checksum and file size for backup table file names and open without + // share_table_files + // Again, make sure that GarbageCollect / DeleteBackup is OK + engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + ASSERT_OK(backup_engine_->DeleteBackup(2)); + ASSERT_OK(backup_engine_->GarbageCollect()); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 2; i < j; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * (j + 1)); + } +} + +// Verify backup and restore with share_files_with_checksum on and transition +// from kLegacyCrc32cAndFileSize to kUseDbSessionId +TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNamingUpgrade) { + engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize; + const int keys_iteration = 5000; + // set share_files_with_checksum to true + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + int j = 3; + for (int i = 0; i < j; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < j; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * (j + 1)); + } + + engine_options_->share_files_with_checksum_naming = kUseDbSessionId; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + FillDB(db_.get(), keys_iteration * j, keys_iteration * (j + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + ++j; + options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + FillDB(db_.get(), keys_iteration * j, keys_iteration * (j + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + // Verify first (about to delete) + AssertBackupConsistency(1, 0, keys_iteration, keys_iteration * (j + 1)); + + // For an extra challenge, make sure that GarbageCollect / DeleteBackup + // is OK even if we open without share_table_files + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + ASSERT_OK(backup_engine_->DeleteBackup(1)); + ASSERT_OK(backup_engine_->GarbageCollect()); + CloseDBAndBackupEngine(); + + // Verify second (about to delete) + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * (j + 1)); + + // Use checksum and file size for backup table file names and open without + // share_table_files + // Again, make sure that GarbageCollect / DeleteBackup is OK + engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize; + OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare); + ASSERT_OK(backup_engine_->DeleteBackup(2)); + ASSERT_OK(backup_engine_->GarbageCollect()); + CloseDBAndBackupEngine(); + + // Verify rest (not deleted) + for (int i = 2; i < j; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * (j + 1)); + } +} + +// This test simulates cleaning up after aborted or incomplete creation +// of a new backup. +TEST_F(BackupEngineTest, DeleteTmpFiles) { + for (int cleanup_fn : {1, 2, 3, 4}) { + for (ShareOption shared_option : kAllShareOptions) { + OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */, + shared_option); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + BackupID next_id = 1; + BackupID oldest_id = std::numeric_limits<BackupID>::max(); + { + std::vector<BackupInfo> backup_info; + backup_engine_->GetBackupInfo(&backup_info); + for (const auto& bi : backup_info) { + next_id = std::max(next_id, bi.backup_id + 1); + oldest_id = std::min(oldest_id, bi.backup_id); + } + } + CloseDBAndBackupEngine(); + + // An aborted or incomplete new backup will always be in the next + // id (maybe more) + std::string next_private = "private/" + std::to_string(next_id); + + // NOTE: both shared and shared_checksum should be cleaned up + // regardless of how the backup engine is opened. + std::vector<std::string> tmp_files_and_dirs; + for (const auto& dir_and_file : { + std::make_pair(std::string("shared"), + std::string(".00006.sst.tmp")), + std::make_pair(std::string("shared_checksum"), + std::string(".00007.sst.tmp")), + std::make_pair(next_private, std::string("00003.sst")), + }) { + std::string dir = backupdir_ + "/" + dir_and_file.first; + ASSERT_OK(file_manager_->CreateDirIfMissing(dir)); + ASSERT_OK(file_manager_->FileExists(dir)); + + std::string file = dir + "/" + dir_and_file.second; + ASSERT_OK(file_manager_->WriteToFile(file, "tmp")); + ASSERT_OK(file_manager_->FileExists(file)); + + tmp_files_and_dirs.push_back(file); + } + if (cleanup_fn != /*CreateNewBackup*/ 4) { + // This exists after CreateNewBackup because it's deleted then + // re-created. + tmp_files_and_dirs.push_back(backupdir_ + "/" + next_private); + } + + OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */, + shared_option); + // Need to call one of these explicitly to delete tmp files + switch (cleanup_fn) { + case 1: + ASSERT_OK(backup_engine_->GarbageCollect()); + break; + case 2: + ASSERT_OK(backup_engine_->DeleteBackup(oldest_id)); + break; + case 3: + ASSERT_OK(backup_engine_->PurgeOldBackups(1)); + break; + case 4: + // Does a garbage collect if it sees that next private dir exists + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + break; + default: + assert(false); + } + CloseDBAndBackupEngine(); + for (std::string file_or_dir : tmp_files_and_dirs) { + if (file_manager_->FileExists(file_or_dir) != Status::NotFound()) { + FAIL() << file_or_dir << " was expected to be deleted." << cleanup_fn; + } + } + } + } +} + +TEST_F(BackupEngineTest, KeepLogFiles) { + engine_options_->backup_log_files = false; + // basically infinite + options_.WAL_ttl_seconds = 24 * 60 * 60; + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100, kFlushAll); + FillDB(db_.get(), 100, 200, kFlushAll); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + FillDB(db_.get(), 200, 300, kFlushAll); + FillDB(db_.get(), 300, 400, kFlushAll); + FillDB(db_.get(), 400, 500, kFlushAll); + CloseDBAndBackupEngine(); + + // all data should be there if we call with keep_log_files = true + AssertBackupConsistency(0, 0, 500, 600, true); +} + +#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +class BackupEngineRateLimitingTestWithParam + : public BackupEngineTest, + public testing::WithParamInterface< + std::tuple<bool /* make throttle */, + int /* 0 = single threaded, 1 = multi threaded*/, + std::pair<uint64_t, uint64_t> /* limits */>> { + public: + BackupEngineRateLimitingTestWithParam() {} +}; + +uint64_t const MB = 1024 * 1024; + +INSTANTIATE_TEST_CASE_P( + RateLimiting, BackupEngineRateLimitingTestWithParam, + ::testing::Values(std::make_tuple(false, 0, std::make_pair(1 * MB, 5 * MB)), + std::make_tuple(false, 0, std::make_pair(2 * MB, 3 * MB)), + std::make_tuple(false, 1, std::make_pair(1 * MB, 5 * MB)), + std::make_tuple(false, 1, std::make_pair(2 * MB, 3 * MB)), + std::make_tuple(true, 0, std::make_pair(1 * MB, 5 * MB)), + std::make_tuple(true, 0, std::make_pair(2 * MB, 3 * MB)), + std::make_tuple(true, 1, std::make_pair(1 * MB, 5 * MB)), + std::make_tuple(true, 1, + std::make_pair(2 * MB, 3 * MB)))); + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimiting) { + size_t const kMicrosPerSec = 1000 * 1000LL; + const bool custom_rate_limiter = std::get<0>(GetParam()); + // iter 0 -- single threaded + // iter 1 -- multi threaded + const int iter = std::get<1>(GetParam()); + const std::pair<uint64_t, uint64_t> limit = std::get<2>(GetParam()); + std::unique_ptr<Env> special_env( + new SpecialEnv(db_chroot_env_.get(), /*time_elapse_only_sleep*/ true)); + // destroy old data + Options options; + options.env = special_env.get(); + DestroyDBWithoutCheck(dbname_, options); + + if (custom_rate_limiter) { + std::shared_ptr<RateLimiter> backup_rate_limiter = + std::make_shared<GenericRateLimiter>( + limit.first, 100 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kWritesOnly /* mode */, + special_env->GetSystemClock(), false /* auto_tuned */); + std::shared_ptr<RateLimiter> restore_rate_limiter = + std::make_shared<GenericRateLimiter>( + limit.second, 100 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kWritesOnly /* mode */, + special_env->GetSystemClock(), false /* auto_tuned */); + engine_options_->backup_rate_limiter = backup_rate_limiter; + engine_options_->restore_rate_limiter = restore_rate_limiter; + } else { + engine_options_->backup_rate_limit = limit.first; + engine_options_->restore_rate_limit = limit.second; + } + + engine_options_->max_background_operations = (iter == 0) ? 1 : 10; + options_.compression = kNoCompression; + + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast<int64_t*>(arg); + special_env->SleepForMicroseconds(static_cast<int>(time_waited_us)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + OpenDBAndBackupEngine(true); + TEST_SetDefaultRateLimitersClock(backup_engine_.get(), + special_env->GetSystemClock()); + + size_t bytes_written = FillDB(db_.get(), 0, 10000); + + auto start_backup = special_env->NowMicros(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + auto backup_time = special_env->NowMicros() - start_backup; + CloseDBAndBackupEngine(); + auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / limit.first; + ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); + + OpenBackupEngine(); + TEST_SetDefaultRateLimitersClock( + backup_engine_.get(), + special_env->GetSystemClock() /* backup_rate_limiter_clock */, + special_env->GetSystemClock() /* restore_rate_limiter_clock */); + + auto start_restore = special_env->NowMicros(); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + auto restore_time = special_env->NowMicros() - start_restore; + CloseBackupEngine(); + auto rate_limited_restore_time = + (bytes_written * kMicrosPerSec) / limit.second; + ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); + + AssertBackupConsistency(0, 0, 10000, 10100); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostTimedWait"); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) { + const std::size_t kMicrosPerSec = 1000 * 1000LL; + const bool custom_rate_limiter = std::get<0>(GetParam()); + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + const bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + std::unique_ptr<Env> special_env( + new SpecialEnv(db_chroot_env_.get(), /*time_elapse_only_sleep*/ true)); + + if (custom_rate_limiter) { + std::shared_ptr<RateLimiter> backup_rate_limiter = + std::make_shared<GenericRateLimiter>( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env->GetSystemClock(), false /* auto_tuned */); + engine_options_->backup_rate_limiter = backup_rate_limiter; + } else { + engine_options_->backup_rate_limit = backup_rate_limiter_limit; + } + + engine_options_->max_background_operations = is_single_threaded ? 1 : 10; + + Options options; + options.env = special_env.get(); + DestroyDBWithoutCheck(dbname_, options); + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast<int64_t*>(arg); + special_env->SleepForMicroseconds(static_cast<int>(time_waited_us)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + OpenDBAndBackupEngine(true /* destroy_old_data */); + TEST_SetDefaultRateLimitersClock(backup_engine_.get(), + special_env->GetSystemClock(), nullptr); + FillDB(db_.get(), 0, 10000); + + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + + std::vector<BackupInfo> backup_infos; + BackupInfo backup_info; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + const int backup_id = 1; + ASSERT_EQ(backup_id, backup_infos[0].backup_id); + ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info, + true /* include_file_details */)); + + std::uint64_t bytes_read_during_verify_backup = 0; + for (BackupFileInfo backup_file_info : backup_info.file_details) { + bytes_read_during_verify_backup += backup_file_info.size; + } + auto start_verify_backup = special_env->NowMicros(); + ASSERT_OK( + backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */)); + auto verify_backup_time = special_env->NowMicros() - start_verify_backup; + auto rate_limited_verify_backup_time = + (bytes_read_during_verify_backup * kMicrosPerSec) / + backup_rate_limiter_limit; + if (custom_rate_limiter) { + EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time); + } + + CloseDBAndBackupEngine(); + AssertBackupConsistency(backup_id, 0, 10000, 10010); + DestroyDBWithoutCheck(dbname_, options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostTimedWait"); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + engine_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */)); + engine_options_->backup_rate_limiter = backup_rate_limiter; + + DestroyDBWithoutCheck(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum /* shared_option */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + std::int64_t total_bytes_through_with_no_read_charged = + backup_rate_limiter->GetTotalBytesThrough(); + CloseBackupEngine(); + + backup_rate_limiter.reset(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + engine_options_->backup_rate_limiter = backup_rate_limiter; + + OpenBackupEngine(true); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + std::int64_t total_bytes_through_with_read_charged = + backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_GT(total_bytes_through_with_read_charged, + total_bytes_through_with_no_read_charged); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + DestroyDBWithoutCheck(dbname_, Options()); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + engine_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t restore_rate_limiter_limit = + std::get<2>(GetParam()).second; + std::shared_ptr<RateLimiter> restore_rate_limiter(NewGenericRateLimiter( + restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */)); + engine_options_->restore_rate_limiter = restore_rate_limiter; + + DestroyDBWithoutCheck(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, Options()); + + OpenBackupEngine(false /* destroy_old_data */); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + std::int64_t total_bytes_through_with_no_read_charged = + restore_rate_limiter->GetTotalBytesThrough(); + CloseBackupEngine(); + DestroyDBWithoutCheck(dbname_, Options()); + + restore_rate_limiter.reset(NewGenericRateLimiter( + restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + engine_options_->restore_rate_limiter = restore_rate_limiter; + + OpenBackupEngine(false /* destroy_old_data */); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + std::int64_t total_bytes_through_with_read_charged = + restore_rate_limiter->GetTotalBytesThrough(); + EXPECT_EQ(total_bytes_through_with_read_charged, + total_bytes_through_with_no_read_charged * 2); + CloseBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + DestroyDBWithoutCheck(dbname_, Options()); +} + +TEST_P(BackupEngineRateLimitingTestWithParam, + RateLimitingChargeReadInInitialize) { + bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false; + engine_options_->max_background_operations = is_single_threaded ? 1 : 10; + + const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first; + std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter( + backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */)); + engine_options_->backup_rate_limiter = backup_rate_limiter; + + DestroyDBWithoutCheck(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */); + FillDB(db_.get(), 0, 10); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + CloseDBAndBackupEngine(); + AssertBackupConsistency(1, 0, 10, 20); + + std::int64_t total_bytes_through_before_initialize = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + OpenDBAndBackupEngine(false /* destroy_old_data */); + // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile, + // which is called in BackupEngineImpl::Initialize() during + // OpenBackupEngine(false) + EXPECT_GT(engine_options_->backup_rate_limiter->GetTotalBytesThrough(), + total_bytes_through_before_initialize); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, Options()); +} + +class BackupEngineRateLimitingTestWithParam2 + : public BackupEngineTest, + public testing::WithParamInterface< + std::tuple<std::pair<uint64_t, uint64_t> /* limits */>> { + public: + BackupEngineRateLimitingTestWithParam2() {} +}; + +INSTANTIATE_TEST_CASE_P( + LowRefillBytesPerPeriod, BackupEngineRateLimitingTestWithParam2, + ::testing::Values(std::make_tuple(std::make_pair(1, 1)))); +// To verify we don't request over-sized bytes relative to +// refill_bytes_per_period_ in each RateLimiter::Request() called in +// BackupEngine through verifying we don't trigger assertion +// failure on over-sized request in GenericRateLimiter in debug builds +TEST_P(BackupEngineRateLimitingTestWithParam2, + RateLimitingWithLowRefillBytesPerPeriod) { + SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + + engine_options_->max_background_operations = 1; + const uint64_t backup_rate_limiter_limit = std::get<0>(GetParam()).first; + std::shared_ptr<RateLimiter> backup_rate_limiter( + std::make_shared<GenericRateLimiter>( + backup_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + engine_options_->backup_rate_limiter = backup_rate_limiter; + + const uint64_t restore_rate_limiter_limit = std::get<0>(GetParam()).second; + std::shared_ptr<RateLimiter> restore_rate_limiter( + std::make_shared<GenericRateLimiter>( + restore_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + engine_options_->restore_rate_limiter = restore_rate_limiter; + + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast<int64_t*>(arg); + special_env.SleepForMicroseconds(static_cast<int>(time_waited_us)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyDBWithoutCheck(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum /* shared_option */); + + FillDB(db_.get(), 0, 100); + int64_t total_bytes_through_before_backup = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + int64_t total_bytes_through_after_backup = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_backup, + total_bytes_through_before_backup); + + std::vector<BackupInfo> backup_infos; + BackupInfo backup_info; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + const int backup_id = 1; + ASSERT_EQ(backup_id, backup_infos[0].backup_id); + ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info, + true /* include_file_details */)); + int64_t total_bytes_through_before_verify_backup = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK( + backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */)); + int64_t total_bytes_through_after_verify_backup = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_verify_backup, + total_bytes_through_before_verify_backup); + + CloseDBAndBackupEngine(); + AssertBackupConsistency(backup_id, 0, 100, 101); + + int64_t total_bytes_through_before_initialize = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + OpenDBAndBackupEngine(false /* destroy_old_data */); + // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile, + // which is called in BackupEngineImpl::Initialize() during + // OpenBackupEngine(false) + int64_t total_bytes_through_after_initialize = + engine_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_initialize, + total_bytes_through_before_initialize); + CloseDBAndBackupEngine(); + + DestroyDBWithoutCheck(dbname_, Options()); + OpenBackupEngine(false /* destroy_old_data */); + int64_t total_bytes_through_before_restore = + engine_options_->restore_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + int64_t total_bytes_through_after_restore = + engine_options_->restore_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_restore, + total_bytes_through_before_restore); + CloseBackupEngine(); + + DestroyDBWithoutCheck(dbname_, Options()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostTimedWait"); +} + +#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) + +TEST_F(BackupEngineTest, ReadOnlyBackupEngine) { + DestroyDBWithoutCheck(dbname_, options_); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + // Also test read-only DB with CreateNewBackup and flush=true (no flush) + CloseAndReopenDB(/*read_only*/ true); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ true)); + CloseAndReopenDB(/*read_only*/ false); + FillDB(db_.get(), 100, 200); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ true)); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + + engine_options_->destroy_old_data = false; + test_backup_fs_->ClearWrittenFiles(); + test_backup_fs_->SetLimitDeleteFiles(0); + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open(db_chroot_env_.get(), *engine_options_, + &read_only_backup_engine)); + std::vector<BackupInfo> backup_info; + read_only_backup_engine->GetBackupInfo(&backup_info); + ASSERT_EQ(backup_info.size(), 2U); + + RestoreOptions restore_options(false); + ASSERT_OK(read_only_backup_engine->RestoreDBFromLatestBackup( + dbname_, dbname_, restore_options)); + delete read_only_backup_engine; + std::vector<std::string> should_have_written; + test_backup_fs_->AssertWrittenFiles(should_have_written); + + DB* db = OpenDB(); + AssertExists(db, 0, 200); + delete db; +} + +TEST_F(BackupEngineTest, OpenBackupAsReadOnlyDB) { + DestroyDBWithoutCheck(dbname_, options_); + options_.write_dbid_to_manifest = false; + + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ false)); + + options_.write_dbid_to_manifest = true; // exercises some read-only DB code + CloseAndReopenDB(); + + FillDB(db_.get(), 100, 200); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ false)); + db_.reset(); // CloseDB + DestroyDBWithoutCheck(dbname_, options_); + BackupInfo backup_info; + // First, check that we get empty fields without include_file_details + ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 1U, &backup_info, + /*with file details*/ false)); + ASSERT_EQ(backup_info.name_for_open, ""); + ASSERT_FALSE(backup_info.env_for_open); + + // Now for the real test + backup_info = BackupInfo(); + ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 1U, &backup_info, + /*with file details*/ true)); + + // Caution: DBOptions only holds a raw pointer to Env, so something else + // must keep it alive. + // Case 1: Keeping BackupEngine open suffices to keep Env alive + DB* db = nullptr; + Options opts = options_; + // Ensure some key defaults are set + opts.wal_dir = ""; + opts.create_if_missing = false; + opts.info_log.reset(); + + opts.env = backup_info.env_for_open.get(); + std::string name = backup_info.name_for_open; + backup_info = BackupInfo(); + ASSERT_OK(DB::OpenForReadOnly(opts, name, &db)); + + AssertExists(db, 0, 100); + AssertEmpty(db, 100, 200); + + delete db; + db = nullptr; + + // Case 2: Keeping BackupInfo alive rather than BackupEngine also suffices + ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 2U, &backup_info, + /*with file details*/ true)); + CloseBackupEngine(); + opts.create_if_missing = true; // check also OK (though pointless) + opts.env = backup_info.env_for_open.get(); + name = backup_info.name_for_open; + // Note: keeping backup_info alive + ASSERT_OK(DB::OpenForReadOnly(opts, name, &db)); + + AssertExists(db, 0, 200); + delete db; + db = nullptr; + + // Now try opening read-write and make sure it fails, for safety. + ASSERT_TRUE(DB::Open(opts, name, &db).IsIOError()); +} + +TEST_F(BackupEngineTest, ProgressCallbackDuringBackup) { + DestroyDBWithoutCheck(dbname_, options_); + // Too big for this small DB + engine_options_->callback_trigger_interval_size = 100000; + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + bool is_callback_invoked = false; + ASSERT_OK(backup_engine_->CreateNewBackup( + db_.get(), true, + [&is_callback_invoked]() { is_callback_invoked = true; })); + ASSERT_FALSE(is_callback_invoked); + CloseBackupEngine(); + + // Easily small enough for this small DB + engine_options_->callback_trigger_interval_size = 1000; + OpenBackupEngine(); + ASSERT_OK(backup_engine_->CreateNewBackup( + db_.get(), true, + [&is_callback_invoked]() { is_callback_invoked = true; })); + ASSERT_TRUE(is_callback_invoked); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, GarbageCollectionBeforeBackup) { + DestroyDBWithoutCheck(dbname_, options_); + OpenDBAndBackupEngine(true); + + ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared")); + std::string file_five = backupdir_ + "/shared/000009.sst"; + std::string file_five_contents = "I'm not really a sst file"; + // this depends on the fact that 00009.sst is the first file created by the DB + ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents)); + + FillDB(db_.get(), 0, 100); + // backup overwrites file 000009.sst + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + + std::string new_file_five_contents; + ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five, + &new_file_five_contents)); + // file 000009.sst was overwritten + ASSERT_TRUE(new_file_five_contents != file_five_contents); + + CloseDBAndBackupEngine(); + + AssertBackupConsistency(0, 0, 100); +} + +// Test that we properly propagate Env failures +TEST_F(BackupEngineTest, EnvFailures) { + BackupEngine* backup_engine; + + // get children failure + { + test_backup_fs_->SetGetChildrenFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + test_backup_fs_->SetGetChildrenFailure(false); + } + + // created dir failure + { + test_backup_fs_->SetCreateDirIfMissingFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + test_backup_fs_->SetCreateDirIfMissingFailure(false); + } + + // new directory failure + { + test_backup_fs_->SetNewDirectoryFailure(true); + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + test_backup_fs_->SetNewDirectoryFailure(false); + } + + // Read from meta-file failure + { + DestroyDBWithoutCheck(dbname_, options_); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + test_backup_fs_->SetDummySequentialFile(true); + test_backup_fs_->SetDummySequentialFileFailReads(true); + engine_options_->destroy_old_data = false; + ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + test_backup_fs_->SetDummySequentialFile(false); + test_backup_fs_->SetDummySequentialFileFailReads(false); + } + + // no failure + { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *engine_options_, + &backup_engine)); + delete backup_engine; + } +} + +// Verify manifest can roll while a backup is being created with the old +// manifest. +TEST_F(BackupEngineTest, ChangeManifestDuringBackupCreation) { + DestroyDBWithoutCheck(dbname_, options_); + options_.max_manifest_file_size = 0; // always rollover manifest for file add + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100, kAutoFlushOnly); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread flush_thread{ + [this]() { ASSERT_OK(db_->Flush(FlushOptions())); }}; + + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + + flush_thread.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // The last manifest roll would've already been cleaned up by the full scan + // that happens when CreateNewBackup invokes EnableFileDeletions. We need to + // trigger another roll to verify non-full scan purges stale manifests. + DBImpl* db_impl = static_cast_with_check<DBImpl>(db_.get()); + std::string prev_manifest_path = + DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo()); + FillDB(db_.get(), 0, 100, kAutoFlushOnly); + ASSERT_OK(db_chroot_env_->FileExists(prev_manifest_path)); + ASSERT_OK(db_->Flush(FlushOptions())); + // Even though manual flush completed above, the background thread may not + // have finished its cleanup work. `TEST_WaitForBackgroundWork()` will wait + // until all the background thread's work has completed, including cleanup. + ASSERT_OK(db_impl->TEST_WaitForBackgroundWork()); + ASSERT_TRUE(db_chroot_env_->FileExists(prev_manifest_path).IsNotFound()); + + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); + AssertBackupConsistency(0, 0, 100); +} + +// see https://github.com/facebook/rocksdb/issues/921 +TEST_F(BackupEngineTest, Issue921Test) { + BackupEngine* backup_engine; + engine_options_->share_table_files = false; + ASSERT_OK( + backup_chroot_env_->CreateDirIfMissing(engine_options_->backup_dir)); + engine_options_->backup_dir += "/new_dir"; + ASSERT_OK(BackupEngine::Open(backup_chroot_env_.get(), *engine_options_, + &backup_engine)); + + delete backup_engine; +} + +TEST_F(BackupEngineTest, BackupWithMetadata) { + const int keys_iteration = 5000; + OpenDBAndBackupEngine(true); + // create five backups + for (int i = 0; i < 5; ++i) { + const std::string metadata = std::to_string(i); + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + // Here also test CreateNewBackupWithMetadata with CreateBackupOptions + // and outputting saved BackupID. + CreateBackupOptions opts; + opts.flush_before_backup = true; + BackupID new_id = 0; + ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(opts, db_.get(), + metadata, &new_id)); + ASSERT_EQ(new_id, static_cast<BackupID>(i + 1)); + } + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + { // Verify in bulk BackupInfo + std::vector<BackupInfo> backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(5, backup_infos.size()); + for (int i = 0; i < 5; i++) { + ASSERT_EQ(std::to_string(i), backup_infos[i].app_metadata); + } + } + // Also verify in individual BackupInfo + for (int i = 0; i < 5; i++) { + BackupInfo backup_info; + ASSERT_OK(backup_engine_->GetBackupInfo(static_cast<BackupID>(i + 1), + &backup_info)); + ASSERT_EQ(std::to_string(i), backup_info.app_metadata); + } + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, BinaryMetadata) { + OpenDBAndBackupEngine(true); + std::string binaryMetadata = "abc\ndef"; + binaryMetadata.push_back('\0'); + binaryMetadata.append("ghi"); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), binaryMetadata)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + std::vector<BackupInfo> backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + ASSERT_EQ(binaryMetadata, backup_infos[0].app_metadata); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, MetadataTooLarge) { + OpenDBAndBackupEngine(true); + std::string largeMetadata(1024 * 1024 + 1, 0); + ASSERT_NOK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), largeMetadata)); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, MetaSchemaVersion2_SizeCorruption) { + engine_options_->schema_version = 1; + OpenDBAndBackupEngine(/*destroy_old_data*/ true); + + // Backup 1: no future schema, no sizes, with checksums + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + CloseDBAndBackupEngine(); + engine_options_->schema_version = 2; + OpenDBAndBackupEngine(/*destroy_old_data*/ false); + + // Backup 2: no checksums, no sizes + TEST_BackupMetaSchemaOptions test_opts; + test_opts.crc32c_checksums = false; + test_opts.file_sizes = false; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + // Backup 3: no checksums, with sizes + test_opts.file_sizes = true; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + // Backup 4: with checksums and sizes + test_opts.crc32c_checksums = true; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + CloseDBAndBackupEngine(); + + // Corrupt all the CURRENT files with the wrong size + const std::string private_dir = backupdir_ + "/private"; + + for (int id = 1; id <= 3; ++id) { + ASSERT_OK(file_manager_->WriteToFile( + private_dir + "/" + std::to_string(id) + "/CURRENT", "x")); + } + // Except corrupt Backup 4 with same size CURRENT file + { + uint64_t size = 0; + ASSERT_OK(test_backup_env_->GetFileSize(private_dir + "/4/CURRENT", &size)); + ASSERT_OK(file_manager_->WriteToFile(private_dir + "/4/CURRENT", + std::string(size, 'x'))); + } + + OpenBackupEngine(); + + // Only the one with sizes in metadata will be immediately detected + // as corrupt + std::vector<BackupID> corrupted; + backup_engine_->GetCorruptedBackups(&corrupted); + ASSERT_EQ(corrupted.size(), 1); + ASSERT_EQ(corrupted[0], 3); + + // Size corruption detected on Restore with checksum + ASSERT_TRUE(backup_engine_->RestoreDBFromBackup(1 /*id*/, dbname_, dbname_) + .IsCorruption()); + + // Size corruption not detected without checksums nor sizes + ASSERT_OK(backup_engine_->RestoreDBFromBackup(2 /*id*/, dbname_, dbname_)); + + // Non-size corruption detected on Restore with checksum + ASSERT_TRUE(backup_engine_->RestoreDBFromBackup(4 /*id*/, dbname_, dbname_) + .IsCorruption()); + + CloseBackupEngine(); +} + +TEST_F(BackupEngineTest, MetaSchemaVersion2_NotSupported) { + engine_options_->schema_version = 2; + TEST_BackupMetaSchemaOptions test_opts; + std::string app_metadata = "abc\ndef"; + + OpenDBAndBackupEngine(true); + // Start with supported + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + + // Because we are injecting badness with a TEST API, the badness is only + // detected on attempt to restore. + // Not supported versions + test_opts.version = "3"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + test_opts.version = "23.45.67"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + test_opts.version = "2"; + + // Non-ignorable fields + test_opts.meta_fields["ni::blah"] = "123"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + test_opts.meta_fields.clear(); + + test_opts.file_fields["ni::123"] = "xyz"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + test_opts.file_fields.clear(); + + test_opts.footer_fields["ni::123"] = "xyz"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK( + backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata)); + test_opts.footer_fields.clear(); + CloseDBAndBackupEngine(); + + OpenBackupEngine(); + std::vector<BackupID> corrupted; + backup_engine_->GetCorruptedBackups(&corrupted); + ASSERT_EQ(corrupted.size(), 5); + + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + CloseBackupEngine(); +} + +TEST_F(BackupEngineTest, MetaSchemaVersion2_Restore) { + engine_options_->schema_version = 2; + TEST_BackupMetaSchemaOptions test_opts; + const int keys_iteration = 5000; + + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + FillDB(db_.get(), 0, keys_iteration); + // Start with minimum metadata to ensure it works without it being filled + // based on shared files also in other backups with the metadata. + test_opts.crc32c_checksums = false; + test_opts.file_sizes = false; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + AssertBackupConsistency(1 /* id */, 0, keys_iteration, keys_iteration * 2); + + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + test_opts.file_sizes = true; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + for (int id = 1; id <= 2; ++id) { + AssertBackupConsistency(id, 0, keys_iteration, keys_iteration * 2); + } + + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + test_opts.crc32c_checksums = true; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + for (int id = 1; id <= 3; ++id) { + AssertBackupConsistency(id, 0, keys_iteration, keys_iteration * 2); + } + + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + // No TEST_EnableWriteFutureSchemaVersion2 + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + for (int id = 1; id <= 4; ++id) { + AssertBackupConsistency(id, 0, keys_iteration, keys_iteration * 2); + } + + OpenDBAndBackupEngine(false /* destroy_old_data */, false, + kShareWithChecksum); + // Minor version updates should be forward-compatible + test_opts.version = "2.5.70"; + test_opts.meta_fields["asdf.3456"] = "-42"; + test_opts.meta_fields["__QRST"] = " 1 $ %%& "; + test_opts.file_fields["z94._"] = "^\\"; + test_opts.file_fields["_7yyyyyyyyy"] = "111111111111"; + test_opts.footer_fields["Qwzn.tz89"] = "ASDF!!@# ##=\t "; + test_opts.footer_fields["yes"] = "no!"; + TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + for (int id = 1; id <= 5; ++id) { + AssertBackupConsistency(id, 0, keys_iteration, keys_iteration * 2); + } +} + +TEST_F(BackupEngineTest, Concurrency) { + // Check that we can simultaneously: + // * Run several read operations in different threads on a single + // BackupEngine object, and + // * With another BackupEngine object on the same + // backup_dir, run the same read operations in another thread, and + // * With yet another BackupEngine object on the same + // backup_dir, create two new backups in parallel threads. + // + // Because of the challenges of integrating this into db_stress, + // this is a non-deterministic mini-stress test here instead. + + // To check for a race condition in handling buffer size based on byte + // burst limit, we need a (generous) rate limiter + std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)}; + engine_options_->backup_rate_limiter = limiter; + engine_options_->restore_rate_limiter = limiter; + + OpenDBAndBackupEngine(true, false, kShareWithChecksum); + + static constexpr int keys_iteration = 5000; + FillDB(db_.get(), 0, keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + FillDB(db_.get(), keys_iteration, 2 * keys_iteration); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + static constexpr int max_factor = 3; + FillDB(db_.get(), 2 * keys_iteration, max_factor * keys_iteration); + // will create another backup soon... + + Options db_opts = options_; + db_opts.wal_dir = ""; + db_opts.create_if_missing = false; + BackupEngineOptions be_opts = *engine_options_; + be_opts.destroy_old_data = false; + + std::mt19937 rng{std::random_device()()}; + + std::array<std::thread, 4> read_threads; + std::array<std::thread, 4> restore_verify_threads; + for (uint32_t i = 0; i < read_threads.size(); ++i) { + uint32_t sleep_micros = rng() % 100000; + read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts, + &restore_verify_threads, &limiter] { + test_db_env_->SleepForMicroseconds(sleep_micros); + + // Whether to also re-open the BackupEngine, potentially seeing + // additional backups + bool reopen = i == 3; + // Whether we are going to restore "latest" + bool latest = i > 1; + + BackupEngine* my_be; + if (reopen) { + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); + } else { + my_be = backup_engine_.get(); + } + + // Verify metadata (we don't receive updates from concurrently + // creating a new backup) + std::vector<BackupInfo> infos; + my_be->GetBackupInfo(&infos); + const uint32_t count = static_cast<uint32_t>(infos.size()); + infos.clear(); + if (reopen) { + ASSERT_GE(count, 2U); + ASSERT_LE(count, 4U); + fprintf(stderr, "Reopen saw %u backups\n", count); + } else { + ASSERT_EQ(count, 2U); + } + std::vector<BackupID> ids; + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0U); + + // (Eventually, see below) Restore one of the backups, or "latest" + std::string restore_db_dir = dbname_ + "/restore" + std::to_string(i); + DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError(); + BackupID to_restore; + if (latest) { + to_restore = count; + } else { + to_restore = i + 1; + } + + // Open restored DB to verify its contents, but test atomic restore + // by doing it async and ensuring we either get OK or InvalidArgument + restore_verify_threads[i] = + std::thread([this, &db_opts, restore_db_dir, to_restore] { + DB* restored; + Status s; + for (;;) { + s = DB::Open(db_opts, restore_db_dir, &restored); + if (s.IsInvalidArgument()) { + // Restore hasn't finished + test_db_env_->SleepForMicroseconds(1000); + continue; + } else { + // We should only get InvalidArgument if restore is + // incomplete, or OK if complete + ASSERT_OK(s); + break; + } + } + int factor = std::min(static_cast<int>(to_restore), max_factor); + AssertExists(restored, 0, factor * keys_iteration); + AssertEmpty(restored, factor * keys_iteration, + (factor + 1) * keys_iteration); + delete restored; + }); + + // (Ok now) Restore one of the backups, or "latest" + if (latest) { + ASSERT_OK( + my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir)); + } else { + ASSERT_OK(my_be->VerifyBackup(to_restore, true)); + ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir, + restore_db_dir)); + } + + // Test for race condition in reconfiguring limiter + // FIXME: this could set to a different value in all threads, except + // GenericRateLimiter::SetBytesPerSecond has a write-write race + // reported by TSAN + if (i == 0) { + limiter->SetBytesPerSecond(2000000000); + } + + // Re-verify metadata (we don't receive updates from concurrently + // creating a new backup) + my_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), count); + my_be->GetCorruptedBackups(&ids); + ASSERT_EQ(ids.size(), 0); + // fprintf(stderr, "Finished read thread\n"); + + if (reopen) { + delete my_be; + } + }); + } + + BackupEngine* alt_be; + ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &alt_be)); + + std::array<std::thread, 2> append_threads; + for (unsigned i = 0; i < append_threads.size(); ++i) { + uint32_t sleep_micros = rng() % 100000; + append_threads[i] = std::thread([this, sleep_micros, alt_be] { + test_db_env_->SleepForMicroseconds(sleep_micros); + // WART: CreateNewBackup doesn't tell you the BackupID it just created, + // which is ugly for multithreaded setting. + // TODO: add delete backup also when that is added + ASSERT_OK(alt_be->CreateNewBackup(db_.get())); + // fprintf(stderr, "Finished append thread\n"); + }); + } + + for (auto& t : append_threads) { + t.join(); + } + // Verify metadata + std::vector<BackupInfo> infos; + alt_be->GetBackupInfo(&infos); + ASSERT_EQ(infos.size(), 2 + append_threads.size()); + + for (auto& t : read_threads) { + t.join(); + } + + delete alt_be; + + for (auto& t : restore_verify_threads) { + t.join(); + } + + CloseDBAndBackupEngine(); +} + +TEST_F(BackupEngineTest, LimitBackupsOpened) { + // Verify the specified max backups are opened, including skipping over + // corrupted backups. + // + // Setup: + // - backups 1, 2, and 4 are valid + // - backup 3 is corrupt + // - max_valid_backups_to_open == 2 + // + // Expectation: the engine opens backups 4 and 2 since those are latest two + // non-corrupt backups. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true); + for (int i = 1; i <= 4; ++i) { + FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + if (i == 3) { + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3)); + } + } + CloseDBAndBackupEngine(); + + engine_options_->max_valid_backups_to_open = 2; + engine_options_->destroy_old_data = false; + BackupEngineReadOnly* read_only_backup_engine; + ASSERT_OK(BackupEngineReadOnly::Open( + backup_chroot_env_.get(), *engine_options_, &read_only_backup_engine)); + + std::vector<BackupInfo> backup_infos; + read_only_backup_engine->GetBackupInfo(&backup_infos); + ASSERT_EQ(2, backup_infos.size()); + ASSERT_EQ(2, backup_infos[0].backup_id); + ASSERT_EQ(4, backup_infos[1].backup_id); + delete read_only_backup_engine; +} + +TEST_F(BackupEngineTest, IgnoreLimitBackupsOpenedWhenNotReadOnly) { + // Verify the specified max_valid_backups_to_open is ignored if the engine + // is not read-only. + // + // Setup: + // - backups 1, 2, and 4 are valid + // - backup 3 is corrupt + // - max_valid_backups_to_open == 2 + // + // Expectation: the engine opens backups 4, 2, and 1 since those are latest + // non-corrupt backups, by ignoring max_valid_backups_to_open == 2. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true); + for (int i = 1; i <= 4; ++i) { + FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + if (i == 3) { + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3)); + } + } + CloseDBAndBackupEngine(); + + engine_options_->max_valid_backups_to_open = 2; + OpenDBAndBackupEngine(); + std::vector<BackupInfo> backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(3, backup_infos.size()); + ASSERT_EQ(1, backup_infos[0].backup_id); + ASSERT_EQ(2, backup_infos[1].backup_id); + ASSERT_EQ(4, backup_infos[2].backup_id); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +TEST_F(BackupEngineTest, CreateWhenLatestBackupCorrupted) { + // we should pick an ID greater than corrupted backups' IDs so creation can + // succeed even when latest backup is corrupted. + const int kNumKeys = 5000; + OpenDBAndBackupEngine(true /* destroy_old_data */); + BackupInfo backup_info; + ASSERT_TRUE(backup_engine_->GetLatestBackupInfo(&backup_info).IsNotFound()); + FillDB(db_.get(), 0 /* from */, kNumKeys); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + true /* flush_before_backup */)); + ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/1", + 3 /* bytes_to_corrupt */)); + CloseDBAndBackupEngine(); + + OpenDBAndBackupEngine(); + ASSERT_TRUE(backup_engine_->GetLatestBackupInfo(&backup_info).IsNotFound()); + + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + true /* flush_before_backup */)); + + ASSERT_TRUE(backup_engine_->GetLatestBackupInfo(&backup_info).ok()); + ASSERT_EQ(2, backup_info.backup_id); + + std::vector<BackupInfo> backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + ASSERT_EQ(2, backup_infos[0].backup_id); + + // Verify individual GetBackupInfo by ID + ASSERT_TRUE(backup_engine_->GetBackupInfo(0U, &backup_info).IsNotFound()); + ASSERT_TRUE(backup_engine_->GetBackupInfo(1U, &backup_info).IsCorruption()); + ASSERT_TRUE(backup_engine_->GetBackupInfo(2U, &backup_info).ok()); + ASSERT_TRUE(backup_engine_->GetBackupInfo(3U, &backup_info).IsNotFound()); + ASSERT_TRUE( + backup_engine_->GetBackupInfo(999999U, &backup_info).IsNotFound()); +} + +TEST_F(BackupEngineTest, WriteOnlyEngineNoSharedFileDeletion) { + // Verifies a write-only BackupEngine does not delete files belonging to valid + // backups when GarbageCollect, PurgeOldBackups, or DeleteBackup are called. + const int kNumKeys = 5000; + for (int i = 0; i < 3; ++i) { + OpenDBAndBackupEngine(i == 0 /* destroy_old_data */); + FillDB(db_.get(), i * kNumKeys, (i + 1) * kNumKeys); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true)); + CloseDBAndBackupEngine(); + + engine_options_->max_valid_backups_to_open = 0; + OpenDBAndBackupEngine(); + switch (i) { + case 0: + ASSERT_OK(backup_engine_->GarbageCollect()); + break; + case 1: + ASSERT_OK(backup_engine_->PurgeOldBackups(1 /* num_backups_to_keep */)); + break; + case 2: + ASSERT_OK(backup_engine_->DeleteBackup(2 /* backup_id */)); + break; + default: + assert(false); + } + CloseDBAndBackupEngine(); + + engine_options_->max_valid_backups_to_open = + std::numeric_limits<int32_t>::max(); + AssertBackupConsistency(i + 1, 0, (i + 1) * kNumKeys); + } +} + +TEST_P(BackupEngineTestWithParam, BackupUsingDirectIO) { + // Tests direct I/O on the backup engine's reads and writes on the DB env and + // backup env + // We use ChrootEnv underneath so the below line checks for direct I/O support + // in the chroot directory, not the true filesystem root. + if (!test::IsDirectIOSupported(test_db_env_.get(), "/")) { + ROCKSDB_GTEST_SKIP("Test requires Direct I/O Support"); + return; + } + const int kNumKeysPerBackup = 100; + const int kNumBackups = 3; + options_.use_direct_reads = true; + OpenDBAndBackupEngine(true /* destroy_old_data */); + for (int i = 0; i < kNumBackups; ++i) { + FillDB(db_.get(), i * kNumKeysPerBackup /* from */, + (i + 1) * kNumKeysPerBackup /* to */, kFlushAll); + + // Clear the file open counters and then do a bunch of backup engine ops. + // For all ops, files should be opened in direct mode. + test_backup_fs_->ClearFileOpenCounters(); + test_db_fs_->ClearFileOpenCounters(); + CloseBackupEngine(); + OpenBackupEngine(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + ASSERT_OK(backup_engine_->VerifyBackup(i + 1)); + CloseBackupEngine(); + OpenBackupEngine(); + std::vector<BackupInfo> backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(static_cast<size_t>(i + 1), backup_infos.size()); + + // Verify backup engine always opened files with direct I/O + ASSERT_EQ(0, test_db_fs_->num_writers()); + ASSERT_GE(test_db_fs_->num_direct_rand_readers(), 0); + ASSERT_GT(test_db_fs_->num_direct_seq_readers(), 0); + // Currently the DB doesn't support reading WALs or manifest with direct + // I/O, so subtract two. + ASSERT_EQ(test_db_fs_->num_seq_readers() - 2, + test_db_fs_->num_direct_seq_readers()); + ASSERT_EQ(test_db_fs_->num_rand_readers(), + test_db_fs_->num_direct_rand_readers()); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < kNumBackups; ++i) { + AssertBackupConsistency(i + 1 /* backup_id */, + i * kNumKeysPerBackup /* start_exist */, + (i + 1) * kNumKeysPerBackup /* end_exist */, + (i + 2) * kNumKeysPerBackup /* end */); + } +} + +TEST_F(BackupEngineTest, BackgroundThreadCpuPriority) { + std::atomic<CpuPriority> priority(CpuPriority::kNormal); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackupEngineImpl::Initialize:SetCpuPriority", [&](void* new_priority) { + priority.store(*reinterpret_cast<CpuPriority*>(new_priority)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // 1 thread is easier to test, otherwise, we may not be sure which thread + // actually does the work during CreateNewBackup. + engine_options_->max_background_operations = 1; + OpenDBAndBackupEngine(true); + + { + FillDB(db_.get(), 0, 100); + + // by default, cpu priority is not changed. + CreateBackupOptions options; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kNormal); + } + + { + FillDB(db_.get(), 101, 200); + + // decrease cpu priority from normal to low. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kLow; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kLow); + } + + { + FillDB(db_.get(), 201, 300); + + // try to upgrade cpu priority back to normal, + // the priority should still low. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kNormal; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kLow); + } + + { + FillDB(db_.get(), 301, 400); + + // decrease cpu priority from low to idle. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kIdle; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get())); + + ASSERT_EQ(priority, CpuPriority::kIdle); + } + + { + FillDB(db_.get(), 301, 400); + + // reset priority to later verify that it's not updated by SetCpuPriority. + priority = CpuPriority::kNormal; + + // setting the same cpu priority won't call SetCpuPriority. + CreateBackupOptions options; + options.decrease_background_thread_cpu_priority = true; + options.background_thread_cpu_priority = CpuPriority::kIdle; + + // Also check output backup_id with CreateNewBackup + BackupID new_id = 0; + ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get(), &new_id)); + ASSERT_EQ(new_id, 5U); + + ASSERT_EQ(priority, CpuPriority::kNormal); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + CloseDBAndBackupEngine(); + DestroyDBWithoutCheck(dbname_, options_); +} + +// Populates `*total_size` with the size of all files under `backup_dir`. +// We don't go through `BackupEngine` currently because it's hard to figure out +// the metadata file size. +Status GetSizeOfBackupFiles(FileSystem* backup_fs, + const std::string& backup_dir, size_t* total_size) { + *total_size = 0; + std::vector<std::string> dir_stack = {backup_dir}; + Status s; + while (s.ok() && !dir_stack.empty()) { + std::string dir = std::move(dir_stack.back()); + dir_stack.pop_back(); + std::vector<std::string> children; + s = backup_fs->GetChildren(dir, IOOptions(), &children, nullptr /* dbg */); + for (size_t i = 0; s.ok() && i < children.size(); ++i) { + std::string path = dir + "/" + children[i]; + bool is_dir; + s = backup_fs->IsDirectory(path, IOOptions(), &is_dir, nullptr /* dbg */); + uint64_t file_size = 0; + if (s.ok()) { + if (is_dir) { + dir_stack.emplace_back(std::move(path)); + } else { + s = backup_fs->GetFileSize(path, IOOptions(), &file_size, + nullptr /* dbg */); + } + } + if (s.ok()) { + *total_size += file_size; + } + } + } + return s; +} + +TEST_F(BackupEngineTest, IOStats) { + // Tests the `BACKUP_READ_BYTES` and `BACKUP_WRITE_BYTES` ticker stats have + // the expected values according to the files in the backups. + + // These ticker stats are expected to be populated regardless of `PerfLevel` + // in user thread + SetPerfLevel(kDisable); + + options_.statistics = CreateDBStatistics(); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum); + + FillDB(db_.get(), 0 /* from */, 100 /* to */, kFlushMost); + + ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_READ_BYTES)); + ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_WRITE_BYTES)); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + + size_t orig_backup_files_size; + ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(), + backupdir_, &orig_backup_files_size)); + size_t expected_bytes_written = orig_backup_files_size; + ASSERT_EQ(expected_bytes_written, + options_.statistics->getTickerCount(BACKUP_WRITE_BYTES)); + // Bytes read is more difficult to pin down since there are reads for many + // purposes other than creating file, like `GetSortedWalFiles()` to find first + // sequence number, or `CreateNewBackup()` thread to find SST file session ID. + // So we loosely require there are at least as many reads as needed for + // copying, but not as many as twice that. + ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES), + expected_bytes_written); + ASSERT_LT(expected_bytes_written, + 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES)); + + FillDB(db_.get(), 100 /* from */, 200 /* to */, kFlushMost); + + ASSERT_OK(options_.statistics->Reset()); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + size_t final_backup_files_size; + ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(), + backupdir_, &final_backup_files_size)); + expected_bytes_written = final_backup_files_size - orig_backup_files_size; + ASSERT_EQ(expected_bytes_written, + options_.statistics->getTickerCount(BACKUP_WRITE_BYTES)); + // See above for why these bounds were chosen. + ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES), + expected_bytes_written); + ASSERT_LT(expected_bytes_written, + 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES)); +} + +TEST_F(BackupEngineTest, FileTemperatures) { + CloseDBAndBackupEngine(); + + // Required for recording+restoring temperatures + engine_options_->schema_version = 2; + + // More file IO instrumentation + auto my_db_fs = std::make_shared<FileTemperatureTestFS>(db_chroot_fs_); + test_db_fs_ = std::make_shared<TestFs>(my_db_fs); + SetEnvsFromFileSystems(); + + // Use temperatures + options_.bottommost_temperature = Temperature::kWarm; + options_.level0_file_num_compaction_trigger = 2; + // set dynamic_level to true so the compaction would compact the data to the + // last level directly which will have the last_level_temperature + options_.level_compaction_dynamic_level_bytes = true; + + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum); + + // generate a bottommost file (combined from 2) and a non-bottommost file + DBImpl* dbi = static_cast_with_check<DBImpl>(db_.get()); + ASSERT_OK(db_->Put(WriteOptions(), "a", "val")); + ASSERT_OK(db_->Put(WriteOptions(), "c", "val")); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_OK(db_->Put(WriteOptions(), "b", "val")); + ASSERT_OK(db_->Put(WriteOptions(), "d", "val")); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_OK(dbi->TEST_WaitForCompact()); + ASSERT_OK(db_->Put(WriteOptions(), "e", "val")); + ASSERT_OK(db_->Flush(FlushOptions())); + + // Get temperatures from manifest + std::map<uint64_t, Temperature> manifest_temps; + std::map<Temperature, int> manifest_temp_counts; + { + std::vector<LiveFileStorageInfo> infos; + ASSERT_OK( + db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos)); + for (auto info : infos) { + if (info.file_type == kTableFile) { + manifest_temps.emplace(info.file_number, info.temperature); + manifest_temp_counts[info.temperature]++; + } + } + } + + // Verify expected manifest temperatures + ASSERT_EQ(manifest_temp_counts.size(), 2); + ASSERT_EQ(manifest_temp_counts[Temperature::kWarm], 1); + ASSERT_EQ(manifest_temp_counts[Temperature::kUnknown], 1); + + // Verify manifest temperatures match FS temperatures + std::map<uint64_t, Temperature> current_temps; + my_db_fs->CopyCurrentSstFileTemperatures(¤t_temps); + for (const auto& manifest_temp : manifest_temps) { + ASSERT_EQ(current_temps[manifest_temp.first], manifest_temp.second); + } + + // Try a few different things + for (int i = 1; i <= 5; ++i) { + // Expected temperatures after restore are based on manifest temperatures + std::map<uint64_t, Temperature> expected_temps = manifest_temps; + + if (i >= 2) { + // For iterations 2 & 3, override current temperature of one file + // and vary which temperature is authoritative (current or manifest). + // For iterations 4 & 5, override current temperature of both files + // but make sure an current temperate always takes precedence over + // unknown regardless of current_temperatures_override_manifest setting. + bool use_current = ((i % 2) == 1); + engine_options_->current_temperatures_override_manifest = use_current; + CloseBackupEngine(); + OpenBackupEngine(); + for (const auto& manifest_temp : manifest_temps) { + if (i <= 3) { + if (manifest_temp.second == Temperature::kWarm) { + my_db_fs->OverrideSstFileTemperature(manifest_temp.first, + Temperature::kCold); + if (use_current) { + expected_temps[manifest_temp.first] = Temperature::kCold; + } + } + } else { + assert(i <= 5); + if (manifest_temp.second == Temperature::kWarm) { + my_db_fs->OverrideSstFileTemperature(manifest_temp.first, + Temperature::kUnknown); + } else { + ASSERT_EQ(manifest_temp.second, Temperature::kUnknown); + my_db_fs->OverrideSstFileTemperature(manifest_temp.first, + Temperature::kHot); + // regardless of use_current + expected_temps[manifest_temp.first] = Temperature::kHot; + } + } + } + } + + // Sample requested temperatures in opening files for backup + my_db_fs->PopRequestedSstFileTemperatures(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get())); + + // Verify requested temperatures against manifest temperatures (before + // retry with kUnknown if needed, and before backup finds out current + // temperatures in FileSystem) + std::vector<std::pair<uint64_t, Temperature>> requested_temps; + my_db_fs->PopRequestedSstFileTemperatures(&requested_temps); + std::set<uint64_t> distinct_requests; + for (const auto& requested_temp : requested_temps) { + // Matching manifest temperatures, except allow retry request with + // kUnknown + auto manifest_temp = manifest_temps.at(requested_temp.first); + if (manifest_temp == Temperature::kUnknown || + requested_temp.second != Temperature::kUnknown) { + ASSERT_EQ(manifest_temp, requested_temp.second); + } + distinct_requests.insert(requested_temp.first); + } + // Two distinct requests + ASSERT_EQ(distinct_requests.size(), 2); + + // Verify against backup info file details API + BackupInfo info; + ASSERT_OK(backup_engine_->GetLatestBackupInfo( + &info, /*include_file_details*/ true)); + ASSERT_GT(info.file_details.size(), 2); + for (auto& e : info.file_details) { + ASSERT_EQ(expected_temps[e.file_number], e.temperature); + } + + // Restore backup to another virtual (tiered) dir + const std::string restore_dir = "/restore" + std::to_string(i); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup( + RestoreOptions(), restore_dir, restore_dir)); + + // Verify restored FS temperatures match expectation + // (FileTemperatureTestFS doesn't distinguish directories when reporting + // current temperatures, just whatever SST was written or overridden last + // with that file number.) + my_db_fs->CopyCurrentSstFileTemperatures(¤t_temps); + for (const auto& expected_temp : expected_temps) { + ASSERT_EQ(current_temps[expected_temp.first], expected_temp.second); + } + + // Delete backup to force next backup to copy files + ASSERT_OK(backup_engine_->PurgeOldBackups(0)); + } +} + +} // namespace + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as BackupEngine is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) |