// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #ifndef ROCKSDB_LITE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "env/composite_env_wrapper.h" #include "env/fs_readonly.h" #include "env/fs_remap.h" #include "file/filename.h" #include "file/line_file_reader.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "logging/logging.h" #include "monitoring/iostats_context_imp.h" #include "options/options_helper.h" #include "port/port.h" #include "rocksdb/advanced_options.h" #include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/statistics.h" #include "rocksdb/transaction_log.h" #include "table/sst_file_dumper.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/channel.h" #include "util/coding.h" #include "util/crc32c.h" #include "util/math.h" #include "util/rate_limiter.h" #include "util/string_util.h" #include "utilities/backup/backup_engine_impl.h" #include "utilities/checkpoint/checkpoint_impl.h" namespace ROCKSDB_NAMESPACE { namespace { using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming; constexpr BackupID kLatestBackupIDMarker = static_cast(-2); inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) { std::string checksum_str; Slice(checksum_hex).DecodeHex(&checksum_str); return EndianSwapValue(DecodeFixed32(checksum_str.c_str())); } inline std::string ChecksumStrToHex(const std::string& checksum_str) { return 
Slice(checksum_str).ToString(true); } inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) { std::string checksum_str; PutFixed32(&checksum_str, EndianSwapValue(checksum_value)); return ChecksumStrToHex(checksum_str); } const std::string kPrivateDirName = "private"; const std::string kMetaDirName = "meta"; const std::string kSharedDirName = "shared"; const std::string kSharedChecksumDirName = "shared_checksum"; const std::string kPrivateDirSlash = kPrivateDirName + "/"; const std::string kMetaDirSlash = kMetaDirName + "/"; const std::string kSharedDirSlash = kSharedDirName + "/"; const std::string kSharedChecksumDirSlash = kSharedChecksumDirName + "/"; } // namespace void BackupStatistics::IncrementNumberSuccessBackup() { number_success_backup++; } void BackupStatistics::IncrementNumberFailBackup() { number_fail_backup++; } uint32_t BackupStatistics::GetNumberSuccessBackup() const { return number_success_backup; } uint32_t BackupStatistics::GetNumberFailBackup() const { return number_fail_backup; } std::string BackupStatistics::ToString() const { char result[50]; snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u", GetNumberSuccessBackup(), GetNumberFailBackup()); return result; } void BackupEngineOptions::Dump(Logger* logger) const { ROCKS_LOG_INFO(logger, " Options.backup_dir: %s", backup_dir.c_str()); ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env); ROCKS_LOG_INFO(logger, " Options.share_table_files: %d", static_cast(share_table_files)); ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log); ROCKS_LOG_INFO(logger, " Options.sync: %d", static_cast(sync)); ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d", static_cast(destroy_old_data)); ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d", static_cast(backup_log_files)); ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit); ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64, restore_rate_limit); 
ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d", max_background_operations); } namespace { // -------- BackupEngineImpl class --------- class BackupEngineImpl { public: BackupEngineImpl(const BackupEngineOptions& options, Env* db_env, bool read_only = false); ~BackupEngineImpl(); IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db, const std::string& app_metadata, BackupID* new_backup_id_ptr); IOStatus PurgeOldBackups(uint32_t num_backups_to_keep); IOStatus DeleteBackup(BackupID backup_id); void StopBackup() { stop_backup_.store(true, std::memory_order_release); } IOStatus GarbageCollect(); // The returned BackupInfos are in chronological order, which means the // latest backup comes last. void GetBackupInfo(std::vector* backup_info, bool include_file_details) const; Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, bool include_file_details = false) const; void GetCorruptedBackups(std::vector* corrupt_backup_ids) const; IOStatus RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, const std::string& wal_dir) const; IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options, const std::string& db_dir, const std::string& wal_dir) const { // Note: don't read latest_valid_backup_id_ outside of lock return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); } IOStatus VerifyBackup(BackupID backup_id, bool verify_with_checksum = false) const; IOStatus Initialize(); ShareFilesNaming GetNamingNoFlags() const { return options_.share_files_with_checksum_naming & BackupEngineOptions::kMaskNoNamingFlags; } ShareFilesNaming GetNamingFlags() const { return options_.share_files_with_checksum_naming & BackupEngineOptions::kMaskNamingFlags; } void TEST_SetDefaultRateLimitersClock( const std::shared_ptr& backup_rate_limiter_clock, const std::shared_ptr& restore_rate_limiter_clock) { if (backup_rate_limiter_clock) { 
static_cast(options_.backup_rate_limiter.get()) ->TEST_SetClock(backup_rate_limiter_clock); } if (restore_rate_limiter_clock) { static_cast(options_.restore_rate_limiter.get()) ->TEST_SetClock(restore_rate_limiter_clock); } } private: void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0) const; IOStatus DeleteBackupNoGC(BackupID backup_id); // Extends the "result" map with pathname->size mappings for the contents of // "dir" in "env". Pathnames are prefixed with "dir". IOStatus ReadChildFileCurrentSizes( const std::string& dir, const std::shared_ptr&, std::unordered_map* result) const; struct FileInfo { FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum, const std::string& id, const std::string& sid, Temperature _temp) : refs(0), filename(fname), size(sz), checksum_hex(checksum), db_id(id), db_session_id(sid), temp(_temp) {} FileInfo(const FileInfo&) = delete; FileInfo& operator=(const FileInfo&) = delete; int refs; const std::string filename; const uint64_t size; // crc32c checksum as hex. empty == unknown / unavailable std::string checksum_hex; // DB identities // db_id is obtained for potential usage in the future but not used // currently const std::string db_id; // db_session_id appears in the backup SST filename if the table naming // option is kUseDbSessionId const std::string db_session_id; Temperature temp; std::string GetDbFileName() { std::string rv; // extract the filename part size_t slash = filename.find_last_of('/'); // file will either be shared/, shared_checksum/, // shared_checksum/, shared_checksum/, // or private// assert(slash != std::string::npos); rv = filename.substr(slash + 1); // if the file was in shared_checksum, extract the real file name // in this case the file is __., // _., or __. 
if (filename.substr(0, slash) == kSharedChecksumDirName) { rv = GetFileFromChecksumFile(rv); } return rv; } }; // TODO: deprecate this function once we migrate all BackupEngine's rate // limiting to lower-level ones (i.e, ones in file access wrapper level like // `WritableFileWriter`) static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request, RateLimiter* rate_limiter, const Env::IOPriority pri, Statistics* stats, const RateLimiter::OpType op_type); static inline std::string WithoutTrailingSlash(const std::string& path) { if (path.empty() || path.back() != '/') { return path; } else { return path.substr(path.size() - 1); } } static inline std::string WithTrailingSlash(const std::string& path) { if (path.empty() || path.back() != '/') { return path + '/'; } else { return path; } } // A filesystem wrapper that makes shared backup files appear to be in the // private backup directory (dst_dir), so that the private backup dir can // be opened as a read-only DB. class RemapSharedFileSystem : public RemapFileSystem { public: RemapSharedFileSystem(const std::shared_ptr& base, const std::string& dst_dir, const std::string& src_base_dir, const std::vector>& files) : RemapFileSystem(base), dst_dir_(WithoutTrailingSlash(dst_dir)), dst_dir_slash_(WithTrailingSlash(dst_dir)), src_base_dir_(WithTrailingSlash(src_base_dir)) { for (auto& info : files) { if (!StartsWith(info->filename, kPrivateDirSlash)) { assert(StartsWith(info->filename, kSharedDirSlash) || StartsWith(info->filename, kSharedChecksumDirSlash)); remaps_[info->GetDbFileName()] = info; } } } const char* Name() const override { return "BackupEngineImpl::RemapSharedFileSystem"; } // Sometimes a directory listing is required in opening a DB IOStatus GetChildren(const std::string& dir, const IOOptions& options, std::vector* result, IODebugContext* dbg) override { IOStatus s = RemapFileSystem::GetChildren(dir, options, result, dbg); if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { // Assume 
remapped files exist for (auto& r : remaps_) { result->push_back(r.first); } } return s; } // Sometimes a directory listing is required in opening a DB IOStatus GetChildrenFileAttributes(const std::string& dir, const IOOptions& options, std::vector* result, IODebugContext* dbg) override { IOStatus s = RemapFileSystem::GetChildrenFileAttributes(dir, options, result, dbg); if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { // Assume remapped files exist with recorded size for (auto& r : remaps_) { result->emplace_back(); // clean up with C++20 FileAttributes& attr = result->back(); attr.name = r.first; attr.size_bytes = r.second->size; } } return s; } protected: // When a file in dst_dir is requested, see if we need to remap to shared // file path. std::pair EncodePath( const std::string& path) override { if (path.empty() || path[0] != '/') { return {IOStatus::InvalidArgument(path, "Not an absolute path"), ""}; } std::pair rv{IOStatus(), path}; if (StartsWith(path, dst_dir_slash_)) { std::string relative = path.substr(dst_dir_slash_.size()); auto it = remaps_.find(relative); if (it != remaps_.end()) { rv.second = src_base_dir_ + it->second->filename; } } return rv; } private: // Absolute path to a directory that some extra files will be mapped into. const std::string dst_dir_; // Includes a trailing slash. const std::string dst_dir_slash_; // Absolute path to a directory containing some files to be mapped into // dst_dir_. Includes a trailing slash. const std::string src_base_dir_; // If remaps_[x] exists, attempt to read dst_dir_ / x should instead read // src_base_dir_ / remaps_[x]->filename. FileInfo is used to maximize // sharing with other backup data in memory. 
std::unordered_map> remaps_; }; class BackupMeta { public: BackupMeta( const std::string& meta_filename, const std::string& meta_tmp_filename, std::unordered_map>* file_infos, Env* env, const std::shared_ptr& fs) : timestamp_(0), sequence_number_(0), size_(0), meta_filename_(meta_filename), meta_tmp_filename_(meta_tmp_filename), file_infos_(file_infos), env_(env), fs_(fs) {} BackupMeta(const BackupMeta&) = delete; BackupMeta& operator=(const BackupMeta&) = delete; ~BackupMeta() {} void RecordTimestamp() { // Best effort Status s = env_->GetCurrentTime(×tamp_); if (!s.ok()) { timestamp_ = /* something clearly fabricated */ 1; } } int64_t GetTimestamp() const { return timestamp_; } uint64_t GetSize() const { return size_; } uint32_t GetNumberFiles() const { return static_cast(files_.size()); } void SetSequenceNumber(uint64_t sequence_number) { sequence_number_ = sequence_number; } uint64_t GetSequenceNumber() const { return sequence_number_; } const std::string& GetAppMetadata() const { return app_metadata_; } void SetAppMetadata(const std::string& app_metadata) { app_metadata_ = app_metadata; } IOStatus AddFile(std::shared_ptr file_info); IOStatus Delete(bool delete_meta = true); bool Empty() const { return files_.empty(); } std::shared_ptr GetFile(const std::string& filename) const { auto it = file_infos_->find(filename); if (it == file_infos_->end()) { return nullptr; } return it->second; } const std::vector>& GetFiles() const { return files_; } // @param abs_path_to_size Pre-fetched file sizes (bytes). 
IOStatus LoadFromFile( const std::string& backup_dir, const std::unordered_map& abs_path_to_size, RateLimiter* rate_limiter, Logger* info_log, std::unordered_set* reported_ignored_fields); IOStatus StoreToFile( bool sync, int schema_version, const TEST_BackupMetaSchemaOptions* schema_test_options); std::string GetInfoString() { std::ostringstream ss; ss << "Timestamp: " << timestamp_ << std::endl; char human_size[16]; AppendHumanBytes(size_, human_size, sizeof(human_size)); ss << "Size: " << human_size << std::endl; ss << "Files:" << std::endl; for (const auto& file : files_) { AppendHumanBytes(file->size, human_size, sizeof(human_size)); ss << file->filename << ", size " << human_size << ", refs " << file->refs << std::endl; } return ss.str(); } const std::shared_ptr& GetEnvForOpen() const { if (!env_for_open_) { // Lazy initialize // Find directories std::string dst_dir = meta_filename_; auto i = dst_dir.rfind(kMetaDirSlash); assert(i != std::string::npos); std::string src_base_dir = dst_dir.substr(0, i); dst_dir.replace(i, kMetaDirSlash.size(), kPrivateDirSlash); // Make the RemapSharedFileSystem std::shared_ptr remap_fs = std::make_shared(fs_, dst_dir, src_base_dir, files_); // Make it read-only for safety remap_fs = std::make_shared(remap_fs); // Make an Env wrapper env_for_open_ = std::make_shared(env_, remap_fs); } return env_for_open_; } private: int64_t timestamp_; // sequence number is only approximate, should not be used // by clients uint64_t sequence_number_; uint64_t size_; std::string app_metadata_; std::string const meta_filename_; std::string const meta_tmp_filename_; // files with relative paths (without "/" prefix!!) 
std::vector> files_; std::unordered_map>* file_infos_; Env* env_; mutable std::shared_ptr env_for_open_; std::shared_ptr fs_; IOOptions iooptions_ = IOOptions(); }; // BackupMeta void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta, BackupInfo* backup_info, bool include_file_details) const; inline std::string GetAbsolutePath( const std::string& relative_path = "") const { assert(relative_path.size() == 0 || relative_path[0] != '/'); return options_.backup_dir + "/" + relative_path; } inline std::string GetPrivateFileRel(BackupID backup_id, bool tmp = false, const std::string& file = "") const { assert(file.size() == 0 || file[0] != '/'); return kPrivateDirSlash + std::to_string(backup_id) + (tmp ? ".tmp" : "") + "/" + file; } inline std::string GetSharedFileRel(const std::string& file = "", bool tmp = false) const { assert(file.size() == 0 || file[0] != '/'); return kSharedDirSlash + std::string(tmp ? "." : "") + file + (tmp ? ".tmp" : ""); } inline std::string GetSharedFileWithChecksumRel(const std::string& file = "", bool tmp = false) const { assert(file.size() == 0 || file[0] != '/'); return kSharedChecksumDirSlash + std::string(tmp ? "." : "") + file + (tmp ? 
".tmp" : ""); } inline bool UseLegacyNaming(const std::string& sid) const { return GetNamingNoFlags() == BackupEngineOptions::kLegacyCrc32cAndFileSize || sid.empty(); } inline std::string GetSharedFileWithChecksum( const std::string& file, const std::string& checksum_hex, const uint64_t file_size, const std::string& db_session_id) const { assert(file.size() == 0 || file[0] != '/'); std::string file_copy = file; if (UseLegacyNaming(db_session_id)) { assert(!checksum_hex.empty()); file_copy.insert(file_copy.find_last_of('.'), "_" + std::to_string(ChecksumHexToInt32(checksum_hex)) + "_" + std::to_string(file_size)); } else { file_copy.insert(file_copy.find_last_of('.'), "_s" + db_session_id); if (GetNamingFlags() & BackupEngineOptions::kFlagIncludeFileSize) { file_copy.insert(file_copy.find_last_of('.'), "_" + std::to_string(file_size)); } } return file_copy; } static inline std::string GetFileFromChecksumFile(const std::string& file) { assert(file.size() == 0 || file[0] != '/'); std::string file_copy = file; size_t first_underscore = file_copy.find_first_of('_'); return file_copy.erase(first_underscore, file_copy.find_last_of('.') - first_underscore); } inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const { return GetAbsolutePath(kMetaDirName) + "/" + (tmp ? "." : "") + std::to_string(backup_id) + (tmp ? ".tmp" : ""); } // If size_limit == 0, there is no size limit, copy everything. // // Exactly one of src and contents must be non-empty. // // @param src If non-empty, the file is copied from this pathname. // @param contents If non-empty, the file will be created with these contents. 
// @param src_temperature Pass in expected temperature of src, return back // temperature reported by FileSystem IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst, const std::string& contents, uint64_t size_limit, Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter, std::function progress_callback, Temperature* src_temperature, Temperature dst_temperature, uint64_t* bytes_toward_next_callback, uint64_t* size, std::string* checksum_hex); IOStatus ReadFileAndComputeChecksum(const std::string& src, const std::shared_ptr& src_fs, const EnvOptions& src_env_options, uint64_t size_limit, std::string* checksum_hex, const Temperature src_temperature) const; // Obtain db_id and db_session_id from the table properties of file_path Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, const std::string& file_path, Temperature file_temp, RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id); struct CopyOrCreateResult { ~CopyOrCreateResult() { // The Status needs to be ignored here for two reasons. // First, if the BackupEngineImpl shuts down with jobs outstanding, then // it is possible that the Status in the future/promise is never read, // resulting in an unchecked Status. Second, if there are items in the // channel when the BackupEngineImpl is shutdown, these will also have // Status that have not been checked. This // TODO: Fix those issues so that the Status io_status.PermitUncheckedError(); } uint64_t size; std::string checksum_hex; std::string db_id; std::string db_session_id; IOStatus io_status; Temperature expected_src_temperature = Temperature::kUnknown; Temperature current_src_temperature = Temperature::kUnknown; }; // Exactly one of src_path and contents must be non-empty. If src_path is // non-empty, the file is copied from this pathname. Otherwise, if contents is // non-empty, the file will be created at dst_path with these contents. 
struct CopyOrCreateWorkItem { std::string src_path; std::string dst_path; Temperature src_temperature; Temperature dst_temperature; std::string contents; Env* src_env; Env* dst_env; EnvOptions src_env_options; bool sync; RateLimiter* rate_limiter; uint64_t size_limit; Statistics* stats; std::promise result; std::function progress_callback; std::string src_checksum_func_name; std::string src_checksum_hex; std::string db_id; std::string db_session_id; CopyOrCreateWorkItem() : src_path(""), dst_path(""), src_temperature(Temperature::kUnknown), dst_temperature(Temperature::kUnknown), contents(""), src_env(nullptr), dst_env(nullptr), src_env_options(), sync(false), rate_limiter(nullptr), size_limit(0), stats(nullptr), src_checksum_func_name(kUnknownFileChecksumFuncName), src_checksum_hex(""), db_id(""), db_session_id("") {} CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete; CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete; CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept { *this = std::move(o); } CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept { src_path = std::move(o.src_path); dst_path = std::move(o.dst_path); src_temperature = std::move(o.src_temperature); dst_temperature = std::move(o.dst_temperature); contents = std::move(o.contents); src_env = o.src_env; dst_env = o.dst_env; src_env_options = std::move(o.src_env_options); sync = o.sync; rate_limiter = o.rate_limiter; size_limit = o.size_limit; stats = o.stats; result = std::move(o.result); progress_callback = std::move(o.progress_callback); src_checksum_func_name = std::move(o.src_checksum_func_name); src_checksum_hex = std::move(o.src_checksum_hex); db_id = std::move(o.db_id); db_session_id = std::move(o.db_session_id); src_temperature = o.src_temperature; return *this; } CopyOrCreateWorkItem( std::string _src_path, std::string _dst_path, const Temperature _src_temperature, const Temperature _dst_temperature, std::string _contents, Env* _src_env, Env* 
_dst_env, EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter, uint64_t _size_limit, Statistics* _stats, std::function _progress_callback = []() {}, const std::string& _src_checksum_func_name = kUnknownFileChecksumFuncName, const std::string& _src_checksum_hex = "", const std::string& _db_id = "", const std::string& _db_session_id = "") : src_path(std::move(_src_path)), dst_path(std::move(_dst_path)), src_temperature(_src_temperature), dst_temperature(_dst_temperature), contents(std::move(_contents)), src_env(_src_env), dst_env(_dst_env), src_env_options(std::move(_src_env_options)), sync(_sync), rate_limiter(_rate_limiter), size_limit(_size_limit), stats(_stats), progress_callback(_progress_callback), src_checksum_func_name(_src_checksum_func_name), src_checksum_hex(_src_checksum_hex), db_id(_db_id), db_session_id(_db_session_id) {} }; struct BackupAfterCopyOrCreateWorkItem { std::future result; bool shared; bool needed_to_copy; Env* backup_env; std::string dst_path_tmp; std::string dst_path; std::string dst_relative; BackupAfterCopyOrCreateWorkItem() : shared(false), needed_to_copy(false), backup_env(nullptr), dst_path_tmp(""), dst_path(""), dst_relative("") {} BackupAfterCopyOrCreateWorkItem( BackupAfterCopyOrCreateWorkItem&& o) noexcept { *this = std::move(o); } BackupAfterCopyOrCreateWorkItem& operator=( BackupAfterCopyOrCreateWorkItem&& o) noexcept { result = std::move(o.result); shared = o.shared; needed_to_copy = o.needed_to_copy; backup_env = o.backup_env; dst_path_tmp = std::move(o.dst_path_tmp); dst_path = std::move(o.dst_path); dst_relative = std::move(o.dst_relative); return *this; } BackupAfterCopyOrCreateWorkItem(std::future&& _result, bool _shared, bool _needed_to_copy, Env* _backup_env, std::string _dst_path_tmp, std::string _dst_path, std::string _dst_relative) : result(std::move(_result)), shared(_shared), needed_to_copy(_needed_to_copy), backup_env(_backup_env), dst_path_tmp(std::move(_dst_path_tmp)), 
dst_path(std::move(_dst_path)), dst_relative(std::move(_dst_relative)) {} }; struct RestoreAfterCopyOrCreateWorkItem { std::future result; std::string from_file; std::string to_file; std::string checksum_hex; RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {} RestoreAfterCopyOrCreateWorkItem(std::future&& _result, const std::string& _from_file, const std::string& _to_file, const std::string& _checksum_hex) : result(std::move(_result)), from_file(_from_file), to_file(_to_file), checksum_hex(_checksum_hex) {} RestoreAfterCopyOrCreateWorkItem( RestoreAfterCopyOrCreateWorkItem&& o) noexcept { *this = std::move(o); } RestoreAfterCopyOrCreateWorkItem& operator=( RestoreAfterCopyOrCreateWorkItem&& o) noexcept { result = std::move(o.result); checksum_hex = std::move(o.checksum_hex); return *this; } }; bool initialized_; std::mutex byte_report_mutex_; mutable channel files_to_copy_or_create_; std::vector threads_; std::atomic threads_cpu_priority_; // Certain operations like PurgeOldBackups and DeleteBackup will trigger // automatic GarbageCollect (true) unless we've already done one in this // session and have not failed to delete backup files since then (false). bool might_need_garbage_collect_ = true; // Adds a file to the backup work queue to be copied or created if it doesn't // already exist. // // Exactly one of src_dir and contents must be non-empty. // // @param src_dir If non-empty, the file in this directory named fname will be // copied. // @param fname Name of destination file and, in case of copy, source file. // @param contents If non-empty, the file will be created with these contents. 
IOStatus AddBackupFileWorkItem( std::unordered_set& live_dst_paths, std::vector& backup_items_to_finish, BackupID backup_id, bool shared, const std::string& src_dir, const std::string& fname, // starts with "/" const EnvOptions& src_env_options, RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes, Statistics* stats, uint64_t size_limit = 0, bool shared_checksum = false, std::function progress_callback = []() {}, const std::string& contents = std::string(), const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName, const std::string& src_checksum_str = kUnknownFileChecksum, const Temperature src_temperature = Temperature::kUnknown); // backup state data BackupID latest_backup_id_; BackupID latest_valid_backup_id_; std::map> backups_; std::map>> corrupt_backups_; std::unordered_map> backuped_file_infos_; std::atomic stop_backup_; // options data BackupEngineOptions options_; Env* db_env_; Env* backup_env_; // directories std::unique_ptr backup_directory_; std::unique_ptr shared_directory_; std::unique_ptr meta_directory_; std::unique_ptr private_directory_; static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB bool read_only_; BackupStatistics backup_statistics_; std::unordered_set reported_ignored_fields_; static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB std::shared_ptr db_fs_; std::shared_ptr backup_fs_; IOOptions io_options_ = IOOptions(); public: std::unique_ptr schema_test_options_; }; // -------- BackupEngineImplThreadSafe class --------- // This locking layer for thread safety in the public API is layered on // top to prevent accidental recursive locking with RWMutex, which is UB. 
// Note: BackupEngineReadOnlyBase inherited twice, but has no fields class BackupEngineImplThreadSafe : public BackupEngine, public BackupEngineReadOnly { public: BackupEngineImplThreadSafe(const BackupEngineOptions& options, Env* db_env, bool read_only = false) : impl_(options, db_env, read_only) {} ~BackupEngineImplThreadSafe() override {} using BackupEngine::CreateNewBackupWithMetadata; IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db, const std::string& app_metadata, BackupID* new_backup_id) override { WriteLock lock(&mutex_); return impl_.CreateNewBackupWithMetadata(options, db, app_metadata, new_backup_id); } IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override { WriteLock lock(&mutex_); return impl_.PurgeOldBackups(num_backups_to_keep); } IOStatus DeleteBackup(BackupID backup_id) override { WriteLock lock(&mutex_); return impl_.DeleteBackup(backup_id); } void StopBackup() override { // No locking needed impl_.StopBackup(); } IOStatus GarbageCollect() override { WriteLock lock(&mutex_); return impl_.GarbageCollect(); } Status GetLatestBackupInfo(BackupInfo* backup_info, bool include_file_details = false) const override { ReadLock lock(&mutex_); return impl_.GetBackupInfo(kLatestBackupIDMarker, backup_info, include_file_details); } Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, bool include_file_details = false) const override { ReadLock lock(&mutex_); return impl_.GetBackupInfo(backup_id, backup_info, include_file_details); } void GetBackupInfo(std::vector* backup_info, bool include_file_details) const override { ReadLock lock(&mutex_); impl_.GetBackupInfo(backup_info, include_file_details); } void GetCorruptedBackups( std::vector* corrupt_backup_ids) const override { ReadLock lock(&mutex_); impl_.GetCorruptedBackups(corrupt_backup_ids); } using BackupEngine::RestoreDBFromBackup; IOStatus RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, const 
std::string& wal_dir) const override { ReadLock lock(&mutex_); return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); } using BackupEngine::RestoreDBFromLatestBackup; IOStatus RestoreDBFromLatestBackup( const RestoreOptions& options, const std::string& db_dir, const std::string& wal_dir) const override { // Defer to above function, which locks return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); } IOStatus VerifyBackup(BackupID backup_id, bool verify_with_checksum = false) const override { ReadLock lock(&mutex_); return impl_.VerifyBackup(backup_id, verify_with_checksum); } // Not public API but needed IOStatus Initialize() { // No locking needed return impl_.Initialize(); } // Not public API but used in testing void TEST_SetBackupMetaSchemaOptions( const TEST_BackupMetaSchemaOptions& options) { impl_.schema_test_options_.reset(new TEST_BackupMetaSchemaOptions(options)); } // Not public API but used in testing void TEST_SetDefaultRateLimitersClock( const std::shared_ptr& backup_rate_limiter_clock = nullptr, const std::shared_ptr& restore_rate_limiter_clock = nullptr) { impl_.TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, restore_rate_limiter_clock); } private: mutable port::RWMutex mutex_; BackupEngineImpl impl_; }; } // namespace IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env, BackupEngine** backup_engine_ptr) { std::unique_ptr backup_engine( new BackupEngineImplThreadSafe(options, env)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; return s; } *backup_engine_ptr = backup_engine.release(); return IOStatus::OK(); } namespace { BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options, Env* db_env, bool read_only) : initialized_(false), threads_cpu_priority_(), latest_backup_id_(0), latest_valid_backup_id_(0), stop_backup_(false), options_(options), db_env_(db_env), backup_env_(options.backup_env != nullptr ? 
options.backup_env : db_env_), read_only_(read_only) { if (options_.backup_rate_limiter == nullptr && options_.backup_rate_limit > 0) { options_.backup_rate_limiter.reset( NewGenericRateLimiter(options_.backup_rate_limit)); } if (options_.restore_rate_limiter == nullptr && options_.restore_rate_limit > 0) { options_.restore_rate_limiter.reset( NewGenericRateLimiter(options_.restore_rate_limit)); } db_fs_ = db_env_->GetFileSystem(); backup_fs_ = backup_env_->GetFileSystem(); } BackupEngineImpl::~BackupEngineImpl() { files_to_copy_or_create_.sendEof(); for (auto& t : threads_) { t.join(); } LogFlush(options_.info_log); for (const auto& it : corrupt_backups_) { it.second.first.PermitUncheckedError(); } } IOStatus BackupEngineImpl::Initialize() { assert(!initialized_); initialized_ = true; if (read_only_) { ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine"); } options_.Dump(options_.info_log); auto meta_path = GetAbsolutePath(kMetaDirName); if (!read_only_) { // we might need to clean up from previous crash or I/O errors might_need_garbage_collect_ = true; if (options_.max_valid_backups_to_open != std::numeric_limits::max()) { options_.max_valid_backups_to_open = std::numeric_limits::max(); ROCKS_LOG_WARN( options_.info_log, "`max_valid_backups_to_open` is not set to the default value. 
" "Ignoring its value since BackupEngine is not read-only."); } // gather the list of directories that we need to create std::vector*>> directories; directories.emplace_back(GetAbsolutePath(), &backup_directory_); if (options_.share_table_files) { if (options_.share_files_with_checksum) { directories.emplace_back( GetAbsolutePath(GetSharedFileWithChecksumRel()), &shared_directory_); } else { directories.emplace_back(GetAbsolutePath(GetSharedFileRel()), &shared_directory_); } } directories.emplace_back(GetAbsolutePath(kPrivateDirName), &private_directory_); directories.emplace_back(meta_path, &meta_directory_); // create all the dirs we need for (const auto& d : directories) { IOStatus io_s = backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr); if (io_s.ok()) { io_s = backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr); } if (!io_s.ok()) { return io_s; } } } std::vector backup_meta_files; { IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_, &backup_meta_files, nullptr); if (io_s.IsNotFound()) { return IOStatus::NotFound(meta_path + " is missing"); } else if (!io_s.ok()) { return io_s; } } // create backups_ structure for (auto& file : backup_meta_files) { ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str()); BackupID backup_id = 0; sscanf(file.c_str(), "%u", &backup_id); if (backup_id == 0 || file != std::to_string(backup_id)) { // Invalid file name, will be deleted with auto-GC when user // initiates an append or write operation. (Behave as read-only until // then.) 
ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s", file.c_str()); continue; } assert(backups_.find(backup_id) == backups_.end()); // Insert all the (backup_id, BackupMeta) that will be loaded later // The loading performed later will check whether there are corrupt backups // and move the corrupt backups to corrupt_backups_ backups_.insert(std::make_pair( backup_id, std::unique_ptr(new BackupMeta( GetBackupMetaFile(backup_id, false /* tmp */), GetBackupMetaFile(backup_id, true /* tmp */), &backuped_file_infos_, backup_env_, backup_fs_)))); } latest_backup_id_ = 0; latest_valid_backup_id_ = 0; if (options_.destroy_old_data) { // Destroy old data assert(!read_only_); ROCKS_LOG_INFO( options_.info_log, "Backup Engine started with destroy_old_data == true, deleting all " "backups"); IOStatus io_s = PurgeOldBackups(0); if (io_s.ok()) { io_s = GarbageCollect(); } if (!io_s.ok()) { return io_s; } } else { // Load data from storage // abs_path_to_size: maps absolute paths of files in backup directory to // their corresponding sizes std::unordered_map abs_path_to_size; // Insert files and their sizes in backup sub-directories (shared and // shared_checksum) to abs_path_to_size for (const auto& rel_dir : {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { const auto abs_dir = GetAbsolutePath(rel_dir); IOStatus io_s = ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size); if (!io_s.ok()) { // I/O error likely impacting all backups return io_s; } } // load the backups if any, until valid_backups_to_open of the latest // non-corrupted backups have been successfully opened. 
int valid_backups_to_open = options_.max_valid_backups_to_open; for (auto backup_iter = backups_.rbegin(); backup_iter != backups_.rend(); ++backup_iter) { assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first); if (latest_backup_id_ == 0) { latest_backup_id_ = backup_iter->first; } if (valid_backups_to_open == 0) { break; } // Insert files and their sizes in backup sub-directories // (private/backup_id) to abs_path_to_size IOStatus io_s = ReadChildFileCurrentSizes( GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_, &abs_path_to_size); if (io_s.ok()) { io_s = backup_iter->second->LoadFromFile( options_.backup_dir, abs_path_to_size, options_.backup_rate_limiter.get(), options_.info_log, &reported_ignored_fields_); } if (io_s.IsCorruption() || io_s.IsNotSupported()) { ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s", backup_iter->first, io_s.ToString().c_str()); corrupt_backups_.insert(std::make_pair( backup_iter->first, std::make_pair(io_s, std::move(backup_iter->second)))); } else if (!io_s.ok()) { // Distinguish corruption errors from errors in the backup Env. // Errors in the backup Env (i.e., this code path) will cause Open() to // fail, whereas corruption errors would not cause Open() failures. 
return io_s; } else { ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s", backup_iter->first, backup_iter->second->GetInfoString().c_str()); assert(latest_valid_backup_id_ == 0 || latest_valid_backup_id_ > backup_iter->first); if (latest_valid_backup_id_ == 0) { latest_valid_backup_id_ = backup_iter->first; } --valid_backups_to_open; } } for (const auto& corrupt : corrupt_backups_) { backups_.erase(backups_.find(corrupt.first)); } // erase the backups before max_valid_backups_to_open int num_unopened_backups; if (options_.max_valid_backups_to_open == 0) { num_unopened_backups = 0; } else { num_unopened_backups = std::max(0, static_cast(backups_.size()) - options_.max_valid_backups_to_open); } for (int i = 0; i < num_unopened_backups; ++i) { assert(backups_.begin()->second->Empty()); backups_.erase(backups_.begin()); } } ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_); ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", latest_valid_backup_id_); // set up threads perform copies from files_to_copy_or_create_ in the // background threads_cpu_priority_ = CpuPriority::kNormal; threads_.reserve(options_.max_background_operations); for (int t = 0; t < options_.max_background_operations; t++) { threads_.emplace_back([this]() { #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if __GLIBC_PREREQ(2, 12) pthread_setname_np(pthread_self(), "backup_engine"); #endif #endif CpuPriority current_priority = CpuPriority::kNormal; CopyOrCreateWorkItem work_item; uint64_t bytes_toward_next_callback = 0; while (files_to_copy_or_create_.read(work_item)) { CpuPriority priority = threads_cpu_priority_; if (current_priority != priority) { TEST_SYNC_POINT_CALLBACK( "BackupEngineImpl::Initialize:SetCpuPriority", &priority); port::SetCpuPriority(0, priority); current_priority = priority; } // `bytes_read` and `bytes_written` stats are enabled based on // compile-time support and cannot be dynamically toggled. 
So we do not // need to worry about `PerfLevel` here, unlike many other // `IOStatsContext` / `PerfContext` stats. uint64_t prev_bytes_read = IOSTATS(bytes_read); uint64_t prev_bytes_written = IOSTATS(bytes_written); CopyOrCreateResult result; Temperature temp = work_item.src_temperature; result.io_status = CopyOrCreateFile( work_item.src_path, work_item.dst_path, work_item.contents, work_item.size_limit, work_item.src_env, work_item.dst_env, work_item.src_env_options, work_item.sync, work_item.rate_limiter, work_item.progress_callback, &temp, work_item.dst_temperature, &bytes_toward_next_callback, &result.size, &result.checksum_hex); RecordTick(work_item.stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read); RecordTick(work_item.stats, BACKUP_WRITE_BYTES, IOSTATS(bytes_written) - prev_bytes_written); result.db_id = work_item.db_id; result.db_session_id = work_item.db_session_id; result.expected_src_temperature = work_item.src_temperature; result.current_src_temperature = temp; if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { // unknown checksum function name implies no db table file checksum in // db manifest; work_item.src_checksum_hex not empty means // backup engine has calculated its crc32c checksum for the table // file; therefore, we are able to compare the checksums. if (work_item.src_checksum_func_name == kUnknownFileChecksumFuncName || work_item.src_checksum_func_name == kDbFileChecksumFuncName) { if (work_item.src_checksum_hex != result.checksum_hex) { std::string checksum_info( "Expected checksum is " + work_item.src_checksum_hex + " while computed checksum is " + result.checksum_hex); result.io_status = IOStatus::Corruption( "Checksum mismatch after copying to " + work_item.dst_path + ": " + checksum_info); } } else { // FIXME(peterd): dead code? 
std::string checksum_function_info( "Existing checksum function is " + work_item.src_checksum_func_name + " while provided checksum function is " + kBackupFileChecksumFuncName); ROCKS_LOG_INFO( options_.info_log, "Unable to verify checksum after copying to %s: %s\n", work_item.dst_path.c_str(), checksum_function_info.c_str()); } } work_item.result.set_value(std::move(result)); } }); } ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine"); return IOStatus::OK(); } IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( const CreateBackupOptions& options, DB* db, const std::string& app_metadata, BackupID* new_backup_id_ptr) { assert(initialized_); assert(!read_only_); if (app_metadata.size() > kMaxAppMetaSize) { return IOStatus::InvalidArgument("App metadata too large"); } if (options.decrease_background_thread_cpu_priority) { if (options.background_thread_cpu_priority < threads_cpu_priority_) { threads_cpu_priority_.store(options.background_thread_cpu_priority); } } BackupID new_backup_id = latest_backup_id_ + 1; // `bytes_read` and `bytes_written` stats are enabled based on compile-time // support and cannot be dynamically toggled. So we do not need to worry about // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats. uint64_t prev_bytes_read = IOSTATS(bytes_read); uint64_t prev_bytes_written = IOSTATS(bytes_written); assert(backups_.find(new_backup_id) == backups_.end()); auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id)); IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr); if (io_s.ok()) { // maybe last backup failed and left partial state behind, clean it up. // need to do this before updating backups_ such that a private dir // named after new_backup_id will be cleaned up. 
// (If an incomplete new backup is followed by an incomplete delete // of the latest full backup, then there could be more than one next // id with a private dir, the last thing to be deleted in delete // backup, but all will be cleaned up with a GarbageCollect.) io_s = GarbageCollect(); } else if (io_s.IsNotFound()) { // normal case, the new backup's private dir doesn't exist yet io_s = IOStatus::OK(); } auto ret = backups_.insert(std::make_pair( new_backup_id, std::unique_ptr(new BackupMeta( GetBackupMetaFile(new_backup_id, false /* tmp */), GetBackupMetaFile(new_backup_id, true /* tmp */), &backuped_file_infos_, backup_env_, backup_fs_)))); assert(ret.second == true); auto& new_backup = ret.first->second; new_backup->RecordTimestamp(); new_backup->SetAppMetadata(app_metadata); auto start_backup = backup_env_->NowMicros(); ROCKS_LOG_INFO(options_.info_log, "Started the backup process -- creating backup %u", new_backup_id); if (options_.share_table_files && !options_.share_files_with_checksum) { ROCKS_LOG_WARN(options_.info_log, "BackupEngineOptions::share_files_with_checksum=false is " "DEPRECATED and could lead to data loss."); } if (io_s.ok()) { io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr); } // A set into which we will insert the dst_paths that are calculated for live // files and live WAL files. // This is used to check whether a live files shares a dst_path with another // live file. 
std::unordered_set live_dst_paths; std::vector backup_items_to_finish; // Add a CopyOrCreateWorkItem to the channel for each live file Status disabled = db->DisableFileDeletions(); DBOptions db_options = db->GetDBOptions(); Statistics* stats = db_options.statistics.get(); if (io_s.ok()) { CheckpointImpl checkpoint(db); uint64_t sequence_number = 0; FileChecksumGenFactory* db_checksum_factory = db_options.file_checksum_gen_factory.get(); const std::string kFileChecksumGenFactoryName = "FileChecksumGenCrc32cFactory"; bool compare_checksum = db_checksum_factory != nullptr && db_checksum_factory->Name() == kFileChecksumGenFactoryName ? true : false; EnvOptions src_raw_env_options(db_options); RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint( [&](const std::string& /*src_dirname*/, const std::string& /*fname*/, FileType) { // custom checkpoint will switch to calling copy_file_cb after it sees // NotSupported returned from link_file_cb. 
return IOStatus::NotSupported(); } /* link_file_cb */, [&](const std::string& src_dirname, const std::string& fname, uint64_t size_limit_bytes, FileType type, const std::string& checksum_func_name, const std::string& checksum_val, const Temperature src_temperature) { if (type == kWalFile && !options_.backup_log_files) { return IOStatus::OK(); } Log(options_.info_log, "add file for backup %s", fname.c_str()); uint64_t size_bytes = 0; IOStatus io_st; if (type == kTableFile || type == kBlobFile) { io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_, &size_bytes, nullptr); if (!io_st.ok()) { Log(options_.info_log, "GetFileSize is failed: %s", io_st.ToString().c_str()); return io_st; } } EnvOptions src_env_options; switch (type) { case kWalFile: src_env_options = db_env_->OptimizeForLogRead(src_raw_env_options); break; case kTableFile: src_env_options = db_env_->OptimizeForCompactionTableRead( src_raw_env_options, ImmutableDBOptions(db_options)); break; case kDescriptorFile: src_env_options = db_env_->OptimizeForManifestRead(src_raw_env_options); break; case kBlobFile: src_env_options = db_env_->OptimizeForBlobFileRead( src_raw_env_options, ImmutableDBOptions(db_options)); break; default: // Other backed up files (like options file) are not read by live // DB, so don't need to worry about avoiding mixing buffered and // direct I/O. Just use plain defaults. 
src_env_options = src_raw_env_options; break; } io_st = AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, options_.share_table_files && (type == kTableFile || type == kBlobFile), src_dirname, fname, src_env_options, rate_limiter, type, size_bytes, db_options.statistics.get(), size_limit_bytes, options_.share_files_with_checksum && (type == kTableFile || type == kBlobFile), options.progress_callback, "" /* contents */, checksum_func_name, checksum_val, src_temperature); return io_st; } /* copy_file_cb */, [&](const std::string& fname, const std::string& contents, FileType type) { Log(options_.info_log, "add file for backup %s", fname.c_str()); return AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, false /* shared */, "" /* src_dir */, fname, EnvOptions() /* src_env_options */, rate_limiter, type, contents.size(), db_options.statistics.get(), 0 /* size_limit */, false /* shared_checksum */, options.progress_callback, contents); } /* create_file_cb */, &sequence_number, options.flush_before_backup ? 
0 : std::numeric_limits::max(), compare_checksum)); if (io_s.ok()) { new_backup->SetSequenceNumber(sequence_number); } } ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); IOStatus item_io_status; for (auto& item : backup_items_to_finish) { item.result.wait(); auto result = item.result.get(); item_io_status = result.io_status; Temperature temp = result.expected_src_temperature; if (result.current_src_temperature != Temperature::kUnknown && (temp == Temperature::kUnknown || options_.current_temperatures_override_manifest)) { temp = result.current_src_temperature; } if (item_io_status.ok() && item.shared && item.needed_to_copy) { item_io_status = item.backup_env->GetFileSystem()->RenameFile( item.dst_path_tmp, item.dst_path, io_options_, nullptr); } if (item_io_status.ok()) { item_io_status = new_backup.get()->AddFile(std::make_shared( item.dst_relative, result.size, result.checksum_hex, result.db_id, result.db_session_id, temp)); } if (!item_io_status.ok()) { io_s = item_io_status; } } // we copied all the files, enable file deletions if (disabled.ok()) { // If we successfully disabled file deletions db->EnableFileDeletions(false).PermitUncheckedError(); } auto backup_time = backup_env_->NowMicros() - start_backup; if (io_s.ok()) { // persist the backup metadata on the disk io_s = new_backup->StoreToFile(options_.sync, options_.schema_version, schema_test_options_.get()); } if (io_s.ok() && options_.sync) { std::unique_ptr backup_private_directory; backup_fs_ ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), io_options_, &backup_private_directory, nullptr) .PermitUncheckedError(); if (backup_private_directory != nullptr) { io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr, DirFsyncOptions()); } if (io_s.ok() && private_directory_ != nullptr) { io_s = private_directory_->FsyncWithDirOptions(io_options_, nullptr, DirFsyncOptions()); } if (io_s.ok() && meta_directory_ != nullptr) { io_s = 
meta_directory_->FsyncWithDirOptions(io_options_, nullptr, DirFsyncOptions()); } if (io_s.ok() && shared_directory_ != nullptr) { io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr, DirFsyncOptions()); } if (io_s.ok() && backup_directory_ != nullptr) { io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr, DirFsyncOptions()); } } if (io_s.ok()) { backup_statistics_.IncrementNumberSuccessBackup(); // here we know that we succeeded and installed the new backup latest_backup_id_ = new_backup_id; latest_valid_backup_id_ = new_backup_id; if (new_backup_id_ptr) { *new_backup_id_ptr = new_backup_id; } ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good"); // backup_speed is in byte/second double backup_speed = new_backup->GetSize() / (1.048576 * backup_time); ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u", new_backup->GetNumberFiles()); char human_size[16]; AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size)); ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size); ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds", backup_time); ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed); ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s", backup_statistics_.ToString().c_str()); } else { backup_statistics_.IncrementNumberFailBackup(); // clean all the files we might have created ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s", io_s.ToString().c_str()); ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n", backup_statistics_.ToString().c_str()); // delete files that we might have already written might_need_garbage_collect_ = true; DeleteBackup(new_backup_id).PermitUncheckedError(); } RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read); RecordTick(stats, BACKUP_WRITE_BYTES, IOSTATS(bytes_written) - prev_bytes_written); return io_s; } IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { 
assert(initialized_); assert(!read_only_); // Best effort deletion even with errors IOStatus overall_status = IOStatus::OK(); ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u", num_backups_to_keep); std::vector to_delete; auto itr = backups_.begin(); while ((backups_.size() - to_delete.size()) > num_backups_to_keep) { to_delete.push_back(itr->first); itr++; } for (auto backup_id : to_delete) { // Do not GC until end IOStatus io_s = DeleteBackupNoGC(backup_id); if (!io_s.ok()) { overall_status = io_s; } } // Clean up after any incomplete backup deletion, potentially from // earlier session. if (might_need_garbage_collect_) { IOStatus io_s = GarbageCollect(); if (!io_s.ok() && overall_status.ok()) { overall_status = io_s; } } return overall_status; } IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) { IOStatus s1 = DeleteBackupNoGC(backup_id); IOStatus s2 = IOStatus::OK(); // Clean up after any incomplete backup deletion, potentially from // earlier session. if (might_need_garbage_collect_) { s2 = GarbageCollect(); } if (!s1.ok()) { // Any failure in the primary objective trumps any failure in the // secondary objective. s2.PermitUncheckedError(); return s1; } else { return s2; } } // Does not auto-GarbageCollect nor lock IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) { assert(initialized_); assert(!read_only_); ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id); auto backup = backups_.find(backup_id); if (backup != backups_.end()) { IOStatus io_s = backup->second->Delete(); if (!io_s.ok()) { return io_s; } backups_.erase(backup); } else { auto corrupt = corrupt_backups_.find(backup_id); if (corrupt == corrupt_backups_.end()) { return IOStatus::NotFound("Backup not found"); } IOStatus io_s = corrupt->second.second->Delete(); if (!io_s.ok()) { return io_s; } corrupt->second.first.PermitUncheckedError(); corrupt_backups_.erase(corrupt); } // After removing meta file, best effort deletion even with errors. 
// (Don't delete other files if we can't delete the meta file right // now.) std::vector to_delete; for (auto& itr : backuped_file_infos_) { if (itr.second->refs == 0) { IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(itr.first), io_options_, nullptr); ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), io_s.ToString().c_str()); to_delete.push_back(itr.first); if (!io_s.ok()) { // Trying again later might work might_need_garbage_collect_ = true; } } } for (auto& td : to_delete) { backuped_file_infos_.erase(td); } // take care of private dirs -- GarbageCollect() will take care of them // if they are not empty std::string private_dir = GetPrivateFileRel(backup_id); IOStatus io_s = backup_fs_->DeleteDir(GetAbsolutePath(private_dir), io_options_, nullptr); ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s", private_dir.c_str(), io_s.ToString().c_str()); if (!io_s.ok()) { // Full gc or trying again later might work might_need_garbage_collect_ = true; } return IOStatus::OK(); } void BackupEngineImpl::SetBackupInfoFromBackupMeta( BackupID id, const BackupMeta& meta, BackupInfo* backup_info, bool include_file_details) const { *backup_info = BackupInfo(id, meta.GetTimestamp(), meta.GetSize(), meta.GetNumberFiles(), meta.GetAppMetadata()); std::string dir = options_.backup_dir + "/" + kPrivateDirSlash + std::to_string(id); if (include_file_details) { auto& file_details = backup_info->file_details; file_details.reserve(meta.GetFiles().size()); for (auto& file_ptr : meta.GetFiles()) { BackupFileInfo& finfo = *file_details.emplace(file_details.end()); finfo.relative_filename = file_ptr->filename; finfo.size = file_ptr->size; finfo.directory = dir; uint64_t number; FileType type; bool ok = ParseFileName(file_ptr->filename, &number, &type); if (ok) { finfo.file_number = number; finfo.file_type = type; } // TODO: temperature, file_checksum, file_checksum_func_name } backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id)); 
backup_info->name_for_open.pop_back(); // remove trailing '/' backup_info->env_for_open = meta.GetEnvForOpen(); } } Status BackupEngineImpl::GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, bool include_file_details) const { assert(initialized_); if (backup_id == kLatestBackupIDMarker) { // Note: Read latest_valid_backup_id_ inside of lock backup_id = latest_valid_backup_id_; } auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return Status::Corruption(corrupt_itr->second.first.ToString()); } auto backup_itr = backups_.find(backup_id); if (backup_itr == backups_.end()) { return Status::NotFound("Backup not found"); } auto& backup = backup_itr->second; if (backup->Empty()) { return Status::NotFound("Backup not found"); } SetBackupInfoFromBackupMeta(backup_id, *backup, backup_info, include_file_details); return Status::OK(); } void BackupEngineImpl::GetBackupInfo(std::vector* backup_info, bool include_file_details) const { assert(initialized_); backup_info->resize(backups_.size()); size_t i = 0; for (auto& backup : backups_) { const BackupMeta& meta = *backup.second; if (!meta.Empty()) { SetBackupInfoFromBackupMeta(backup.first, meta, &backup_info->at(i++), include_file_details); } } } void BackupEngineImpl::GetCorruptedBackups( std::vector* corrupt_backup_ids) const { assert(initialized_); corrupt_backup_ids->reserve(corrupt_backups_.size()); for (auto& backup : corrupt_backups_) { corrupt_backup_ids->push_back(backup.first); } } IOStatus BackupEngineImpl::RestoreDBFromBackup( const RestoreOptions& options, BackupID backup_id, const std::string& db_dir, const std::string& wal_dir) const { assert(initialized_); if (backup_id == kLatestBackupIDMarker) { // Note: Read latest_valid_backup_id_ inside of lock backup_id = latest_valid_backup_id_; } auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return corrupt_itr->second.first; } auto backup_itr = 
backups_.find(backup_id); if (backup_itr == backups_.end()) { return IOStatus::NotFound("Backup not found"); } auto& backup = backup_itr->second; if (backup->Empty()) { return IOStatus::NotFound("Backup not found"); } ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id); ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n", static_cast(options.keep_log_files)); // just in case. Ignore errors db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr) .PermitUncheckedError(); db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr) .PermitUncheckedError(); if (options.keep_log_files) { // delete files in db_dir, but keep all the log files DeleteChildren(db_dir, 1 << kWalFile); // move all the files from archive dir to wal_dir std::string archive_dir = ArchivalDirectory(wal_dir); std::vector archive_files; db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr) .PermitUncheckedError(); // ignore errors for (const auto& f : archive_files) { uint64_t number; FileType type; bool ok = ParseFileName(f, &number, &type); if (ok && type == kWalFile) { ROCKS_LOG_INFO(options_.info_log, "Moving log file from archive/ to wal_dir: %s", f.c_str()); IOStatus io_s = db_fs_->RenameFile( archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr); if (!io_s.ok()) { // if we can't move log file from archive_dir to wal_dir, // we should fail, since it might mean data loss return io_s; } } } } else { DeleteChildren(wal_dir); DeleteChildren(ArchivalDirectory(wal_dir)); DeleteChildren(db_dir); } IOStatus io_s; std::vector restore_items_to_finish; std::string temporary_current_file; std::string final_current_file; std::unique_ptr db_dir_for_fsync; std::unique_ptr wal_dir_for_fsync; for (const auto& file_info : backup->GetFiles()) { const std::string& file = file_info->filename; // 1. get DB filename std::string dst = file_info->GetDbFileName(); // 2. 
find the filetype uint64_t number; FileType type; bool ok = ParseFileName(dst, &number, &type); if (!ok) { return IOStatus::Corruption("Backup corrupted: Fail to parse filename " + dst); } // 3. Construct the final path // kWalFile lives in wal_dir and all the rest live in db_dir if (type == kWalFile) { dst = wal_dir + "/" + dst; if (options_.sync && !wal_dir_for_fsync) { io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync, nullptr); if (!io_s.ok()) { return io_s; } } } else { dst = db_dir + "/" + dst; if (options_.sync && !db_dir_for_fsync) { io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync, nullptr); if (!io_s.ok()) { return io_s; } } } // For atomicity, initially restore CURRENT file to a temporary name. // This is useful even without options_.sync e.g. in case the restore // process is interrupted. if (type == kCurrentFile) { final_current_file = dst; dst = temporary_current_file = dst + ".tmp"; } ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); CopyOrCreateWorkItem copy_or_create_work_item( GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */, file_info->temp, "" /* contents */, backup_env_, db_env_, EnvOptions() /* src_env_options */, options_.sync, options_.restore_rate_limiter.get(), file_info->size, nullptr /* stats */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file, dst, file_info->checksum_hex); files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); restore_items_to_finish.push_back( std::move(after_copy_or_create_work_item)); } IOStatus item_io_status; for (auto& item : restore_items_to_finish) { item.result.wait(); auto result = item.result.get(); item_io_status = result.io_status; // Note: It is possible that both of the following bad-status cases occur // during copying. But, we only return one status. 
if (!item_io_status.ok()) {
      io_s = item_io_status;
      break;
    } else if (!item.checksum_hex.empty() &&
               item.checksum_hex != result.checksum_hex) {
      io_s = IOStatus::Corruption(
          "While restoring " + item.from_file + " -> " + item.to_file +
          ": expected checksum is " + item.checksum_hex +
          " while computed checksum is " + result.checksum_hex);
      break;
    }
  }

  // When enabled, the first FsyncWithDirOptions is to ensure all files are
  // fully persisted before renaming CURRENT.tmp
  if (io_s.ok() && db_dir_for_fsync) {
    ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
    io_s = db_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
                                                 DirFsyncOptions());
  }

  if (io_s.ok() && wal_dir_for_fsync) {
    io_s = wal_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
                                                  DirFsyncOptions());
  }

  if (io_s.ok() && !temporary_current_file.empty()) {
    ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n");
    assert(!final_current_file.empty());
    io_s = db_fs_->RenameFile(temporary_current_file, final_current_file,
                              io_options_, nullptr);
  }

  if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
    // Second FsyncWithDirOptions is to ensure the final atomic rename of DB
    // restore is fully persisted even if power goes out right after restore
    // operation returns success
    assert(db_dir_for_fsync);
    io_s = db_dir_for_fsync->FsyncWithDirOptions(
        io_options_, nullptr, DirFsyncOptions(final_current_file));
  }

  ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
                 io_s.ToString().c_str());
  return io_s;
}

// NOTE(review): several template argument lists appear to be missing in this
// part of the file (e.g. `std::unordered_map`, `std::unique_ptr`,
// `static_cast(...)`). They are reproduced as found -- confirm against
// version control.

// Checks that every file registered for `backup_id` is present in the backup
// directory with the recorded size and, optionally, the recorded checksum.
//
// Returns:
//  * the status recorded in corrupt_backups_ if the backup is listed there;
//  * NotFound if the backup is not registered, is empty, or one of its files
//    is missing from the directory listings;
//  * Corruption on a file size mismatch, or on a checksum mismatch when
//    `verify_with_checksum` is true and a checksum was recorded for the file;
//  * any error returned while reading a file to compute its checksum;
//  * OK otherwise.
IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
                                        bool verify_with_checksum) const {
  assert(initialized_);
  // Check if backup_id is corrupted, or valid and registered
  auto corrupt_itr = corrupt_backups_.find(backup_id);
  if (corrupt_itr != corrupt_backups_.end()) {
    return corrupt_itr->second.first;
  }

  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return IOStatus::NotFound();
  }

  auto& backup = backup_itr->second;
  if (backup->Empty()) {
    return IOStatus::NotFound();
  }

  ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);

  // Find all existing backup files belonging to backup_id: list the backup's
  // private directory and both shared directories, keyed by absolute path.
  std::unordered_map curr_abs_path_to_size;
  for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
                              GetSharedFileWithChecksumRel()}) {
    const auto abs_dir = GetAbsolutePath(rel_dir);
    // Shared directories allowed to be missing in some cases. Expected but
    // missing files will be reported a few lines down.
    ReadChildFileCurrentSizes(abs_dir, backup_fs_, &curr_abs_path_to_size)
        .PermitUncheckedError();
  }

  // For all files registered in backup
  for (const auto& file_info : backup->GetFiles()) {
    const auto abs_path = GetAbsolutePath(file_info->filename);
    // check existence of the file
    if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
      return IOStatus::NotFound("File missing: " + abs_path);
    }
    // verify file size
    if (file_info->size != curr_abs_path_to_size[abs_path]) {
      std::string size_info("Expected file size is " +
                            std::to_string(file_info->size) +
                            " while found file size is " +
                            std::to_string(curr_abs_path_to_size[abs_path]));
      return IOStatus::Corruption("File corrupted: File size mismatch for " +
                                  abs_path + ": " + size_info);
    }
    // Files without a recorded checksum are only checked for existence and
    // size, even when checksum verification was requested.
    if (verify_with_checksum && !file_info->checksum_hex.empty()) {
      // verify file checksum
      std::string checksum_hex;
      ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
                     abs_path.c_str());
      IOStatus io_s = ReadFileAndComputeChecksum(
          abs_path, backup_fs_, EnvOptions(), 0 /* size_limit */, &checksum_hex,
          Temperature::kUnknown);
      if (!io_s.ok()) {
        return io_s;
      } else if (file_info->checksum_hex != checksum_hex) {
        std::string checksum_info(
            "Expected checksum is " + file_info->checksum_hex +
            " while computed checksum is " + checksum_hex);
        return IOStatus::Corruption("File corrupted: Checksum mismatch for " +
                                    abs_path + ": " + checksum_info);
      }
    }
  }
  return IOStatus::OK();
}

// Writes the file `dst` (on dst_env's file system) either by copying from
// `src` (on src_env's file system) or from the in-memory string `contents`.
// Exactly one of `src` and `contents` must be non-empty (asserted).
//
// Parameters:
//  * size_limit: maximum number of bytes to copy from `src`; 0 means no limit.
//  * sync: when true, the destination writer is synced before it is closed.
//  * rate_limiter: optional; when set it also determines the copy buffer size
//    (GetSingleBurstBytes()) and is charged for the bytes written.
//  * progress_callback: invoked, while holding byte_report_mutex_, once for
//    every options_.callback_trigger_interval_size bytes accumulated in
//    *bytes_toward_next_callback (which must be non-null).
//  * src_temperature: in/out. Used as the temperature hint when opening `src`
//    and overwritten with the temperature reported by the opened file. Must be
//    non-null when `src` is non-empty.
//  * dst_temperature: temperature set in the destination file options.
//  * size: optional out; total number of bytes processed.
//  * checksum_hex: optional out; crc32c of the processed bytes, hex encoded.
//
// Returns Incomplete("Backup stopped") if stop_backup_ is observed set at the
// start of a loop iteration, the first I/O error encountered, or the status
// of the final Sync/Close.
IOStatus BackupEngineImpl::CopyOrCreateFile(
    const std::string& src, const std::string& dst, const std::string& contents,
    uint64_t size_limit, Env* src_env, Env* dst_env,
    const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
    std::function progress_callback, Temperature* src_temperature,
    Temperature dst_temperature, uint64_t* bytes_toward_next_callback,
    uint64_t* size, std::string* checksum_hex) {
  assert(src.empty() != contents.empty());
  IOStatus io_s;
  std::unique_ptr dst_file;
  std::unique_ptr src_file;
  FileOptions dst_file_options;
  dst_file_options.use_mmap_writes = false;
  dst_file_options.temperature = dst_temperature;
  // TODO:(gzh) maybe use direct reads/writes here if possible
  if (size != nullptr) {
    *size = 0;
  }
  uint32_t checksum_value = 0;

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits::max();
  }

  io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
                                                   &dst_file, nullptr);
  if (io_s.ok() && !src.empty()) {
    auto src_file_options = FileOptions(src_env_options);
    src_file_options.temperature = *src_temperature;
    io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
                                                       &src_file, nullptr);
  }
  if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) {
    // Retry without temperature hint in case the FileSystem is strict with
    // non-kUnknown temperature option
    io_s = src_env->GetFileSystem()->NewSequentialFile(
        src, FileOptions(src_env_options), &src_file, nullptr);
  }
  if (!io_s.ok()) {
    return io_s;
  }

  size_t buf_size =
      rate_limiter ? static_cast(rate_limiter->GetSingleBurstBytes())
                   : kDefaultCopyFileBufferSize;

  std::unique_ptr dest_writer(
      new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
  std::unique_ptr src_reader;
  std::unique_ptr buf;
  if (!src.empty()) {
    // Return back current temperature in FileSystem
    *src_temperature = src_file->GetTemperature();

    src_reader.reset(new SequentialFileReader(
        std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter));
    buf.reset(new char[buf_size]);
  }

  Slice data;
  // Copy loop. When creating from `contents` the loop condition
  // (contents.empty()) makes it run exactly once.
  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return status_to_io_status(Status::Incomplete("Backup stopped"));
    }
    if (!src.empty()) {
      size_t buffer_to_read =
          (buf_size < size_limit) ? buf_size : static_cast(size_limit);
      io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
                              Env::IO_LOW /* rate_limiter_priority */);
      *bytes_toward_next_callback += data.size();
    } else {
      data = contents;
    }
    size_limit -= data.size();
    // Test hook: receives a pointer to the data only when `src` ends in
    // ".sst", nullptr otherwise.
    TEST_SYNC_POINT_CALLBACK(
        "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
        (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
                                                                    : nullptr);

    if (!io_s.ok()) {
      return io_s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_hex != nullptr) {
      checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
    }

    io_s = dest_writer->Append(data);

    if (rate_limiter != nullptr) {
      if (!src.empty()) {
        // The read size was already capped by buf_size, which is the rate
        // limiter's single burst size here, so one request is issued.
        rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                              RateLimiter::OpType::kWrite);
      } else {
        // `contents` has arbitrary size, so the request is split up.
        LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW,
                                   nullptr /* stats */,
                                   RateLimiter::OpType::kWrite);
      }
    }
    while (*bytes_toward_next_callback >=
           options_.callback_trigger_interval_size) {
      *bytes_toward_next_callback -= options_.callback_trigger_interval_size;
      std::lock_guard lock(byte_report_mutex_);
      progress_callback();
    }
  } while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);

  // Convert uint32_t checksum to hex checksum
  if (checksum_hex != nullptr) {
    checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
  }

  if (io_s.ok() && sync) {
    io_s = dest_writer->Sync(false);
  }
  if (io_s.ok()) {
    io_s = dest_writer->Close();
  }
  return io_s;
}

// Decides where a DB file (or an in-memory `contents` blob) belongs in the
// backup directory and whether it has to be copied, then records the outcome:
//  * if a copy/create is needed, a CopyOrCreateWorkItem is written to
//    files_to_copy_or_create_;
//  * otherwise a result carrying the known size, checksum and DB identities
//    is fulfilled immediately.
// In both cases a BackupAfterCopyOrCreateWorkItem holding the result's future
// is appended to `backup_items_to_finish`, and the final destination path is
// added to `live_dst_paths`.
//
// Exactly one of `contents` and `src_dir` must be non-empty (asserted).
// fname will always start with "/"
//
// Returns a non-OK status without adding a work item if the checksum is
// required but unknown, if computing a checksum fails, if `size_bytes` is the
// maximum value (reported as "File missing"), or if the existence check in
// the backup directory fails with something other than NotFound.
IOStatus BackupEngineImpl::AddBackupFileWorkItem(
    std::unordered_set& live_dst_paths,
    std::vector& backup_items_to_finish,
    BackupID backup_id, bool shared, const std::string& src_dir,
    const std::string& fname, const EnvOptions& src_env_options,
    RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes,
    Statistics* stats, uint64_t size_limit, bool shared_checksum,
    std::function progress_callback, const std::string& contents,
    const std::string& src_checksum_func_name,
    const std::string& src_checksum_str, const Temperature src_temperature) {
  assert(contents.empty() != src_dir.empty());

  std::string src_path = src_dir + "/" + fname;
  std::string dst_relative;
  std::string dst_relative_tmp;
  std::string db_id;
  std::string db_session_id;
  // crc32c checksum in hex. empty == unavailable / unknown
  std::string checksum_hex;

  // Whenever a default checksum function name is passed in, we will compare
  // the corresponding checksum values after copying. Note that only table and
  // blob files may have a known checksum function name passed in.
  //
  // If no default checksum function name is passed in and db session id is not
  // available, we will calculate the checksum *before* copying in two cases
  // (we always calculate checksums when copying or creating for any file
  // types):
  // a) share_files_with_checksum is true and file type is table;
  // b) share_table_files is true and the file exists already.
  //
  // Step 0: Check if default checksum function name is passed in
  if (kDbFileChecksumFuncName == src_checksum_func_name) {
    if (src_checksum_str == kUnknownFileChecksum) {
      return status_to_io_status(
          Status::Aborted("Unknown checksum value for " + fname));
    }
    checksum_hex = ChecksumStrToHex(src_checksum_str);
  }

  // Step 1: Prepare the relative path to destination
  if (shared && shared_checksum) {
    if (GetNamingNoFlags() != BackupEngineOptions::kLegacyCrc32cAndFileSize &&
        file_type != kBlobFile) {
      // Prepare db_session_id to add to the file name
      // Ignore the returned status
      // In the failed cases, db_id and db_session_id will be empty
      GetFileDbIdentities(db_env_, src_env_options, src_path, src_temperature,
                          rate_limiter, &db_id, &db_session_id)
          .PermitUncheckedError();
    }
    // Calculate checksum if checksum and db session id are not available.
    // If db session id is available, we will not calculate the checksum
    // since the session id should suffice to avoid file name collision in
    // the shared_checksum directory.
    if (checksum_hex.empty() && db_session_id.empty()) {
      IOStatus io_s = ReadFileAndComputeChecksum(
          src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
          src_temperature);
      if (!io_s.ok()) {
        return io_s;
      }
    }
    // The maximum value is treated as "size unknown / file not found".
    if (size_bytes == std::numeric_limits::max()) {
      return IOStatus::NotFound("File missing: " + src_path);
    }
    // dst_relative depends on the following conditions:
    // 1) the naming scheme is kUseDbSessionId,
    // 2) db_session_id is not empty,
    // 3) checksum is available in the DB manifest.
    // If 1,2,3) are satisfied, then dst_relative will be of the form:
    // shared_checksum/<file_number>_<checksum>_<db_session_id>.sst
    // If 1,2) are satisfied, then dst_relative will be of the form:
    // shared_checksum/<file_number>_<db_session_id>.sst
    // Otherwise, dst_relative is of the form
    // shared_checksum/<file_number>_<checksum>_<size>.sst
    //
    // For blob files, db_session_id is not supported with the blob file format.
    // It uses original/legacy naming scheme.
    // dst_relative will be of the form:
    // shared_checksum/<file_number>_<checksum>_<size>.blob
    dst_relative = GetSharedFileWithChecksum(fname, checksum_hex, size_bytes,
                                             db_session_id);
    dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
    dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
  } else if (shared) {
    dst_relative_tmp = GetSharedFileRel(fname, true);
    dst_relative = GetSharedFileRel(fname, false);
  } else {
    dst_relative = GetPrivateFileRel(backup_id, false, fname);
  }

  // We copy into `temp_dest_path` and, once finished, rename it to
  // `final_dest_path`. This allows files to atomically appear at
  // `final_dest_path`. We can copy directly to the final path when atomicity
  // is unnecessary, like for files in private backup directories.
  const std::string* copy_dest_path;
  std::string temp_dest_path;
  std::string final_dest_path = GetAbsolutePath(dst_relative);
  if (!dst_relative_tmp.empty()) {
    temp_dest_path = GetAbsolutePath(dst_relative_tmp);
    copy_dest_path = &temp_dest_path;
  } else {
    copy_dest_path = &final_dest_path;
  }

  // Step 2: Determine whether to copy or not
  // if it's shared, we also need to check if it exists -- if it does, no need
  // to copy it again.
  bool need_to_copy = true;
  // true if final_dest_path is the same path as another live file
  const bool same_path =
      live_dst_paths.find(final_dest_path) != live_dst_paths.end();

  bool file_exists = false;
  if (shared && !same_path) {
    // Should be in shared directory but not a live path, check existence in
    // shared directory
    IOStatus exist =
        backup_fs_->FileExists(final_dest_path, io_options_, nullptr);
    if (exist.ok()) {
      file_exists = true;
    } else if (exist.IsNotFound()) {
      file_exists = false;
    } else {
      return exist;
    }
  }

  if (!contents.empty()) {
    // Creating from `contents` is not a copy; Step 3 still queues a work item
    // because `contents` is non-empty.
    need_to_copy = false;
  } else if (shared && (same_path || file_exists)) {
    need_to_copy = false;
    auto find_result = backuped_file_infos_.find(dst_relative);
    if (find_result == backuped_file_infos_.end() && !same_path) {
      // file exists but not referenced
      ROCKS_LOG_INFO(
          options_.info_log,
          "%s already present, but not referenced by any backup. We will "
          "overwrite the file.",
          fname.c_str());
      need_to_copy = true;
      // Defer any failure reporting to when we try to write the file
      backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr)
          .PermitUncheckedError();
    } else {
      // file exists and referenced
      if (checksum_hex.empty()) {
        // same_path should not happen for a standard DB, so OK to
        // read file contents to check for checksum mismatch between
        // two files from same DB getting same name.
        // For compatibility with future meta file that might not have
        // crc32c checksum available, consider it might be empty, but
        // we don't currently generate meta file without crc32c checksum.
        // Therefore we have to read & compute it if we don't have it.
        //
        // find_result is only dereferenced when !same_path, in which case
        // the enclosing else-branch guarantees it is a valid iterator.
        if (!same_path && !find_result->second->checksum_hex.empty()) {
          assert(find_result != backuped_file_infos_.end());
          // Note: to save I/O on incremental backups, we copy prior known
          // checksum of the file instead of reading entire file contents
          // to recompute it.
          checksum_hex = find_result->second->checksum_hex;
          // Regarding corruption detection, consider:
          // (a) the DB file is corrupt (since previous backup) and the backup
          // file is OK: we failed to detect, but the backup is safe. DB can
          // be repaired/restored once its corruption is detected.
          // (b) the backup file is corrupt (since previous backup) and the
          // db file is OK: we failed to detect, but the backup is corrupt.
          // CreateNewBackup should support fast incremental backups and
          // there's no way to support that without reading all the files.
          // We might add an option for extra checks on incremental backup,
          // but until then, use VerifyBackups to check existing backup data.
          // (c) file name collision with legitimately different content.
          // This is almost inconceivable with a well-generated DB session
          // ID, but even in that case, we double check the file sizes in
          // BackupMeta::AddFile.
        } else {
          IOStatus io_s = ReadFileAndComputeChecksum(
              src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
              src_temperature);
          if (!io_s.ok()) {
            return io_s;
          }
        }
      }
      if (!db_session_id.empty()) {
        ROCKS_LOG_INFO(options_.info_log,
                       "%s already present, with checksum %s, size %" PRIu64
                       " and DB session identity %s",
                       fname.c_str(), checksum_hex.c_str(), size_bytes,
                       db_session_id.c_str());
      } else {
        ROCKS_LOG_INFO(options_.info_log,
                       "%s already present, with checksum %s and size %" PRIu64,
                       fname.c_str(), checksum_hex.c_str(), size_bytes);
      }
    }
  }
  live_dst_paths.insert(final_dest_path);

  // Step 3: Add work item
  if (!contents.empty() || need_to_copy) {
    ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
                   copy_dest_path->c_str());
    CopyOrCreateWorkItem copy_or_create_work_item(
        src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature,
        Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_,
        src_env_options, options_.sync, rate_limiter, size_limit, stats,
        progress_callback, src_checksum_func_name, checksum_hex, db_id,
        db_session_id);
    // The future must be taken before the work item is moved into the queue.
    BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
        copy_or_create_work_item.result.get_future(), shared, need_to_copy,
        backup_env_, temp_dest_path, final_dest_path, dst_relative);
    files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
    backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
  } else {
    // Nothing to copy: publish an already-completed result so the caller can
    // treat this item like any other when finishing the backup.
    std::promise promise_result;
    BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
        promise_result.get_future(), shared, need_to_copy, backup_env_,
        temp_dest_path, final_dest_path, dst_relative);
    backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
    CopyOrCreateResult result;
    result.io_status = IOStatus::OK();
    result.size = size_bytes;
    result.checksum_hex = std::move(checksum_hex);
    result.db_id = std::move(db_id);
    result.db_session_id = std::move(db_session_id);
    promise_result.set_value(std::move(result));
  }
  return IOStatus::OK();
}
// NOTE(review): several template argument lists appear to be missing in this
// part of the file (e.g. `std::shared_ptr`, `std::vector`,
// `static_cast(...)`). They are reproduced as found -- confirm against
// version control.

// Reads `src` from `src_fs` and stores the hex-encoded crc32c of the bytes
// read in `*checksum_hex`.
//
// Parameters:
//  * size_limit: maximum number of bytes to read; 0 means no limit.
//  * src_temperature: temperature hint used when opening the file; if the
//    open fails with PathNotFound and the hint is not kUnknown, the open is
//    retried with kUnknown.
//
// Reads are rate limited by options_.backup_rate_limiter (if any).
//
// Returns Aborted if `checksum_hex` is null, Incomplete("Backup stopped") if
// stop_backup_ is observed set at the start of a loop iteration, or the first
// open/read error. `*checksum_hex` is only assigned on success.
IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
    const std::string& src, const std::shared_ptr& src_fs,
    const EnvOptions& src_env_options, uint64_t size_limit,
    std::string* checksum_hex, const Temperature src_temperature) const {
  if (checksum_hex == nullptr) {
    return status_to_io_status(Status::Aborted("Checksum pointer is null"));
  }
  uint32_t checksum_value = 0;
  if (size_limit == 0) {
    size_limit = std::numeric_limits::max();
  }

  std::unique_ptr src_reader;
  auto file_options = FileOptions(src_env_options);
  file_options.temperature = src_temperature;
  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
  IOStatus io_s = SequentialFileReader::Create(
      src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter);
  if (io_s.IsPathNotFound() && src_temperature != Temperature::kUnknown) {
    // Retry without temperature hint in case the FileSystem is strict with
    // non-kUnknown temperature option
    file_options.temperature = Temperature::kUnknown;
    io_s = SequentialFileReader::Create(src_fs, src, file_options, &src_reader,
                                        nullptr /* dbg */, rate_limiter);
  }
  if (!io_s.ok()) {
    return io_s;
  }

  size_t buf_size = kDefaultCopyFileBufferSize;
  std::unique_ptr buf(new char[buf_size]);
  Slice data;

  // Read until an empty read or until size_limit bytes have been consumed.
  do {
    if (stop_backup_.load(std::memory_order_acquire)) {
      return status_to_io_status(Status::Incomplete("Backup stopped"));
    }
    size_t buffer_to_read =
        (buf_size < size_limit) ? buf_size : static_cast(size_limit);
    io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
                            Env::IO_LOW /* rate_limiter_priority */);
    if (!io_s.ok()) {
      return io_s;
    }

    size_limit -= data.size();
    checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
  } while (data.size() > 0 && size_limit > 0);

  checksum_hex->assign(ChecksumInt32ToHex(checksum_value));

  return io_s;
}

// Extracts the DB id and/or DB session id recorded in the table properties
// of the SST file at `file_path`.
//
// At least one of `db_id` and `db_session_id` must be non-null (asserted);
// only the non-null outputs are assigned.
//
// Returns:
//  * the reader's status if the file could not be opened;
//  * Corruption if no table properties are available;
//  * NotFound if `db_session_id` was requested but the recorded value is
//    empty (`db_id`, if requested, has already been assigned by then);
//  * OK otherwise.
Status BackupEngineImpl::GetFileDbIdentities(
    Env* src_env, const EnvOptions& src_env_options,
    const std::string& file_path, Temperature file_temp,
    RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) {
  assert(db_id != nullptr || db_session_id != nullptr);

  Options options;
  options.env = src_env;
  SstFileDumper sst_reader(options, file_path, file_temp,
                           2 * 1024 * 1024
                           /* readahead_size */,
                           false /* verify_checksum */, false /* output_hex */,
                           false /* decode_blob_index */, src_env_options,
                           true /* silent */);

  const TableProperties* table_properties = nullptr;
  std::shared_ptr tp;
  Status s = sst_reader.getStatus();

  if (s.ok()) {
    // Try to get table properties from the table reader of sst_reader
    if (!sst_reader.ReadTableProperties(&tp).ok()) {
      // Try to use table properties from the initialization of sst_reader
      table_properties = sst_reader.GetInitTableProperties();
    } else {
      table_properties = tp.get();
      if (table_properties != nullptr && rate_limiter != nullptr) {
        // sizeof(*table_properties) is a sufficient but far-from-exact
        // approximation of read bytes due to metaindex block, std::string
        // properties and varint compression
        LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter,
                                   Env::IO_LOW, nullptr /* stats */,
                                   RateLimiter::OpType::kRead);
      }
    }
  } else {
    ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
                   file_path.c_str(), s.ToString().c_str());
    return s;
  }

  if (table_properties != nullptr) {
    if (db_id != nullptr) {
      db_id->assign(table_properties->db_id);
    }
    if (db_session_id != nullptr) {
      db_session_id->assign(table_properties->db_session_id);
      if (db_session_id->empty()) {
        s = Status::NotFound("DB session identity not found in " + file_path);
        ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
        return s;
      }
    }
    return Status::OK();
  } else {
    s = Status::Corruption("Table properties missing in " + file_path);
    ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
    return s;
  }
}

// Requests `total_bytes_to_request` bytes from `rate_limiter` as a sequence
// of requests, each no larger than rate_limiter->GetSingleBurstBytes().
// `rate_limiter` must be non-null (asserted).
// NOTE(review): the loop makes no progress if GetSingleBurstBytes() returns
// 0 -- presumably that cannot happen; confirm.
void BackupEngineImpl::LoopRateLimitRequestHelper(
    const size_t total_bytes_to_request, RateLimiter* rate_limiter,
    const Env::IOPriority pri, Statistics* stats,
    const RateLimiter::OpType op_type) {
  assert(rate_limiter != nullptr);
  size_t remaining_bytes = total_bytes_to_request;
  size_t request_bytes = 0;
  while (remaining_bytes > 0) {
    request_bytes =
        std::min(static_cast(rate_limiter->GetSingleBurstBytes()),
                 remaining_bytes);
    rate_limiter->Request(request_bytes, pri, stats, op_type);
    remaining_bytes -= request_bytes;
  }
}

// Best-effort deletion of the children of `dir` on db_fs_.
// A child is kept only if its name parses as a DB file name and the bit for
// its FileType is set in `file_type_filter`; everything else, including
// names that do not parse, is passed to DeleteFile. All errors are ignored.
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) const {
  std::vector children;
  db_fs_->GetChildren(dir, io_options_, &children, nullptr)
      .PermitUncheckedError();  // ignore errors

  for (const auto& f : children) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(f, &number, &type);
    if (ok && (file_type_filter & (1 << type))) {
      // don't delete this file
      continue;
    }
    db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr)
        .PermitUncheckedError();  // ignore errors
  }
}

// Adds one entry per child of `dir` to `*result`, mapping the child's path
// (`dir` joined with the child name by a single '/') to its current size in
// bytes. `result` must be non-null (asserted).
//
// A missing `dir` is not an error: nothing is inserted and OK is returned.
// Any other failure from FileExists or GetChildrenFileAttributes is returned.
IOStatus BackupEngineImpl::ReadChildFileCurrentSizes(
    const std::string& dir, const std::shared_ptr& fs,
    std::unordered_map* result) const {
  assert(result != nullptr);
  std::vector files_attrs;
  IOStatus io_status = fs->FileExists(dir, io_options_, nullptr);
  if (io_status.ok()) {
    io_status =
        fs->GetChildrenFileAttributes(dir, io_options_, &files_attrs, nullptr);
  } else if (io_status.IsNotFound()) {
    // Insert no entries can be considered success
    io_status = IOStatus::OK();
  }
  // Avoid a doubled separator when `dir` already ends with '/'.
  const bool slash_needed = dir.empty() || dir.back() != '/';
  for (const auto& file_attrs : files_attrs) {
    result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
                    file_attrs.size_bytes);
  }
  return io_status;
}

// Removes files in the backup directory that no backup references:
//  * entries of the two shared directories that are unknown to
//    backuped_file_infos_ or have a reference count of 0;
//  * private directories that are temporary (".tmp" in the name) or whose
//    numeric id is not in backups_.
// Must not be called on a read-only engine (asserted).
//
// The returned status only reflects failures to list directories; failed
// deletions are logged and merely set might_need_garbage_collect_ so that a
// later attempt can retry.
IOStatus BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);

  // We will make a best effort to remove all garbage even in the presence
  // of inconsistencies or I/O failures that inhibit finding garbage.
  IOStatus overall_status = IOStatus::OK();

  // If all goes well, we don't need another auto-GC this session
  might_need_garbage_collect_ = false;

  ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");

  // delete obsolete shared files
  for (bool with_checksum : {false, true}) {
    std::vector shared_children;
    {
      std::string shared_path;
      if (with_checksum) {
        shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
      } else {
        shared_path = GetAbsolutePath(GetSharedFileRel());
      }
      IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr);
      if (io_s.ok()) {
        io_s = backup_fs_->GetChildren(shared_path, io_options_,
                                       &shared_children, nullptr);
      } else if (io_s.IsNotFound()) {
        // A missing shared directory simply has nothing to collect.
        io_s = IOStatus::OK();
      }
      if (!io_s.ok()) {
        overall_status = io_s;
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    for (auto& child : shared_children) {
      std::string rel_fname;
      if (with_checksum) {
        rel_fname = GetSharedFileWithChecksumRel(child);
      } else {
        rel_fname = GetSharedFileRel(child);
      }
      auto child_itr = backuped_file_infos_.find(rel_fname);
      // if it's not refcounted, delete it
      if (child_itr == backuped_file_infos_.end() ||
          child_itr->second->refs == 0) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(rel_fname),
                                               io_options_, nullptr);
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       rel_fname.c_str(), io_s.ToString().c_str());
        // The in-memory entry is dropped whether or not the delete succeeded.
        backuped_file_infos_.erase(rel_fname);
        if (!io_s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
  }

  // delete obsolete private files
  std::vector private_children;
  {
    IOStatus io_s =
        backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_,
                                &private_children, nullptr);
    if (!io_s.ok()) {
      overall_status = io_s;
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }
  for (auto& child : private_children) {
    BackupID backup_id = 0;
    bool tmp_dir = child.find(".tmp") != std::string::npos;
    // backup_id stays 0 when the name does not start with a number.
    sscanf(child.c_str(), "%u", &backup_id);
    if (!tmp_dir &&  // if it's tmp_dir, delete it
        (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
      // it's either not a number or it's still alive. continue
      continue;
    }
    // here we have to delete the dir and all its children
    // NOTE(review): the path is rebuilt from the parsed id only, so for a
    // "<id>.tmp" child this presumably names the non-tmp directory -- confirm
    // that this is intended.
    std::string full_private_path =
        GetAbsolutePath(GetPrivateFileRel(backup_id));
    std::vector subchildren;
    if (backup_fs_
            ->GetChildren(full_private_path, io_options_, &subchildren,
                          nullptr)
            .ok()) {
      for (auto& subchild : subchildren) {
        // NOTE(review): plain concatenation assumes full_private_path ends
        // with '/' -- confirm against GetPrivateFileRel.
        IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild,
                                               io_options_, nullptr);
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       (full_private_path + subchild).c_str(),
                       io_s.ToString().c_str());
        if (!io_s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
    // finally delete the private dir
    IOStatus io_s =
        backup_fs_->DeleteDir(full_private_path, io_options_, nullptr);
    ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
                   full_private_path.c_str(), io_s.ToString().c_str());
    if (!io_s.ok()) {
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }

  assert(overall_status.ok() || might_need_garbage_collect_);
  return overall_status;
}

// ------- BackupMeta class --------

// Registers `file_info` as part of this backup.
//
// If no entry with the same filename exists in the shared file_infos_ map,
// `file_info` is inserted with a reference count of 1. Otherwise the existing
// entry is reused and its reference count incremented, after checking that
// the sizes match and, when both sides have one, that the checksums match.
// A checksum missing on the existing entry is filled in from `file_info`.
//
// Returns Corruption on an insertion failure or on a size/checksum mismatch;
// in those cases this backup's size_ and files_ are left unchanged.
IOStatus BackupEngineImpl::BackupMeta::AddFile(
    std::shared_ptr file_info) {
  auto itr = file_infos_->find(file_info->filename);
  if (itr == file_infos_->end()) {
    auto ret = file_infos_->insert({file_info->filename, file_info});
    if (ret.second) {
      itr = ret.first;
      itr->second->refs = 1;
    } else {
      // if this happens, something is seriously wrong
      return IOStatus::Corruption("In memory metadata insertion error");
    }
  } else {
    // Compare sizes, because we scanned that off the filesystem on both
    // ends. This is like a check in VerifyBackup.
    if (itr->second->size != file_info->size) {
      std::string msg = "Size mismatch for existing backup file: ";
      msg.append(file_info->filename);
      msg.append(" Size in backup is " + std::to_string(itr->second->size) +
                 " while size in DB is " + std::to_string(file_info->size));
      msg.append(
          " If this DB file checks as not corrupt, try deleting old"
          " backups or backing up to a different backup directory.");
      return IOStatus::Corruption(msg);
    }
    if (file_info->checksum_hex.empty()) {
      // No checksum available to check
    } else if (itr->second->checksum_hex.empty()) {
      // Remember checksum if newly acquired
      itr->second->checksum_hex = file_info->checksum_hex;
    } else if (itr->second->checksum_hex != file_info->checksum_hex) {
      // Note: to save I/O, these will be equal trivially on already backed
      // up files that don't have the checksum in their name. And it should
      // never fail for files that do have checksum in their name.

      // Should never reach here, but produce an appropriate corruption
      // message in case we do in a release build.
      assert(false);
      std::string msg = "Checksum mismatch for existing backup file: ";
      msg.append(file_info->filename);
      msg.append(" Expected checksum is " + itr->second->checksum_hex +
                 " while computed checksum is " + file_info->checksum_hex);
      msg.append(
          " If this DB file checks as not corrupt, try deleting old"
          " backups or backing up to a different backup directory.");
      return IOStatus::Corruption(msg);
    }
    ++itr->second->refs;  // increase refcount if already present
  }

  size_ += file_info->size;
  files_.push_back(itr->second);

  return IOStatus::OK();
}

// Drops this backup's references to its files and resets its timestamp to 0.
// When `delete_meta` is true the meta file is also deleted; a meta file that
// does not exist is not an error. Returns the status of the meta file
// handling (a default-constructed IOStatus when `delete_meta` is false).
IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
  IOStatus io_s;
  for (const auto& file : files_) {
    --file->refs;  // decrease refcount
  }
  files_.clear();
  // delete meta file
  if (delete_meta) {
    io_s = fs_->FileExists(meta_filename_, iooptions_, nullptr);
    if (io_s.ok()) {
      io_s = fs_->DeleteFile(meta_filename_, iooptions_, nullptr);
    } else if (io_s.IsNotFound()) {
      io_s = IOStatus::OK();  // nothing to delete
    }
  }
  timestamp_ = 0;
  return io_s;
}

// Constants for backup meta file schema (see LoadFromFile)
const std::string kSchemaVersionPrefix{"schema_version "};
const std::string kFooterMarker{"// FOOTER"};

const std::string kAppMetaDataFieldName{"metadata"};

// WART: The checksums are crc32c but named "crc32"
const std::string kFileCrc32cFieldName{"crc32"};
const std::string kFileSizeFieldName{"size"};
const std::string kTemperatureFieldName{"temp"};

// Marks a (future) field that should cause failure if not recognized.
// Other fields are assumed to be ignorable. For example, in the future
// we might add
//  ni::file_name_escape uri_percent
// to indicate all file names have had spaces and special characters
// escaped using a URI percent encoding.
const std::string kNonIgnorableFieldPrefix{"ni::"};

// Each backup meta file is of the format (schema version 1):
//----------------------------------------------------------
// <timestamp>
// <seq number>
// metadata <metadata> (optional)
// <number of files>
// <file1> crc32 <checksum>
// <file2> crc32 <checksum>
// ...
//----------------------------------------------------------
//
// For schema version 2.x (not in public APIs, but
// forward-compatibility started):
//----------------------------------------------------------
// schema_version <ver>
// <timestamp>
// <seq number>
// [<field name> <field data>]
// ...
// <number of files>
// <file1>( <field name> <field data no spaces>)*
// <file2>( <field name> <field data no spaces>)*
// ...
// [// FOOTER]
// [<field name> <field data>]
// ...
//----------------------------------------------------------
// where
// <ver> ::= [0-9]+([.][0-9]+)
// <field name> ::= [A-Za-z_][A-Za-z_0-9.]+
// <field data> is anything but newline
// <field data no spaces> is anything but space and newline
// Although "// FOOTER" wouldn't strictly be required as a delimiter
// given the number of files is included, it is there for parsing
// sanity in case of corruption. It is only required if followed
// by footer fields, such as a checksum of the meta file (so far).
// Unrecognized fields are ignored, to support schema evolution on
// non-critical features with forward compatibility. Update schema
// major version for breaking changes. Schema minor versions are indicated
// only for diagnostic/debugging purposes.
//
// Fields in schema version 2.0:
// * Top-level meta fields:
//   * Only "metadata" as in schema version 1
// * File meta fields:
//   * "crc32" - a crc32c checksum as in schema version 1
//   * "size" - the size of the file (new)
// * Footer meta fields:
//   * None yet (future use for meta file checksum anticipated)
//
IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
    const std::string& backup_dir,
    const std::unordered_map& abs_path_to_size,
    RateLimiter* rate_limiter, Logger* info_log,
    std::unordered_set* reported_ignored_fields) {
  assert(reported_ignored_fields);
  assert(Empty());

  std::unique_ptr backup_meta_reader;
  {
    IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
                                           &backup_meta_reader,
                                           nullptr /* dbg */, rate_limiter);
    if (!io_s.ok()) {
      return io_s;
    }
  }

  // If we don't read an explicit schema_version, that implies version 1,
  // which is what we call the original backup meta schema.
  int schema_major_version = 1;

  // Failures handled at the end
  std::string line;
  if (backup_meta_reader->ReadLine(&line,
                                   Env::IO_LOW /* rate_limiter_priority */)) {
    if (StartsWith(line, kSchemaVersionPrefix)) {
      std::string ver = line.substr(kSchemaVersionPrefix.size());
      if (ver == "2" || StartsWith(ver, "2.")) {
        schema_major_version = 2;
      } else {
        return IOStatus::NotSupported(
            "Unsupported/unrecognized schema version: " + ver);
      }
      line.clear();
    } else if (line.empty()) {
      return IOStatus::Corruption("Unexpected empty line");
    }
  }
  if (!line.empty()) {
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  } else if (backup_meta_reader->ReadLine(
                 &line, Env::IO_LOW /* rate_limiter_priority */)) {
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  }
  if (backup_meta_reader->ReadLine(&line,
                                   Env::IO_LOW /* rate_limiter_priority */)) {
    sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  }
  uint32_t num_files = UINT32_MAX;
  while (backup_meta_reader->ReadLine(
      &line, Env::IO_LOW /* rate_limiter_priority */)) {
    if (line.empty()) {
      return IOStatus::Corruption("Unexpected empty line");
    }
    // Number -> number of files -> exit loop reading optional meta fields
    if (line[0] >= '0' && line[0] <= '9') {
      num_files = static_cast(strtoul(line.c_str(), nullptr, 10));
      break;
    }
    // else, must be a meta field assignment
    auto space_pos = line.find_first_of(' ');
    if (space_pos == std::string::npos) {
      return IOStatus::Corruption("Expected number of files or meta field");
    }
    std::string field_name = line.substr(0, space_pos);
    std::string field_data = line.substr(space_pos + 1);
    if (field_name == kAppMetaDataFieldName) {
      // app metadata present
      bool decode_success = Slice(field_data).DecodeHex(&app_metadata_);
      if (!decode_success) {
        return IOStatus::Corruption(
            "Failed to decode stored hex encoded app metadata");
      }
    } else if (schema_major_version < 2) {
      return IOStatus::Corruption("Expected number of files or \"" +
                                  kAppMetaDataFieldName + "\" field");
    } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
      return IOStatus::NotSupported("Unrecognized non-ignorable meta field " +
                                    field_name + " (from future version?)");
    } else {
      // Warn the first time we see any particular unrecognized meta field
      if (reported_ignored_fields->insert("meta:" + field_name).second) {
        ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta field %s",
                       field_name.c_str());
      }
    }
  }
  std::vector> files;
  bool footer_present = false;
  while (backup_meta_reader->ReadLine(
      &line, Env::IO_LOW /* rate_limiter_priority */)) {
    std::vector components = StringSplit(line, ' ');

    if (components.size() < 1) {
      return IOStatus::Corruption("Empty line instead of file entry.");
    }
    if (schema_major_version >= 2 && components.size() == 2 &&
        line == kFooterMarker) {
      footer_present = true;
      break;
    }

    const std::string& filename = components[0];

    uint64_t actual_size;
    const std::shared_ptr file_info = GetFile(filename);
    if (file_info) {
      actual_size = file_info->size;
    } else {
      std::string abs_path = backup_dir + "/" + filename;
      auto e = abs_path_to_size.find(abs_path);
      if (e == abs_path_to_size.end()) {
        return IOStatus::Corruption(
            "Pathname in meta file not found on disk: " + abs_path);
      }
      actual_size = e->second;
    }

    if (schema_major_version >= 2) {
      if (components.size() % 2 != 1) {
        return IOStatus::Corruption(
            "Bad number of line components for file entry.");
      }
    } else {
      // Check restricted original schema
      if (components.size() < 3) {
        return IOStatus::Corruption("File checksum is missing for " + filename +
                                    " in " + meta_filename_);
      }
      if (components[1] != kFileCrc32cFieldName) {
        return IOStatus::Corruption("Unknown checksum type for " + filename +
                                    " in " + meta_filename_);
      }
      if (components.size() > 3) {
        return IOStatus::Corruption("Extra data for entry " + filename +
                                    " in " + meta_filename_);
      }
    }

    std::string checksum_hex;
    Temperature temp = Temperature::kUnknown;
    for (unsigned i = 1; i < components.size(); i += 2) {
      const std::string& field_name = components[i];
      const
std::string& field_data = components[i + 1]; if (field_name == kFileCrc32cFieldName) { uint32_t checksum_value = static_cast(strtoul(field_data.c_str(), nullptr, 10)); if (field_data != std::to_string(checksum_value)) { return IOStatus::Corruption("Invalid checksum value for " + filename + " in " + meta_filename_); } checksum_hex = ChecksumInt32ToHex(checksum_value); } else if (field_name == kFileSizeFieldName) { uint64_t ex_size = std::strtoull(field_data.c_str(), nullptr, /*base*/ 10); if (ex_size != actual_size) { return IOStatus::Corruption( "For file " + filename + " expected size " + std::to_string(ex_size) + " but found size" + std::to_string(actual_size)); } } else if (field_name == kTemperatureFieldName) { auto iter = temperature_string_map.find(field_data); if (iter != temperature_string_map.end()) { temp = iter->second; } else { // Could report corruption, but in case of new temperatures added // in future, letting those map to kUnknown which should generally // be safe. temp = Temperature::kUnknown; } } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { return IOStatus::NotSupported("Unrecognized non-ignorable file field " + field_name + " (from future version?)"); } else { // Warn the first time we see any particular unrecognized file field if (reported_ignored_fields->insert("file:" + field_name).second) { ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup file field %s", field_name.c_str()); } } } files.emplace_back(new FileInfo(filename, actual_size, checksum_hex, /*id*/ "", /*sid*/ "", temp)); } if (footer_present) { assert(schema_major_version >= 2); while (backup_meta_reader->ReadLine( &line, Env::IO_LOW /* rate_limiter_priority */)) { if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); } auto space_pos = line.find_first_of(' '); if (space_pos == std::string::npos) { return IOStatus::Corruption("Expected footer field"); } std::string field_name = line.substr(0, space_pos); std::string field_data = 
line.substr(space_pos + 1); if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { return IOStatus::NotSupported("Unrecognized non-ignorable field " + field_name + " (from future version?)"); } else if (reported_ignored_fields->insert("footer:" + field_name) .second) { // Warn the first time we see any particular unrecognized footer field ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta footer field %s", field_name.c_str()); } } } { IOStatus io_s = backup_meta_reader->GetStatus(); if (!io_s.ok()) { return io_s; } } if (num_files != files.size()) { return IOStatus::Corruption( "Inconsistent number of files or missing/incomplete header in " + meta_filename_); } files_.reserve(files.size()); for (const auto& file_info : files) { IOStatus io_s = AddFile(file_info); if (!io_s.ok()) { return io_s; } } return IOStatus::OK(); } const std::vector minor_version_strings{ "", // invalid major version 0 "", // implicit major version 1 "2.0", }; IOStatus BackupEngineImpl::BackupMeta::StoreToFile( bool sync, int schema_version, const TEST_BackupMetaSchemaOptions* schema_test_options) { if (schema_version < 1) { return IOStatus::InvalidArgument( "BackupEngineOptions::schema_version must be >= 1"); } if (schema_version > static_cast(minor_version_strings.size() - 1)) { return IOStatus::NotSupported( "Only BackupEngineOptions::schema_version <= " + std::to_string(minor_version_strings.size() - 1) + " is supported"); } std::string ver = minor_version_strings[schema_version]; // Need schema_version >= 2 for TEST_BackupMetaSchemaOptions assert(schema_version >= 2 || schema_test_options == nullptr); IOStatus io_s; std::unique_ptr backup_meta_file; FileOptions file_options; file_options.use_mmap_writes = false; file_options.use_direct_writes = false; io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options, &backup_meta_file, nullptr); if (!io_s.ok()) { return io_s; } std::ostringstream buf; if (schema_test_options) { // override for testing ver = 
schema_test_options->version; } if (!ver.empty()) { assert(schema_version >= 2); buf << kSchemaVersionPrefix << ver << "\n"; } buf << static_cast(timestamp_) << "\n"; buf << sequence_number_ << "\n"; if (!app_metadata_.empty()) { std::string hex_encoded_metadata = Slice(app_metadata_).ToString(/* hex */ true); buf << kAppMetaDataFieldName << " " << hex_encoded_metadata << "\n"; } if (schema_test_options) { for (auto& e : schema_test_options->meta_fields) { buf << e.first << " " << e.second << "\n"; } } buf << files_.size() << "\n"; for (const auto& file : files_) { buf << file->filename; if (schema_test_options == nullptr || schema_test_options->crc32c_checksums) { // use crc32c for now, switch to something else if needed buf << " " << kFileCrc32cFieldName << " " << ChecksumHexToInt32(file->checksum_hex); } if (schema_version >= 2 && file->temp != Temperature::kUnknown) { buf << " " << kTemperatureFieldName << " " << temperature_to_string[file->temp]; } if (schema_test_options && schema_test_options->file_sizes) { buf << " " << kFileSizeFieldName << " " << std::to_string(file->size); } if (schema_test_options) { for (auto& e : schema_test_options->file_fields) { buf << " " << e.first << " " << e.second; } } buf << "\n"; } if (schema_test_options && !schema_test_options->footer_fields.empty()) { buf << kFooterMarker << "\n"; for (auto& e : schema_test_options->footer_fields) { buf << e.first << " " << e.second << "\n"; } } io_s = backup_meta_file->Append(Slice(buf.str()), iooptions_, nullptr); IOSTATS_ADD(bytes_written, buf.str().size()); if (io_s.ok() && sync) { io_s = backup_meta_file->Sync(iooptions_, nullptr); } if (io_s.ok()) { io_s = backup_meta_file->Close(iooptions_, nullptr); } if (io_s.ok()) { io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_, nullptr); } return io_s; } } // namespace IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options, Env* env, BackupEngineReadOnly** backup_engine_ptr) { if 
(options.destroy_old_data) { return IOStatus::InvalidArgument( "Can't destroy old data with ReadOnly BackupEngine"); } std::unique_ptr backup_engine( new BackupEngineImplThreadSafe(options, env, true /*read_only*/)); auto s = backup_engine->Initialize(); if (!s.ok()) { *backup_engine_ptr = nullptr; return s; } *backup_engine_ptr = backup_engine.release(); return IOStatus::OK(); } void TEST_SetBackupMetaSchemaOptions( BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options) { BackupEngineImplThreadSafe* impl = static_cast_with_check(engine); impl->TEST_SetBackupMetaSchemaOptions(options); } void TEST_SetDefaultRateLimitersClock( BackupEngine* engine, const std::shared_ptr& backup_rate_limiter_clock, const std::shared_ptr& restore_rate_limiter_clock) { BackupEngineImplThreadSafe* impl = static_cast_with_check(engine); impl->TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, restore_rate_limiter_clock); } } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE