From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/utilities/backup/backup_engine.cc | 3181 +++++++++++++++++++++++++ 1 file changed, 3181 insertions(+) create mode 100644 src/rocksdb/utilities/backup/backup_engine.cc (limited to 'src/rocksdb/utilities/backup/backup_engine.cc') diff --git a/src/rocksdb/utilities/backup/backup_engine.cc b/src/rocksdb/utilities/backup/backup_engine.cc new file mode 100644 index 000000000..81b4a6629 --- /dev/null +++ b/src/rocksdb/utilities/backup/backup_engine.cc @@ -0,0 +1,3181 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "env/composite_env_wrapper.h" +#include "env/fs_readonly.h" +#include "env/fs_remap.h" +#include "file/filename.h" +#include "file/line_file_reader.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "logging/logging.h" +#include "monitoring/iostats_context_imp.h" +#include "options/options_helper.h" +#include "port/port.h" +#include "rocksdb/advanced_options.h" +#include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" +#include "rocksdb/transaction_log.h" +#include "table/sst_file_dumper.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/channel.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/math.h" +#include "util/rate_limiter.h" +#include "util/string_util.h" +#include "utilities/backup/backup_engine_impl.h" +#include "utilities/checkpoint/checkpoint_impl.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming; + +constexpr BackupID kLatestBackupIDMarker = static_cast(-2); + +inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) { + std::string checksum_str; + Slice(checksum_hex).DecodeHex(&checksum_str); + return EndianSwapValue(DecodeFixed32(checksum_str.c_str())); +} +inline std::string ChecksumStrToHex(const std::string& checksum_str) { + return Slice(checksum_str).ToString(true); +} +inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) { + std::string checksum_str; + PutFixed32(&checksum_str, EndianSwapValue(checksum_value)); + return ChecksumStrToHex(checksum_str); +} + +const std::string kPrivateDirName = "private"; +const std::string kMetaDirName = "meta"; +const std::string kSharedDirName = "shared"; +const std::string kSharedChecksumDirName = 
"shared_checksum"; +const std::string kPrivateDirSlash = kPrivateDirName + "/"; +const std::string kMetaDirSlash = kMetaDirName + "/"; +const std::string kSharedDirSlash = kSharedDirName + "/"; +const std::string kSharedChecksumDirSlash = kSharedChecksumDirName + "/"; + +} // namespace + +void BackupStatistics::IncrementNumberSuccessBackup() { + number_success_backup++; +} +void BackupStatistics::IncrementNumberFailBackup() { number_fail_backup++; } + +uint32_t BackupStatistics::GetNumberSuccessBackup() const { + return number_success_backup; +} +uint32_t BackupStatistics::GetNumberFailBackup() const { + return number_fail_backup; +} + +std::string BackupStatistics::ToString() const { + char result[50]; + snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u", + GetNumberSuccessBackup(), GetNumberFailBackup()); + return result; +} + +void BackupEngineOptions::Dump(Logger* logger) const { + ROCKS_LOG_INFO(logger, " Options.backup_dir: %s", + backup_dir.c_str()); + ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env); + ROCKS_LOG_INFO(logger, " Options.share_table_files: %d", + static_cast(share_table_files)); + ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log); + ROCKS_LOG_INFO(logger, " Options.sync: %d", + static_cast(sync)); + ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d", + static_cast(destroy_old_data)); + ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d", + static_cast(backup_log_files)); + ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64, + backup_rate_limit); + ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64, + restore_rate_limit); + ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d", + max_background_operations); +} + +namespace { +// -------- BackupEngineImpl class --------- +class BackupEngineImpl { + public: + BackupEngineImpl(const BackupEngineOptions& options, Env* db_env, + bool read_only = false); + ~BackupEngineImpl(); + + IOStatus 
CreateNewBackupWithMetadata(const CreateBackupOptions& options, + DB* db, const std::string& app_metadata, + BackupID* new_backup_id_ptr); + + IOStatus PurgeOldBackups(uint32_t num_backups_to_keep); + + IOStatus DeleteBackup(BackupID backup_id); + + void StopBackup() { stop_backup_.store(true, std::memory_order_release); } + + IOStatus GarbageCollect(); + + // The returned BackupInfos are in chronological order, which means the + // latest backup comes last. + void GetBackupInfo(std::vector* backup_info, + bool include_file_details) const; + + Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, + bool include_file_details = false) const; + + void GetCorruptedBackups(std::vector* corrupt_backup_ids) const; + + IOStatus RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir) const; + + IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options, + const std::string& db_dir, + const std::string& wal_dir) const { + // Note: don't read latest_valid_backup_id_ outside of lock + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); + } + + IOStatus VerifyBackup(BackupID backup_id, + bool verify_with_checksum = false) const; + + IOStatus Initialize(); + + ShareFilesNaming GetNamingNoFlags() const { + return options_.share_files_with_checksum_naming & + BackupEngineOptions::kMaskNoNamingFlags; + } + ShareFilesNaming GetNamingFlags() const { + return options_.share_files_with_checksum_naming & + BackupEngineOptions::kMaskNamingFlags; + } + + void TEST_SetDefaultRateLimitersClock( + const std::shared_ptr& backup_rate_limiter_clock, + const std::shared_ptr& restore_rate_limiter_clock) { + if (backup_rate_limiter_clock) { + static_cast(options_.backup_rate_limiter.get()) + ->TEST_SetClock(backup_rate_limiter_clock); + } + + if (restore_rate_limiter_clock) { + static_cast(options_.restore_rate_limiter.get()) + ->TEST_SetClock(restore_rate_limiter_clock); + } + } + 
+
+ private:
+  void DeleteChildren(const std::string& dir,
+                      uint32_t file_type_filter = 0) const;
+  IOStatus DeleteBackupNoGC(BackupID backup_id);
+
+  // Extends the "result" map with pathname->size mappings for the contents of
+  // "dir" in "env". Pathnames are prefixed with "dir".
+  IOStatus ReadChildFileCurrentSizes(
+      const std::string& dir, const std::shared_ptr<FileSystem>&,
+      std::unordered_map<std::string, uint64_t>* result) const;
+
+  // Record describing one file stored in the backup directory. Instances are
+  // non-copyable; `refs` starts at 0 and is adjusted by code outside this
+  // struct.
+  struct FileInfo {
+    FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
+             const std::string& id, const std::string& sid, Temperature _temp)
+        : refs(0),
+          filename(fname),
+          size(sz),
+          checksum_hex(checksum),
+          db_id(id),
+          db_session_id(sid),
+          temp(_temp) {}
+
+    FileInfo(const FileInfo&) = delete;
+    FileInfo& operator=(const FileInfo&) = delete;
+
+    int refs;
+    // Path relative to the backup directory (see GetDbFileName() for the
+    // possible layouts).
+    const std::string filename;
+    const uint64_t size;
+    // crc32c checksum as hex. empty == unknown / unavailable
+    std::string checksum_hex;
+    // DB identities
+    // db_id is obtained for potential usage in the future but not used
+    // currently
+    const std::string db_id;
+    // db_session_id appears in the backup SST filename if the table naming
+    // option is kUseDbSessionId
+    const std::string db_session_id;
+    Temperature temp;
+
+    // Returns the name the file has inside a DB directory, i.e. the last path
+    // component of `filename`, with the checksum/session decoration removed
+    // when the file lives directly under the shared_checksum directory.
+    std::string GetDbFileName() {
+      std::string rv;
+      // extract the filename part
+      size_t slash = filename.find_last_of('/');
+      // file will either be shared/<file>, shared_checksum/<file_crc32c_size>,
+      // shared_checksum/<file_session>, shared_checksum/<file_crc32c_session>,
+      // or private/<number>/<file>
+      assert(slash != std::string::npos);
+      rv = filename.substr(slash + 1);
+
+      // if the file was in shared_checksum, extract the real file name
+      // in this case the file is <number>_<checksum>_<size>.<type>,
+      // <number>_<session>.<type>, or <number>_<checksum>_<session>.<type>
+      if (filename.substr(0, slash) == kSharedChecksumDirName) {
+        rv = GetFileFromChecksumFile(rv);
+      }
+      return rv;
+    }
+  };
+
+  // TODO: deprecate this function once we migrate all BackupEngine's rate
+  // limiting to lower-level ones (i.e, ones in file access wrapper level like
+  // `WritableFileWriter`)
+  static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
+                                         RateLimiter* rate_limiter,
+                                         const Env::IOPriority pri,
+                                         Statistics* stats,
+                                         const RateLimiter::OpType op_type);
+
+  // Returns `path` with a single trailing '/' removed. A path that is empty
+  // or does not end in '/' is returned unchanged.
+  static inline std::string WithoutTrailingSlash(const std::string& path) {
+    if (path.empty() || path.back() != '/') {
+      return path;
+    } else {
+      // Keep the prefix [0, size - 1). Note that substr(size - 1) would
+      // instead yield only the trailing "/" itself.
+      return path.substr(0, path.size() - 1);
+    }
+  }
+
+  // Returns `path` with a '/' appended unless it already ends in one. An
+  // empty path becomes "/".
+  static inline std::string WithTrailingSlash(const std::string& path) {
+    if (path.empty() || path.back() != '/') {
+      return path + '/';
+    } else {
+      return path;
+    }
+  }
+
+  // A filesystem wrapper that makes shared backup files appear to be in the
+  // private backup directory (dst_dir), so that the private backup dir can
+  // be opened as a read-only DB.
+ class RemapSharedFileSystem : public RemapFileSystem { + public: + RemapSharedFileSystem(const std::shared_ptr& base, + const std::string& dst_dir, + const std::string& src_base_dir, + const std::vector>& files) + : RemapFileSystem(base), + dst_dir_(WithoutTrailingSlash(dst_dir)), + dst_dir_slash_(WithTrailingSlash(dst_dir)), + src_base_dir_(WithTrailingSlash(src_base_dir)) { + for (auto& info : files) { + if (!StartsWith(info->filename, kPrivateDirSlash)) { + assert(StartsWith(info->filename, kSharedDirSlash) || + StartsWith(info->filename, kSharedChecksumDirSlash)); + remaps_[info->GetDbFileName()] = info; + } + } + } + + const char* Name() const override { + return "BackupEngineImpl::RemapSharedFileSystem"; + } + + // Sometimes a directory listing is required in opening a DB + IOStatus GetChildren(const std::string& dir, const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override { + IOStatus s = RemapFileSystem::GetChildren(dir, options, result, dbg); + if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { + // Assume remapped files exist + for (auto& r : remaps_) { + result->push_back(r.first); + } + } + return s; + } + + // Sometimes a directory listing is required in opening a DB + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override { + IOStatus s = + RemapFileSystem::GetChildrenFileAttributes(dir, options, result, dbg); + if (s.ok() && (dir == dst_dir_ || dir == dst_dir_slash_)) { + // Assume remapped files exist with recorded size + for (auto& r : remaps_) { + result->emplace_back(); // clean up with C++20 + FileAttributes& attr = result->back(); + attr.name = r.first; + attr.size_bytes = r.second->size; + } + } + return s; + } + + protected: + // When a file in dst_dir is requested, see if we need to remap to shared + // file path. 
+    // Maps a requested absolute path to the path that should actually be
+    // accessed. Paths directly under dst_dir_ whose relative name is a key of
+    // remaps_ are redirected to src_base_dir_ + <recorded backup filename>;
+    // every other absolute path is passed through unchanged. Non-absolute
+    // paths are rejected with InvalidArgument.
+    std::pair<IOStatus, std::string> EncodePath(
+        const std::string& path) override {
+      if (path.empty() || path[0] != '/') {
+        return {IOStatus::InvalidArgument(path, "Not an absolute path"), ""};
+      }
+      std::pair<IOStatus, std::string> rv{IOStatus(), path};
+      if (StartsWith(path, dst_dir_slash_)) {
+        std::string relative = path.substr(dst_dir_slash_.size());
+        auto it = remaps_.find(relative);
+        if (it != remaps_.end()) {
+          rv.second = src_base_dir_ + it->second->filename;
+        }
+      }
+      return rv;
+    }
+
+   private:
+    // Absolute path to a directory that some extra files will be mapped into.
+    const std::string dst_dir_;
+    // Includes a trailing slash.
+    const std::string dst_dir_slash_;
+    // Absolute path to a directory containing some files to be mapped into
+    // dst_dir_. Includes a trailing slash.
+    const std::string src_base_dir_;
+    // If remaps_[x] exists, attempt to read dst_dir_ / x should instead read
+    // src_base_dir_ / remaps_[x]->filename. FileInfo is used to maximize
+    // sharing with other backup data in memory.
+    std::unordered_map<std::string, std::shared_ptr<FileInfo>> remaps_;
+  };
+
+  // In-memory description of a single backup: its timestamp, approximate
+  // sequence number, total size, application metadata and the list of files
+  // it references. Non-copyable.
+  class BackupMeta {
+   public:
+    BackupMeta(
+        const std::string& meta_filename, const std::string& meta_tmp_filename,
+        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
+        Env* env, const std::shared_ptr<FileSystem>& fs)
+        : timestamp_(0),
+          sequence_number_(0),
+          size_(0),
+          meta_filename_(meta_filename),
+          meta_tmp_filename_(meta_tmp_filename),
+          file_infos_(file_infos),
+          env_(env),
+          fs_(fs) {}
+
+    BackupMeta(const BackupMeta&) = delete;
+    BackupMeta& operator=(const BackupMeta&) = delete;
+
+    ~BackupMeta() {}
+
+    // Stores the current time from env_ into timestamp_. If the clock cannot
+    // be read, an obviously artificial value (1) is stored instead.
+    void RecordTimestamp() {
+      // Best effort
+      Status s = env_->GetCurrentTime(&timestamp_);
+      if (!s.ok()) {
+        timestamp_ = /* something clearly fabricated */ 1;
+      }
+    }
+    int64_t GetTimestamp() const { return timestamp_; }
+    uint64_t GetSize() const { return size_; }
+    uint32_t GetNumberFiles() const {
+      return static_cast<uint32_t>(files_.size());
+    }
+    void SetSequenceNumber(uint64_t sequence_number) {
+      sequence_number_ = sequence_number;
+    }
+    uint64_t GetSequenceNumber() const { return sequence_number_; }
+
+    const std::string& GetAppMetadata() const { return app_metadata_; }
+
+    void SetAppMetadata(const std::string& app_metadata) {
+      app_metadata_ = app_metadata;
+    }
+
+    IOStatus AddFile(std::shared_ptr<FileInfo> file_info);
+
+    IOStatus Delete(bool delete_meta = true);
+
+    bool Empty() const { return files_.empty(); }
+
+    // Looks `filename` up in the shared file_infos_ map; returns nullptr when
+    // there is no entry.
+    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
+      auto it = file_infos_->find(filename);
+      if (it == file_infos_->end()) {
+        return nullptr;
+      }
+      return it->second;
+    }
+
+    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() const {
+      return files_;
+    }
+
+    // @param abs_path_to_size Pre-fetched file sizes (bytes).
+    IOStatus LoadFromFile(
+        const std::string& backup_dir,
+        const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
+        RateLimiter* rate_limiter, Logger* info_log,
+        std::unordered_set<std::string>* reported_ignored_fields);
+    IOStatus StoreToFile(
+        bool sync, int schema_version,
+        const TEST_BackupMetaSchemaOptions* schema_test_options);
+
+    // Human-readable multi-line summary: timestamp, total size, and one line
+    // per file with its size and reference count.
+    std::string GetInfoString() {
+      std::ostringstream ss;
+      ss << "Timestamp: " << timestamp_ << std::endl;
+      char human_size[16];
+      AppendHumanBytes(size_, human_size, sizeof(human_size));
+      ss << "Size: " << human_size << std::endl;
+      ss << "Files:" << std::endl;
+      for (const auto& file : files_) {
+        AppendHumanBytes(file->size, human_size, sizeof(human_size));
+        ss << file->filename << ", size " << human_size << ", refs "
+           << file->refs << std::endl;
+      }
+      return ss.str();
+    }
+
+    // Returns an Env through which this backup's private directory can be
+    // opened: shared files are remapped into it and the file system is
+    // wrapped read-only. Built on first use and cached in env_for_open_.
+    const std::shared_ptr<Env>& GetEnvForOpen() const {
+      if (!env_for_open_) {
+        // Lazy initialize
+        // Find directories
+        std::string dst_dir = meta_filename_;
+        auto i = dst_dir.rfind(kMetaDirSlash);
+        assert(i != std::string::npos);
+        std::string src_base_dir = dst_dir.substr(0, i);
+        dst_dir.replace(i, kMetaDirSlash.size(), kPrivateDirSlash);
+        // Make the RemapSharedFileSystem
+        std::shared_ptr<FileSystem> remap_fs =
+            std::make_shared<RemapSharedFileSystem>(fs_, dst_dir, src_base_dir,
+                                                    files_);
+        // Make it read-only for safety
+        remap_fs = std::make_shared<ReadOnlyFileSystem>(remap_fs);
+        // Make an Env wrapper
+        env_for_open_ = std::make_shared<CompositeEnvWrapper>(env_, remap_fs);
+      }
+      return env_for_open_;
+    }
+
+   private:
+    int64_t timestamp_;
+    // sequence number is only approximate, should not be used
+    // by clients
+    uint64_t sequence_number_;
+    uint64_t size_;
+    std::string app_metadata_;
+    std::string const meta_filename_;
+    std::string const meta_tmp_filename_;
+    // files with relative paths (without "/" prefix!!)
+    std::vector<std::shared_ptr<FileInfo>> files_;
+    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
+    Env* env_;
+    mutable std::shared_ptr<Env> env_for_open_;
+    std::shared_ptr<FileSystem> fs_;
+    IOOptions iooptions_ = IOOptions();
+  };  // BackupMeta
+
+  void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta,
+                                   BackupInfo* backup_info,
+                                   bool include_file_details) const;
+
+  // The helpers below build paths. Every "relative" argument must not start
+  // with '/' (asserted).
+
+  // options_.backup_dir + "/" + relative_path.
+  inline std::string GetAbsolutePath(
+      const std::string& relative_path = "") const {
+    assert(relative_path.size() == 0 || relative_path[0] != '/');
+    return options_.backup_dir + "/" + relative_path;
+  }
+  // "private/<backup_id>[.tmp]/<file>", relative to the backup directory.
+  inline std::string GetPrivateFileRel(BackupID backup_id, bool tmp = false,
+                                       const std::string& file = "") const {
+    assert(file.size() == 0 || file[0] != '/');
+    return kPrivateDirSlash + std::to_string(backup_id) + (tmp ? ".tmp" : "") +
+           "/" + file;
+  }
+  // "shared/<file>", or "shared/.<file>.tmp" when tmp is set.
+  inline std::string GetSharedFileRel(const std::string& file = "",
+                                      bool tmp = false) const {
+    assert(file.size() == 0 || file[0] != '/');
+    return kSharedDirSlash + std::string(tmp ? "." : "") + file +
+           (tmp ? ".tmp" : "");
+  }
+  // "shared_checksum/<file>", or "shared_checksum/.<file>.tmp" when tmp is
+  // set.
+  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
+                                                  bool tmp = false) const {
+    assert(file.size() == 0 || file[0] != '/');
+    return kSharedChecksumDirSlash + std::string(tmp ? "." : "") + file +
+           (tmp ? ".tmp" : "");
+  }
+  // True when the legacy crc32c+size naming is configured, or when no session
+  // id is available for the file.
+  inline bool UseLegacyNaming(const std::string& sid) const {
+    return GetNamingNoFlags() ==
+               BackupEngineOptions::kLegacyCrc32cAndFileSize ||
+           sid.empty();
+  }
+  // Builds the decorated name used under shared_checksum by inserting a
+  // suffix before the last '.' of `file`: "_<crc32c>_<size>" for legacy
+  // naming, otherwise "_s<session id>" optionally followed by "_<size>".
+  // NOTE(review): `file` is expected to contain a '.'; there is no handling
+  // here for names without an extension.
+  inline std::string GetSharedFileWithChecksum(
+      const std::string& file, const std::string& checksum_hex,
+      const uint64_t file_size, const std::string& db_session_id) const {
+    assert(file.size() == 0 || file[0] != '/');
+    std::string file_copy = file;
+    if (UseLegacyNaming(db_session_id)) {
+      assert(!checksum_hex.empty());
+      file_copy.insert(file_copy.find_last_of('.'),
+                       "_" + std::to_string(ChecksumHexToInt32(checksum_hex)) +
+                           "_" + std::to_string(file_size));
+    } else {
+      file_copy.insert(file_copy.find_last_of('.'), "_s" + db_session_id);
+      if (GetNamingFlags() & BackupEngineOptions::kFlagIncludeFileSize) {
+        file_copy.insert(file_copy.find_last_of('.'),
+                         "_" + std::to_string(file_size));
+      }
+    }
+    return file_copy;
+  }
+  // Inverse of the decoration above: removes everything from the first '_'
+  // up to (not including) the last '.'.
+  // NOTE(review): `file` is expected to contain both '_' and '.'; there is no
+  // handling here for names lacking either.
+  static inline std::string GetFileFromChecksumFile(const std::string& file) {
+    assert(file.size() == 0 || file[0] != '/');
+    std::string file_copy = file;
+    size_t first_underscore = file_copy.find_first_of('_');
+    return file_copy.erase(first_underscore,
+                           file_copy.find_last_of('.') - first_underscore);
+  }
+  // Absolute path of the meta file for `backup_id`: "<backup_dir>/meta/<id>",
+  // or "<backup_dir>/meta/.<id>.tmp" when tmp is set.
+  inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
+    return GetAbsolutePath(kMetaDirName) + "/" + (tmp ? "." : "") +
+           std::to_string(backup_id) + (tmp ? ".tmp" : "");
+  }
+
+  // If size_limit == 0, there is no size limit, copy everything.
+  //
+  // Exactly one of src and contents must be non-empty.
+  //
+  // @param src If non-empty, the file is copied from this pathname.
+  // @param contents If non-empty, the file will be created with these contents.
+  // @param src_temperature Pass in expected temperature of src, return back
+  // temperature reported by FileSystem
+  IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst,
+                            const std::string& contents, uint64_t size_limit,
+                            Env* src_env, Env* dst_env,
+                            const EnvOptions& src_env_options, bool sync,
+                            RateLimiter* rate_limiter,
+                            std::function<void()> progress_callback,
+                            Temperature* src_temperature,
+                            Temperature dst_temperature,
+                            uint64_t* bytes_toward_next_callback,
+                            uint64_t* size, std::string* checksum_hex);
+
+  IOStatus ReadFileAndComputeChecksum(const std::string& src,
+                                      const std::shared_ptr<FileSystem>& src_fs,
+                                      const EnvOptions& src_env_options,
+                                      uint64_t size_limit,
+                                      std::string* checksum_hex,
+                                      const Temperature src_temperature) const;
+
+  // Obtain db_id and db_session_id from the table properties of file_path
+  Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
+                             const std::string& file_path,
+                             Temperature file_temp, RateLimiter* rate_limiter,
+                             std::string* db_id, std::string* db_session_id);
+
+  // Outcome of one copy-or-create job, delivered through a promise/future.
+  struct CopyOrCreateResult {
+    ~CopyOrCreateResult() {
+      // The Status needs to be ignored here for two reasons.
+      // First, if the BackupEngineImpl shuts down with jobs outstanding, then
+      // it is possible that the Status in the future/promise is never read,
+      // resulting in an unchecked Status. Second, if there are items in the
+      // channel when the BackupEngineImpl is shutdown, these will also have
+      // Status that have not been checked.
+      // TODO: Fix those issues so that the Status is always checked.
+      io_status.PermitUncheckedError();
+    }
+    uint64_t size;
+    std::string checksum_hex;
+    std::string db_id;
+    std::string db_session_id;
+    IOStatus io_status;
+    Temperature expected_src_temperature = Temperature::kUnknown;
+    Temperature current_src_temperature = Temperature::kUnknown;
+  };
+
+  // Exactly one of src_path and contents must be non-empty. If src_path is
+  // non-empty, the file is copied from this pathname. Otherwise, if contents is
+  // non-empty, the file will be created at dst_path with these contents.
+  //
+  // Move-only: the item owns the promise through which its result is
+  // reported.
+  struct CopyOrCreateWorkItem {
+    std::string src_path;
+    std::string dst_path;
+    Temperature src_temperature;
+    Temperature dst_temperature;
+    std::string contents;
+    Env* src_env;
+    Env* dst_env;
+    EnvOptions src_env_options;
+    bool sync;
+    RateLimiter* rate_limiter;
+    uint64_t size_limit;
+    Statistics* stats;
+    std::promise<CopyOrCreateResult> result;
+    std::function<void()> progress_callback;
+    std::string src_checksum_func_name;
+    std::string src_checksum_hex;
+    std::string db_id;
+    std::string db_session_id;
+
+    CopyOrCreateWorkItem()
+        : src_path(""),
+          dst_path(""),
+          src_temperature(Temperature::kUnknown),
+          dst_temperature(Temperature::kUnknown),
+          contents(""),
+          src_env(nullptr),
+          dst_env(nullptr),
+          src_env_options(),
+          sync(false),
+          rate_limiter(nullptr),
+          size_limit(0),
+          stats(nullptr),
+          src_checksum_func_name(kUnknownFileChecksumFuncName),
+          src_checksum_hex(""),
+          db_id(""),
+          db_session_id("") {}
+
+    CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
+    CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
+
+    // Move construction is implemented in terms of move assignment.
+    CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept {
+      *this = std::move(o);
+    }
+
+    // Transfers every field from `o`. Enum, pointer and scalar fields are
+    // plain copies; each field is assigned exactly once.
+    CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept {
+      src_path = std::move(o.src_path);
+      dst_path = std::move(o.dst_path);
+      src_temperature = o.src_temperature;
+      dst_temperature = o.dst_temperature;
+      contents = std::move(o.contents);
+      src_env = o.src_env;
+      dst_env = o.dst_env;
+      src_env_options = std::move(o.src_env_options);
+      sync = o.sync;
+      rate_limiter = o.rate_limiter;
+      size_limit = o.size_limit;
+      stats = o.stats;
+      result = std::move(o.result);
+      progress_callback = std::move(o.progress_callback);
+      src_checksum_func_name = std::move(o.src_checksum_func_name);
+      src_checksum_hex = std::move(o.src_checksum_hex);
+      db_id = std::move(o.db_id);
+      db_session_id = std::move(o.db_session_id);
+      return *this;
+    }
+
+    CopyOrCreateWorkItem(
+        std::string _src_path, std::string _dst_path,
+        const Temperature _src_temperature, const Temperature _dst_temperature,
+        std::string _contents, Env* _src_env, Env* _dst_env,
+        EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter,
+        uint64_t _size_limit, Statistics* _stats,
+        std::function<void()> _progress_callback = []() {},
+        const std::string& _src_checksum_func_name =
+            kUnknownFileChecksumFuncName,
+        const std::string& _src_checksum_hex = "",
+        const std::string& _db_id = "", const std::string& _db_session_id = "")
+        : src_path(std::move(_src_path)),
+          dst_path(std::move(_dst_path)),
+          src_temperature(_src_temperature),
+          dst_temperature(_dst_temperature),
+          contents(std::move(_contents)),
+          src_env(_src_env),
+          dst_env(_dst_env),
+          src_env_options(std::move(_src_env_options)),
+          sync(_sync),
+          rate_limiter(_rate_limiter),
+          size_limit(_size_limit),
+          stats(_stats),
+          progress_callback(_progress_callback),
+          src_checksum_func_name(_src_checksum_func_name),
+          src_checksum_hex(_src_checksum_hex),
+          db_id(_db_id),
+          db_session_id(_db_session_id) {}
+  };
+
+  // Follow-up record for a queued backup copy: holds the future for the
+  // job's result together with the destination paths and flags needed to
+  // finish the file once that result is available. Move-only (owns a
+  // std::future).
+  struct BackupAfterCopyOrCreateWorkItem {
+    std::future<CopyOrCreateResult> result;
+    bool shared;
+    bool needed_to_copy;
+    Env* backup_env;
+    std::string dst_path_tmp;
+    std::string dst_path;
+    std::string dst_relative;
+    BackupAfterCopyOrCreateWorkItem()
+        : shared(false),
+          needed_to_copy(false),
+          backup_env(nullptr),
+          dst_path_tmp(""),
+          dst_path(""),
+          dst_relative("") {}
+
+    BackupAfterCopyOrCreateWorkItem(
+        BackupAfterCopyOrCreateWorkItem&& o) noexcept {
+      *this = std::move(o);
+    }
+
+    BackupAfterCopyOrCreateWorkItem& operator=(
+        BackupAfterCopyOrCreateWorkItem&& o) noexcept {
+      result = std::move(o.result);
+      shared = o.shared;
+      needed_to_copy = o.needed_to_copy;
+      backup_env = o.backup_env;
+      dst_path_tmp = std::move(o.dst_path_tmp);
+      dst_path = std::move(o.dst_path);
+      dst_relative = std::move(o.dst_relative);
+      return *this;
+    }
+
+    BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
+                                    bool _shared, bool _needed_to_copy,
+                                    Env* _backup_env, std::string _dst_path_tmp,
+                                    std::string _dst_path,
+                                    std::string _dst_relative)
+        : result(std::move(_result)),
+          shared(_shared),
+          needed_to_copy(_needed_to_copy),
+          backup_env(_backup_env),
+          dst_path_tmp(std::move(_dst_path_tmp)),
+          dst_path(std::move(_dst_path)),
+          dst_relative(std::move(_dst_relative)) {}
+  };
+
+  // Follow-up record for a queued restore copy: the future for the job's
+  // result plus the source/destination names and the checksum recorded for
+  // the file. Move-only (owns a std::future).
+  struct RestoreAfterCopyOrCreateWorkItem {
+    std::future<CopyOrCreateResult> result;
+    std::string from_file;
+    std::string to_file;
+    std::string checksum_hex;
+    RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {}
+    RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
+                                     const std::string& _from_file,
+                                     const std::string& _to_file,
+                                     const std::string& _checksum_hex)
+        : result(std::move(_result)),
+          from_file(_from_file),
+          to_file(_to_file),
+          checksum_hex(_checksum_hex) {}
+    RestoreAfterCopyOrCreateWorkItem(
+        RestoreAfterCopyOrCreateWorkItem&& o) noexcept {
+      *this = std::move(o);
+    }
+
+    // Transfers all four fields. from_file and to_file must be carried along
+    // as well; otherwise an item that has been moved (e.g. when a container
+    // of items grows) would lose the file names it refers to.
+    RestoreAfterCopyOrCreateWorkItem& operator=(
+        RestoreAfterCopyOrCreateWorkItem&& o) noexcept {
+      result = std::move(o.result);
+      from_file = std::move(o.from_file);
+      to_file = std::move(o.to_file);
+      checksum_hex = std::move(o.checksum_hex);
+      return *this;
+    }
+  };
+
+  bool initialized_;
+  std::mutex byte_report_mutex_;
+  mutable channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
+  std::vector<port::Thread> threads_;
+  std::atomic<CpuPriority> threads_cpu_priority_;
+
+  // Certain operations like PurgeOldBackups and DeleteBackup will trigger
+  // automatic GarbageCollect (true) unless we've already done one in this
+  // session and have not failed to delete backup files since then (false).
+  bool might_need_garbage_collect_ = true;
+
+  // Adds a file to the backup work queue to be copied or created if it doesn't
+  // already exist.
+  //
+  // Exactly one of src_dir and contents must be non-empty.
+  //
+  // @param src_dir If non-empty, the file in this directory named fname will be
+  // copied.
+ // @param fname Name of destination file and, in case of copy, source file. + // @param contents If non-empty, the file will be created with these contents. + IOStatus AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, // starts with "/" + const EnvOptions& src_env_options, RateLimiter* rate_limiter, + FileType file_type, uint64_t size_bytes, Statistics* stats, + uint64_t size_limit = 0, bool shared_checksum = false, + std::function progress_callback = []() {}, + const std::string& contents = std::string(), + const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName, + const std::string& src_checksum_str = kUnknownFileChecksum, + const Temperature src_temperature = Temperature::kUnknown); + + // backup state data + BackupID latest_backup_id_; + BackupID latest_valid_backup_id_; + std::map> backups_; + std::map>> + corrupt_backups_; + std::unordered_map> + backuped_file_infos_; + std::atomic stop_backup_; + + // options data + BackupEngineOptions options_; + Env* db_env_; + Env* backup_env_; + + // directories + std::unique_ptr backup_directory_; + std::unique_ptr shared_directory_; + std::unique_ptr meta_directory_; + std::unique_ptr private_directory_; + + static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB + bool read_only_; + BackupStatistics backup_statistics_; + std::unordered_set reported_ignored_fields_; + static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB + std::shared_ptr db_fs_; + std::shared_ptr backup_fs_; + IOOptions io_options_ = IOOptions(); + + public: + std::unique_ptr schema_test_options_; +}; + +// -------- BackupEngineImplThreadSafe class --------- +// This locking layer for thread safety in the public API is layered on +// top to prevent accidental recursive locking with RWMutex, which is UB. 
+// Note: BackupEngineReadOnlyBase inherited twice, but has no fields +class BackupEngineImplThreadSafe : public BackupEngine, + public BackupEngineReadOnly { + public: + BackupEngineImplThreadSafe(const BackupEngineOptions& options, Env* db_env, + bool read_only = false) + : impl_(options, db_env, read_only) {} + ~BackupEngineImplThreadSafe() override {} + + using BackupEngine::CreateNewBackupWithMetadata; + IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options, + DB* db, const std::string& app_metadata, + BackupID* new_backup_id) override { + WriteLock lock(&mutex_); + return impl_.CreateNewBackupWithMetadata(options, db, app_metadata, + new_backup_id); + } + + IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override { + WriteLock lock(&mutex_); + return impl_.PurgeOldBackups(num_backups_to_keep); + } + + IOStatus DeleteBackup(BackupID backup_id) override { + WriteLock lock(&mutex_); + return impl_.DeleteBackup(backup_id); + } + + void StopBackup() override { + // No locking needed + impl_.StopBackup(); + } + + IOStatus GarbageCollect() override { + WriteLock lock(&mutex_); + return impl_.GarbageCollect(); + } + + Status GetLatestBackupInfo(BackupInfo* backup_info, + bool include_file_details = false) const override { + ReadLock lock(&mutex_); + return impl_.GetBackupInfo(kLatestBackupIDMarker, backup_info, + include_file_details); + } + + Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info, + bool include_file_details = false) const override { + ReadLock lock(&mutex_); + return impl_.GetBackupInfo(backup_id, backup_info, include_file_details); + } + + void GetBackupInfo(std::vector* backup_info, + bool include_file_details) const override { + ReadLock lock(&mutex_); + impl_.GetBackupInfo(backup_info, include_file_details); + } + + void GetCorruptedBackups( + std::vector* corrupt_backup_ids) const override { + ReadLock lock(&mutex_); + impl_.GetCorruptedBackups(corrupt_backup_ids); + } + + using 
BackupEngine::RestoreDBFromBackup; + IOStatus RestoreDBFromBackup(const RestoreOptions& options, + BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir) const override { + ReadLock lock(&mutex_); + return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir); + } + + using BackupEngine::RestoreDBFromLatestBackup; + IOStatus RestoreDBFromLatestBackup( + const RestoreOptions& options, const std::string& db_dir, + const std::string& wal_dir) const override { + // Defer to above function, which locks + return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir); + } + + IOStatus VerifyBackup(BackupID backup_id, + bool verify_with_checksum = false) const override { + ReadLock lock(&mutex_); + return impl_.VerifyBackup(backup_id, verify_with_checksum); + } + + // Not public API but needed + IOStatus Initialize() { + // No locking needed + return impl_.Initialize(); + } + + // Not public API but used in testing + void TEST_SetBackupMetaSchemaOptions( + const TEST_BackupMetaSchemaOptions& options) { + impl_.schema_test_options_.reset(new TEST_BackupMetaSchemaOptions(options)); + } + + // Not public API but used in testing + void TEST_SetDefaultRateLimitersClock( + const std::shared_ptr& backup_rate_limiter_clock = nullptr, + const std::shared_ptr& restore_rate_limiter_clock = + nullptr) { + impl_.TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, + restore_rate_limiter_clock); + } + + private: + mutable port::RWMutex mutex_; + BackupEngineImpl impl_; +}; +} // namespace + +IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env, + BackupEngine** backup_engine_ptr) { + std::unique_ptr backup_engine( + new BackupEngineImplThreadSafe(options, env)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return IOStatus::OK(); +} + +namespace { +BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& 
options, + Env* db_env, bool read_only) + : initialized_(false), + threads_cpu_priority_(), + latest_backup_id_(0), + latest_valid_backup_id_(0), + stop_backup_(false), + options_(options), + db_env_(db_env), + backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), + read_only_(read_only) { + if (options_.backup_rate_limiter == nullptr && + options_.backup_rate_limit > 0) { + options_.backup_rate_limiter.reset( + NewGenericRateLimiter(options_.backup_rate_limit)); + } + if (options_.restore_rate_limiter == nullptr && + options_.restore_rate_limit > 0) { + options_.restore_rate_limiter.reset( + NewGenericRateLimiter(options_.restore_rate_limit)); + } + db_fs_ = db_env_->GetFileSystem(); + backup_fs_ = backup_env_->GetFileSystem(); +} + +BackupEngineImpl::~BackupEngineImpl() { + files_to_copy_or_create_.sendEof(); + for (auto& t : threads_) { + t.join(); + } + LogFlush(options_.info_log); + for (const auto& it : corrupt_backups_) { + it.second.first.PermitUncheckedError(); + } +} + +IOStatus BackupEngineImpl::Initialize() { + assert(!initialized_); + initialized_ = true; + if (read_only_) { + ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine"); + } + options_.Dump(options_.info_log); + + auto meta_path = GetAbsolutePath(kMetaDirName); + + if (!read_only_) { + // we might need to clean up from previous crash or I/O errors + might_need_garbage_collect_ = true; + + if (options_.max_valid_backups_to_open != + std::numeric_limits::max()) { + options_.max_valid_backups_to_open = std::numeric_limits::max(); + ROCKS_LOG_WARN( + options_.info_log, + "`max_valid_backups_to_open` is not set to the default value. 
" + "Ignoring its value since BackupEngine is not read-only."); + } + + // gather the list of directories that we need to create + std::vector*>> + directories; + directories.emplace_back(GetAbsolutePath(), &backup_directory_); + if (options_.share_table_files) { + if (options_.share_files_with_checksum) { + directories.emplace_back( + GetAbsolutePath(GetSharedFileWithChecksumRel()), + &shared_directory_); + } else { + directories.emplace_back(GetAbsolutePath(GetSharedFileRel()), + &shared_directory_); + } + } + directories.emplace_back(GetAbsolutePath(kPrivateDirName), + &private_directory_); + directories.emplace_back(meta_path, &meta_directory_); + // create all the dirs we need + for (const auto& d : directories) { + IOStatus io_s = + backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr); + if (io_s.ok()) { + io_s = + backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr); + } + if (!io_s.ok()) { + return io_s; + } + } + } + + std::vector backup_meta_files; + { + IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_, + &backup_meta_files, nullptr); + if (io_s.IsNotFound()) { + return IOStatus::NotFound(meta_path + " is missing"); + } else if (!io_s.ok()) { + return io_s; + } + } + // create backups_ structure + for (auto& file : backup_meta_files) { + ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str()); + BackupID backup_id = 0; + sscanf(file.c_str(), "%u", &backup_id); + if (backup_id == 0 || file != std::to_string(backup_id)) { + // Invalid file name, will be deleted with auto-GC when user + // initiates an append or write operation. (Behave as read-only until + // then.) 
+ ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s", + file.c_str()); + continue; + } + assert(backups_.find(backup_id) == backups_.end()); + // Insert all the (backup_id, BackupMeta) that will be loaded later + // The loading performed later will check whether there are corrupt backups + // and move the corrupt backups to corrupt_backups_ + backups_.insert(std::make_pair( + backup_id, std::unique_ptr(new BackupMeta( + GetBackupMetaFile(backup_id, false /* tmp */), + GetBackupMetaFile(backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_, backup_fs_)))); + } + + latest_backup_id_ = 0; + latest_valid_backup_id_ = 0; + if (options_.destroy_old_data) { // Destroy old data + assert(!read_only_); + ROCKS_LOG_INFO( + options_.info_log, + "Backup Engine started with destroy_old_data == true, deleting all " + "backups"); + IOStatus io_s = PurgeOldBackups(0); + if (io_s.ok()) { + io_s = GarbageCollect(); + } + if (!io_s.ok()) { + return io_s; + } + } else { // Load data from storage + // abs_path_to_size: maps absolute paths of files in backup directory to + // their corresponding sizes + std::unordered_map abs_path_to_size; + // Insert files and their sizes in backup sub-directories (shared and + // shared_checksum) to abs_path_to_size + for (const auto& rel_dir : + {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + IOStatus io_s = + ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size); + if (!io_s.ok()) { + // I/O error likely impacting all backups + return io_s; + } + } + // load the backups if any, until valid_backups_to_open of the latest + // non-corrupted backups have been successfully opened. 
+ int valid_backups_to_open = options_.max_valid_backups_to_open; + for (auto backup_iter = backups_.rbegin(); backup_iter != backups_.rend(); + ++backup_iter) { + assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first); + if (latest_backup_id_ == 0) { + latest_backup_id_ = backup_iter->first; + } + if (valid_backups_to_open == 0) { + break; + } + + // Insert files and their sizes in backup sub-directories + // (private/backup_id) to abs_path_to_size + IOStatus io_s = ReadChildFileCurrentSizes( + GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_, + &abs_path_to_size); + if (io_s.ok()) { + io_s = backup_iter->second->LoadFromFile( + options_.backup_dir, abs_path_to_size, + options_.backup_rate_limiter.get(), options_.info_log, + &reported_ignored_fields_); + } + if (io_s.IsCorruption() || io_s.IsNotSupported()) { + ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s", + backup_iter->first, io_s.ToString().c_str()); + corrupt_backups_.insert(std::make_pair( + backup_iter->first, + std::make_pair(io_s, std::move(backup_iter->second)))); + } else if (!io_s.ok()) { + // Distinguish corruption errors from errors in the backup Env. + // Errors in the backup Env (i.e., this code path) will cause Open() to + // fail, whereas corruption errors would not cause Open() failures. 
+ return io_s; + } else { + ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s", + backup_iter->first, + backup_iter->second->GetInfoString().c_str()); + assert(latest_valid_backup_id_ == 0 || + latest_valid_backup_id_ > backup_iter->first); + if (latest_valid_backup_id_ == 0) { + latest_valid_backup_id_ = backup_iter->first; + } + --valid_backups_to_open; + } + } + + for (const auto& corrupt : corrupt_backups_) { + backups_.erase(backups_.find(corrupt.first)); + } + // erase the backups before max_valid_backups_to_open + int num_unopened_backups; + if (options_.max_valid_backups_to_open == 0) { + num_unopened_backups = 0; + } else { + num_unopened_backups = + std::max(0, static_cast(backups_.size()) - + options_.max_valid_backups_to_open); + } + for (int i = 0; i < num_unopened_backups; ++i) { + assert(backups_.begin()->second->Empty()); + backups_.erase(backups_.begin()); + } + } + + ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_); + ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", + latest_valid_backup_id_); + + // set up threads perform copies from files_to_copy_or_create_ in the + // background + threads_cpu_priority_ = CpuPriority::kNormal; + threads_.reserve(options_.max_background_operations); + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([this]() { +#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) +#if __GLIBC_PREREQ(2, 12) + pthread_setname_np(pthread_self(), "backup_engine"); +#endif +#endif + CpuPriority current_priority = CpuPriority::kNormal; + CopyOrCreateWorkItem work_item; + uint64_t bytes_toward_next_callback = 0; + while (files_to_copy_or_create_.read(work_item)) { + CpuPriority priority = threads_cpu_priority_; + if (current_priority != priority) { + TEST_SYNC_POINT_CALLBACK( + "BackupEngineImpl::Initialize:SetCpuPriority", &priority); + port::SetCpuPriority(0, priority); + current_priority = priority; + } + // `bytes_read` and 
`bytes_written` stats are enabled based on + // compile-time support and cannot be dynamically toggled. So we do not + // need to worry about `PerfLevel` here, unlike many other + // `IOStatsContext` / `PerfContext` stats. + uint64_t prev_bytes_read = IOSTATS(bytes_read); + uint64_t prev_bytes_written = IOSTATS(bytes_written); + + CopyOrCreateResult result; + Temperature temp = work_item.src_temperature; + result.io_status = CopyOrCreateFile( + work_item.src_path, work_item.dst_path, work_item.contents, + work_item.size_limit, work_item.src_env, work_item.dst_env, + work_item.src_env_options, work_item.sync, work_item.rate_limiter, + work_item.progress_callback, &temp, work_item.dst_temperature, + &bytes_toward_next_callback, &result.size, &result.checksum_hex); + + RecordTick(work_item.stats, BACKUP_READ_BYTES, + IOSTATS(bytes_read) - prev_bytes_read); + RecordTick(work_item.stats, BACKUP_WRITE_BYTES, + IOSTATS(bytes_written) - prev_bytes_written); + + result.db_id = work_item.db_id; + result.db_session_id = work_item.db_session_id; + result.expected_src_temperature = work_item.src_temperature; + result.current_src_temperature = temp; + if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) { + // unknown checksum function name implies no db table file checksum in + // db manifest; work_item.src_checksum_hex not empty means + // backup engine has calculated its crc32c checksum for the table + // file; therefore, we are able to compare the checksums. 
+ if (work_item.src_checksum_func_name == + kUnknownFileChecksumFuncName || + work_item.src_checksum_func_name == kDbFileChecksumFuncName) { + if (work_item.src_checksum_hex != result.checksum_hex) { + std::string checksum_info( + "Expected checksum is " + work_item.src_checksum_hex + + " while computed checksum is " + result.checksum_hex); + result.io_status = IOStatus::Corruption( + "Checksum mismatch after copying to " + work_item.dst_path + + ": " + checksum_info); + } + } else { + // FIXME(peterd): dead code? + std::string checksum_function_info( + "Existing checksum function is " + + work_item.src_checksum_func_name + + " while provided checksum function is " + + kBackupFileChecksumFuncName); + ROCKS_LOG_INFO( + options_.info_log, + "Unable to verify checksum after copying to %s: %s\n", + work_item.dst_path.c_str(), checksum_function_info.c_str()); + } + } + work_item.result.set_value(std::move(result)); + } + }); + } + ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine"); + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::CreateNewBackupWithMetadata( + const CreateBackupOptions& options, DB* db, const std::string& app_metadata, + BackupID* new_backup_id_ptr) { + assert(initialized_); + assert(!read_only_); + if (app_metadata.size() > kMaxAppMetaSize) { + return IOStatus::InvalidArgument("App metadata too large"); + } + + if (options.decrease_background_thread_cpu_priority) { + if (options.background_thread_cpu_priority < threads_cpu_priority_) { + threads_cpu_priority_.store(options.background_thread_cpu_priority); + } + } + + BackupID new_backup_id = latest_backup_id_ + 1; + + // `bytes_read` and `bytes_written` stats are enabled based on compile-time + // support and cannot be dynamically toggled. So we do not need to worry about + // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats. 
+ uint64_t prev_bytes_read = IOSTATS(bytes_read); + uint64_t prev_bytes_written = IOSTATS(bytes_written); + + assert(backups_.find(new_backup_id) == backups_.end()); + + auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id)); + IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr); + if (io_s.ok()) { + // maybe last backup failed and left partial state behind, clean it up. + // need to do this before updating backups_ such that a private dir + // named after new_backup_id will be cleaned up. + // (If an incomplete new backup is followed by an incomplete delete + // of the latest full backup, then there could be more than one next + // id with a private dir, the last thing to be deleted in delete + // backup, but all will be cleaned up with a GarbageCollect.) + io_s = GarbageCollect(); + } else if (io_s.IsNotFound()) { + // normal case, the new backup's private dir doesn't exist yet + io_s = IOStatus::OK(); + } + + auto ret = backups_.insert(std::make_pair( + new_backup_id, std::unique_ptr(new BackupMeta( + GetBackupMetaFile(new_backup_id, false /* tmp */), + GetBackupMetaFile(new_backup_id, true /* tmp */), + &backuped_file_infos_, backup_env_, backup_fs_)))); + assert(ret.second == true); + auto& new_backup = ret.first->second; + new_backup->RecordTimestamp(); + new_backup->SetAppMetadata(app_metadata); + + auto start_backup = backup_env_->NowMicros(); + + ROCKS_LOG_INFO(options_.info_log, + "Started the backup process -- creating backup %u", + new_backup_id); + + if (options_.share_table_files && !options_.share_files_with_checksum) { + ROCKS_LOG_WARN(options_.info_log, + "BackupEngineOptions::share_files_with_checksum=false is " + "DEPRECATED and could lead to data loss."); + } + + if (io_s.ok()) { + io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr); + } + + // A set into which we will insert the dst_paths that are calculated for live + // files and live WAL files. 
+ // This is used to check whether a live files shares a dst_path with another + // live file. + std::unordered_set live_dst_paths; + + std::vector backup_items_to_finish; + // Add a CopyOrCreateWorkItem to the channel for each live file + Status disabled = db->DisableFileDeletions(); + DBOptions db_options = db->GetDBOptions(); + Statistics* stats = db_options.statistics.get(); + if (io_s.ok()) { + CheckpointImpl checkpoint(db); + uint64_t sequence_number = 0; + FileChecksumGenFactory* db_checksum_factory = + db_options.file_checksum_gen_factory.get(); + const std::string kFileChecksumGenFactoryName = + "FileChecksumGenCrc32cFactory"; + bool compare_checksum = + db_checksum_factory != nullptr && + db_checksum_factory->Name() == kFileChecksumGenFactoryName + ? true + : false; + EnvOptions src_raw_env_options(db_options); + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint( + [&](const std::string& /*src_dirname*/, const std::string& /*fname*/, + FileType) { + // custom checkpoint will switch to calling copy_file_cb after it sees + // NotSupported returned from link_file_cb. 
+ return IOStatus::NotSupported(); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType type, + const std::string& checksum_func_name, + const std::string& checksum_val, + const Temperature src_temperature) { + if (type == kWalFile && !options_.backup_log_files) { + return IOStatus::OK(); + } + Log(options_.info_log, "add file for backup %s", fname.c_str()); + uint64_t size_bytes = 0; + IOStatus io_st; + if (type == kTableFile || type == kBlobFile) { + io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_, + &size_bytes, nullptr); + if (!io_st.ok()) { + Log(options_.info_log, "GetFileSize is failed: %s", + io_st.ToString().c_str()); + return io_st; + } + } + EnvOptions src_env_options; + switch (type) { + case kWalFile: + src_env_options = + db_env_->OptimizeForLogRead(src_raw_env_options); + break; + case kTableFile: + src_env_options = db_env_->OptimizeForCompactionTableRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + case kDescriptorFile: + src_env_options = + db_env_->OptimizeForManifestRead(src_raw_env_options); + break; + case kBlobFile: + src_env_options = db_env_->OptimizeForBlobFileRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + default: + // Other backed up files (like options file) are not read by live + // DB, so don't need to worry about avoiding mixing buffered and + // direct I/O. Just use plain defaults. 
+ src_env_options = src_raw_env_options; + break; + } + io_st = AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + options_.share_table_files && + (type == kTableFile || type == kBlobFile), + src_dirname, fname, src_env_options, rate_limiter, type, + size_bytes, db_options.statistics.get(), size_limit_bytes, + options_.share_files_with_checksum && + (type == kTableFile || type == kBlobFile), + options.progress_callback, "" /* contents */, checksum_func_name, + checksum_val, src_temperature); + return io_st; + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, + FileType type) { + Log(options_.info_log, "add file for backup %s", fname.c_str()); + return AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + false /* shared */, "" /* src_dir */, fname, + EnvOptions() /* src_env_options */, rate_limiter, type, + contents.size(), db_options.statistics.get(), 0 /* size_limit */, + false /* shared_checksum */, options.progress_callback, contents); + } /* create_file_cb */, + &sequence_number, + options.flush_before_backup ? 
0 : std::numeric_limits::max(), + compare_checksum)); + if (io_s.ok()) { + new_backup->SetSequenceNumber(sequence_number); + } + } + ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); + IOStatus item_io_status; + for (auto& item : backup_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_io_status = result.io_status; + Temperature temp = result.expected_src_temperature; + if (result.current_src_temperature != Temperature::kUnknown && + (temp == Temperature::kUnknown || + options_.current_temperatures_override_manifest)) { + temp = result.current_src_temperature; + } + if (item_io_status.ok() && item.shared && item.needed_to_copy) { + item_io_status = item.backup_env->GetFileSystem()->RenameFile( + item.dst_path_tmp, item.dst_path, io_options_, nullptr); + } + if (item_io_status.ok()) { + item_io_status = new_backup.get()->AddFile(std::make_shared( + item.dst_relative, result.size, result.checksum_hex, result.db_id, + result.db_session_id, temp)); + } + if (!item_io_status.ok()) { + io_s = item_io_status; + } + } + + // we copied all the files, enable file deletions + if (disabled.ok()) { // If we successfully disabled file deletions + db->EnableFileDeletions(false).PermitUncheckedError(); + } + auto backup_time = backup_env_->NowMicros() - start_backup; + + if (io_s.ok()) { + // persist the backup metadata on the disk + io_s = new_backup->StoreToFile(options_.sync, options_.schema_version, + schema_test_options_.get()); + } + if (io_s.ok() && options_.sync) { + std::unique_ptr backup_private_directory; + backup_fs_ + ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), + io_options_, &backup_private_directory, nullptr) + .PermitUncheckedError(); + if (backup_private_directory != nullptr) { + io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && private_directory_ != nullptr) { + io_s = 
private_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && meta_directory_ != nullptr) { + io_s = meta_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && shared_directory_ != nullptr) { + io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + if (io_s.ok() && backup_directory_ != nullptr) { + io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + } + + if (io_s.ok()) { + backup_statistics_.IncrementNumberSuccessBackup(); + // here we know that we succeeded and installed the new backup + latest_backup_id_ = new_backup_id; + latest_valid_backup_id_ = new_backup_id; + if (new_backup_id_ptr) { + *new_backup_id_ptr = new_backup_id; + } + ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good"); + + // backup_speed is in byte/second + double backup_speed = new_backup->GetSize() / (1.048576 * backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u", + new_backup->GetNumberFiles()); + char human_size[16]; + AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size)); + ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size); + ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds", + backup_time); + ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s", + backup_statistics_.ToString().c_str()); + } else { + backup_statistics_.IncrementNumberFailBackup(); + // clean all the files we might have created + ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s", + io_s.ToString().c_str()); + ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n", + backup_statistics_.ToString().c_str()); + // delete files that we might have already written + might_need_garbage_collect_ = true; + DeleteBackup(new_backup_id).PermitUncheckedError(); + } + + 
RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read); + RecordTick(stats, BACKUP_WRITE_BYTES, + IOSTATS(bytes_written) - prev_bytes_written); + return io_s; +} + +IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) { + assert(initialized_); + assert(!read_only_); + + // Best effort deletion even with errors + IOStatus overall_status = IOStatus::OK(); + + ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u", + num_backups_to_keep); + std::vector to_delete; + auto itr = backups_.begin(); + while ((backups_.size() - to_delete.size()) > num_backups_to_keep) { + to_delete.push_back(itr->first); + itr++; + } + for (auto backup_id : to_delete) { + // Do not GC until end + IOStatus io_s = DeleteBackupNoGC(backup_id); + if (!io_s.ok()) { + overall_status = io_s; + } + } + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + IOStatus io_s = GarbageCollect(); + if (!io_s.ok() && overall_status.ok()) { + overall_status = io_s; + } + } + return overall_status; +} + +IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) { + IOStatus s1 = DeleteBackupNoGC(backup_id); + IOStatus s2 = IOStatus::OK(); + + // Clean up after any incomplete backup deletion, potentially from + // earlier session. + if (might_need_garbage_collect_) { + s2 = GarbageCollect(); + } + + if (!s1.ok()) { + // Any failure in the primary objective trumps any failure in the + // secondary objective. 
+ s2.PermitUncheckedError(); + return s1; + } else { + return s2; + } +} + +// Does not auto-GarbageCollect nor lock +IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) { + assert(initialized_); + assert(!read_only_); + + ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id); + auto backup = backups_.find(backup_id); + if (backup != backups_.end()) { + IOStatus io_s = backup->second->Delete(); + if (!io_s.ok()) { + return io_s; + } + backups_.erase(backup); + } else { + auto corrupt = corrupt_backups_.find(backup_id); + if (corrupt == corrupt_backups_.end()) { + return IOStatus::NotFound("Backup not found"); + } + IOStatus io_s = corrupt->second.second->Delete(); + if (!io_s.ok()) { + return io_s; + } + corrupt->second.first.PermitUncheckedError(); + corrupt_backups_.erase(corrupt); + } + + // After removing meta file, best effort deletion even with errors. + // (Don't delete other files if we can't delete the meta file right + // now.) + std::vector to_delete; + for (auto& itr : backuped_file_infos_) { + if (itr.second->refs == 0) { + IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(itr.first), + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), + io_s.ToString().c_str()); + to_delete.push_back(itr.first); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + for (auto& td : to_delete) { + backuped_file_infos_.erase(td); + } + + // take care of private dirs -- GarbageCollect() will take care of them + // if they are not empty + std::string private_dir = GetPrivateFileRel(backup_id); + IOStatus io_s = + backup_fs_->DeleteDir(GetAbsolutePath(private_dir), io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s", + private_dir.c_str(), io_s.ToString().c_str()); + if (!io_s.ok()) { + // Full gc or trying again later might work + might_need_garbage_collect_ = true; + } + return IOStatus::OK(); +} + +void 
BackupEngineImpl::SetBackupInfoFromBackupMeta( + BackupID id, const BackupMeta& meta, BackupInfo* backup_info, + bool include_file_details) const { + *backup_info = BackupInfo(id, meta.GetTimestamp(), meta.GetSize(), + meta.GetNumberFiles(), meta.GetAppMetadata()); + std::string dir = + options_.backup_dir + "/" + kPrivateDirSlash + std::to_string(id); + if (include_file_details) { + auto& file_details = backup_info->file_details; + file_details.reserve(meta.GetFiles().size()); + for (auto& file_ptr : meta.GetFiles()) { + BackupFileInfo& finfo = *file_details.emplace(file_details.end()); + finfo.relative_filename = file_ptr->filename; + finfo.size = file_ptr->size; + finfo.directory = dir; + uint64_t number; + FileType type; + bool ok = ParseFileName(file_ptr->filename, &number, &type); + if (ok) { + finfo.file_number = number; + finfo.file_type = type; + } + // TODO: temperature, file_checksum, file_checksum_func_name + } + backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id)); + backup_info->name_for_open.pop_back(); // remove trailing '/' + backup_info->env_for_open = meta.GetEnvForOpen(); + } +} + +Status BackupEngineImpl::GetBackupInfo(BackupID backup_id, + BackupInfo* backup_info, + bool include_file_details) const { + assert(initialized_); + if (backup_id == kLatestBackupIDMarker) { + // Note: Read latest_valid_backup_id_ inside of lock + backup_id = latest_valid_backup_id_; + } + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return Status::Corruption(corrupt_itr->second.first.ToString()); + } + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return Status::NotFound("Backup not found"); + } + auto& backup = backup_itr->second; + if (backup->Empty()) { + return Status::NotFound("Backup not found"); + } + + SetBackupInfoFromBackupMeta(backup_id, *backup, backup_info, + include_file_details); + return Status::OK(); +} + +void 
BackupEngineImpl::GetBackupInfo(std::vector* backup_info, + bool include_file_details) const { + assert(initialized_); + backup_info->resize(backups_.size()); + size_t i = 0; + for (auto& backup : backups_) { + const BackupMeta& meta = *backup.second; + if (!meta.Empty()) { + SetBackupInfoFromBackupMeta(backup.first, meta, &backup_info->at(i++), + include_file_details); + } + } +} + +void BackupEngineImpl::GetCorruptedBackups( + std::vector* corrupt_backup_ids) const { + assert(initialized_); + corrupt_backup_ids->reserve(corrupt_backups_.size()); + for (auto& backup : corrupt_backups_) { + corrupt_backup_ids->push_back(backup.first); + } +} + +IOStatus BackupEngineImpl::RestoreDBFromBackup( + const RestoreOptions& options, BackupID backup_id, + const std::string& db_dir, const std::string& wal_dir) const { + assert(initialized_); + if (backup_id == kLatestBackupIDMarker) { + // Note: Read latest_valid_backup_id_ inside of lock + backup_id = latest_valid_backup_id_; + } + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + auto backup_itr = backups_.find(backup_id); + if (backup_itr == backups_.end()) { + return IOStatus::NotFound("Backup not found"); + } + auto& backup = backup_itr->second; + if (backup->Empty()) { + return IOStatus::NotFound("Backup not found"); + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id); + ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n", + static_cast(options.keep_log_files)); + + // just in case. 
Ignore errors + db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr) + .PermitUncheckedError(); + db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr) + .PermitUncheckedError(); + + if (options.keep_log_files) { + // delete files in db_dir, but keep all the log files + DeleteChildren(db_dir, 1 << kWalFile); + // move all the files from archive dir to wal_dir + std::string archive_dir = ArchivalDirectory(wal_dir); + std::vector archive_files; + db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr) + .PermitUncheckedError(); // ignore errors + for (const auto& f : archive_files) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && type == kWalFile) { + ROCKS_LOG_INFO(options_.info_log, + "Moving log file from archive/ to wal_dir: %s", + f.c_str()); + IOStatus io_s = db_fs_->RenameFile( + archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr); + if (!io_s.ok()) { + // if we can't move log file from archive_dir to wal_dir, + // we should fail, since it might mean data loss + return io_s; + } + } + } + } else { + DeleteChildren(wal_dir); + DeleteChildren(ArchivalDirectory(wal_dir)); + DeleteChildren(db_dir); + } + + IOStatus io_s; + std::vector restore_items_to_finish; + std::string temporary_current_file; + std::string final_current_file; + std::unique_ptr db_dir_for_fsync; + std::unique_ptr wal_dir_for_fsync; + + for (const auto& file_info : backup->GetFiles()) { + const std::string& file = file_info->filename; + // 1. get DB filename + std::string dst = file_info->GetDbFileName(); + + // 2. find the filetype + uint64_t number; + FileType type; + bool ok = ParseFileName(dst, &number, &type); + if (!ok) { + return IOStatus::Corruption("Backup corrupted: Fail to parse filename " + + dst); + } + // 3. 
Construct the final path + // kWalFile lives in wal_dir and all the rest live in db_dir + if (type == kWalFile) { + dst = wal_dir + "/" + dst; + if (options_.sync && !wal_dir_for_fsync) { + io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync, + nullptr); + if (!io_s.ok()) { + return io_s; + } + } + } else { + dst = db_dir + "/" + dst; + if (options_.sync && !db_dir_for_fsync) { + io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync, + nullptr); + if (!io_s.ok()) { + return io_s; + } + } + } + // For atomicity, initially restore CURRENT file to a temporary name. + // This is useful even without options_.sync e.g. in case the restore + // process is interrupted. + if (type == kCurrentFile) { + final_current_file = dst; + dst = temporary_current_file = dst + ".tmp"; + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), + dst.c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */, + file_info->temp, "" /* contents */, backup_env_, db_env_, + EnvOptions() /* src_env_options */, options_.sync, + options_.restore_rate_limiter.get(), file_info->size, + nullptr /* stats */); + RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), file, dst, + file_info->checksum_hex); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + restore_items_to_finish.push_back( + std::move(after_copy_or_create_work_item)); + } + IOStatus item_io_status; + for (auto& item : restore_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_io_status = result.io_status; + // Note: It is possible that both of the following bad-status cases occur + // during copying. But, we only return one status. 
+ if (!item_io_status.ok()) { + io_s = item_io_status; + break; + } else if (!item.checksum_hex.empty() && + item.checksum_hex != result.checksum_hex) { + io_s = IOStatus::Corruption( + "While restoring " + item.from_file + " -> " + item.to_file + + ": expected checksum is " + item.checksum_hex + + " while computed checksum is " + result.checksum_hex); + break; + } + } + + // When enabled, the first FsyncWithDirOptions is to ensure all files are + // fully persisted before renaming CURRENT.tmp + if (io_s.ok() && db_dir_for_fsync) { + ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n"); + io_s = db_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + + if (io_s.ok() && wal_dir_for_fsync) { + io_s = wal_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr, + DirFsyncOptions()); + } + + if (io_s.ok() && !temporary_current_file.empty()) { + ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n"); + assert(!final_current_file.empty()); + io_s = db_fs_->RenameFile(temporary_current_file, final_current_file, + io_options_, nullptr); + } + + if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) { + // Second FsyncWithDirOptions is to ensure the final atomic rename of DB + // restore is fully persisted even if power goes out right after restore + // operation returns success + assert(db_dir_for_fsync); + io_s = db_dir_for_fsync->FsyncWithDirOptions( + io_options_, nullptr, DirFsyncOptions(final_current_file)); + } + + ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", + io_s.ToString().c_str()); + return io_s; +} + +IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id, + bool verify_with_checksum) const { + assert(initialized_); + // Check if backup_id is corrupted, or valid and registered + auto corrupt_itr = corrupt_backups_.find(backup_id); + if (corrupt_itr != corrupt_backups_.end()) { + return corrupt_itr->second.first; + } + + auto backup_itr = backups_.find(backup_id); + if 
(backup_itr == backups_.end()) { + return IOStatus::NotFound(); + } + + auto& backup = backup_itr->second; + if (backup->Empty()) { + return IOStatus::NotFound(); + } + + ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id); + + // Find all existing backup files belong to backup_id + std::unordered_map curr_abs_path_to_size; + for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(), + GetSharedFileWithChecksumRel()}) { + const auto abs_dir = GetAbsolutePath(rel_dir); + // Shared directories allowed to be missing in some cases. Expected but + // missing files will be reported a few lines down. + ReadChildFileCurrentSizes(abs_dir, backup_fs_, &curr_abs_path_to_size) + .PermitUncheckedError(); + } + + // For all files registered in backup + for (const auto& file_info : backup->GetFiles()) { + const auto abs_path = GetAbsolutePath(file_info->filename); + // check existence of the file + if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) { + return IOStatus::NotFound("File missing: " + abs_path); + } + // verify file size + if (file_info->size != curr_abs_path_to_size[abs_path]) { + std::string size_info("Expected file size is " + + std::to_string(file_info->size) + + " while found file size is " + + std::to_string(curr_abs_path_to_size[abs_path])); + return IOStatus::Corruption("File corrupted: File size mismatch for " + + abs_path + ": " + size_info); + } + if (verify_with_checksum && !file_info->checksum_hex.empty()) { + // verify file checksum + std::string checksum_hex; + ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n", + abs_path.c_str()); + IOStatus io_s = ReadFileAndComputeChecksum( + abs_path, backup_fs_, EnvOptions(), 0 /* size_limit */, &checksum_hex, + Temperature::kUnknown); + if (!io_s.ok()) { + return io_s; + } else if (file_info->checksum_hex != checksum_hex) { + std::string checksum_info( + "Expected checksum is " + file_info->checksum_hex + + " while computed checksum is " + 
checksum_hex); + return IOStatus::Corruption("File corrupted: Checksum mismatch for " + + abs_path + ": " + checksum_info); + } + } + } + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::CopyOrCreateFile( + const std::string& src, const std::string& dst, const std::string& contents, + uint64_t size_limit, Env* src_env, Env* dst_env, + const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter, + std::function progress_callback, Temperature* src_temperature, + Temperature dst_temperature, uint64_t* bytes_toward_next_callback, + uint64_t* size, std::string* checksum_hex) { + assert(src.empty() != contents.empty()); + IOStatus io_s; + std::unique_ptr dst_file; + std::unique_ptr src_file; + FileOptions dst_file_options; + dst_file_options.use_mmap_writes = false; + dst_file_options.temperature = dst_temperature; + // TODO:(gzh) maybe use direct reads/writes here if possible + if (size != nullptr) { + *size = 0; + } + uint32_t checksum_value = 0; + + // Check if size limit is set. if not, set it to very big number + if (size_limit == 0) { + size_limit = std::numeric_limits::max(); + } + + io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options, + &dst_file, nullptr); + if (io_s.ok() && !src.empty()) { + auto src_file_options = FileOptions(src_env_options); + src_file_options.temperature = *src_temperature; + io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options, + &src_file, nullptr); + } + if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) { + // Retry without temperature hint in case the FileSystem is strict with + // non-kUnknown temperature option + io_s = src_env->GetFileSystem()->NewSequentialFile( + src, FileOptions(src_env_options), &src_file, nullptr); + } + if (!io_s.ok()) { + return io_s; + } + + size_t buf_size = + rate_limiter ? 
static_cast(rate_limiter->GetSingleBurstBytes()) + : kDefaultCopyFileBufferSize; + + std::unique_ptr dest_writer( + new WritableFileWriter(std::move(dst_file), dst, dst_file_options)); + std::unique_ptr src_reader; + std::unique_ptr buf; + if (!src.empty()) { + // Return back current temperature in FileSystem + *src_temperature = src_file->GetTemperature(); + + src_reader.reset(new SequentialFileReader( + std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter)); + buf.reset(new char[buf_size]); + } + + Slice data; + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return status_to_io_status(Status::Incomplete("Backup stopped")); + } + if (!src.empty()) { + size_t buffer_to_read = + (buf_size < size_limit) ? buf_size : static_cast(size_limit); + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); + *bytes_toward_next_callback += data.size(); + } else { + data = contents; + } + size_limit -= data.size(); + TEST_SYNC_POINT_CALLBACK( + "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup", + (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? 
&data + : nullptr); + + if (!io_s.ok()) { + return io_s; + } + + if (size != nullptr) { + *size += data.size(); + } + if (checksum_hex != nullptr) { + checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); + } + io_s = dest_writer->Append(data); + + if (rate_limiter != nullptr) { + if (!src.empty()) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } else { + LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kWrite); + } + } + while (*bytes_toward_next_callback >= + options_.callback_trigger_interval_size) { + *bytes_toward_next_callback -= options_.callback_trigger_interval_size; + std::lock_guard lock(byte_report_mutex_); + progress_callback(); + } + } while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0); + + // Convert uint32_t checksum to hex checksum + if (checksum_hex != nullptr) { + checksum_hex->assign(ChecksumInt32ToHex(checksum_value)); + } + + if (io_s.ok() && sync) { + io_s = dest_writer->Sync(false); + } + if (io_s.ok()) { + io_s = dest_writer->Close(); + } + return io_s; +} + +// fname will always start with "/" +IOStatus BackupEngineImpl::AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& fname, const EnvOptions& src_env_options, + RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes, + Statistics* stats, uint64_t size_limit, bool shared_checksum, + std::function progress_callback, const std::string& contents, + const std::string& src_checksum_func_name, + const std::string& src_checksum_str, const Temperature src_temperature) { + assert(contents.empty() != src_dir.empty()); + + std::string src_path = src_dir + "/" + fname; + std::string dst_relative; + std::string dst_relative_tmp; + std::string db_id; + std::string db_session_id; + // crc32c 
checksum in hex. empty == unavailable / unknown + std::string checksum_hex; + + // Whenever a default checksum function name is passed in, we will compares + // the corresponding checksum values after copying. Note that only table and + // blob files may have a known checksum function name passed in. + // + // If no default checksum function name is passed in and db session id is not + // available, we will calculate the checksum *before* copying in two cases + // (we always calcuate checksums when copying or creating for any file types): + // a) share_files_with_checksum is true and file type is table; + // b) share_table_files is true and the file exists already. + // + // Step 0: Check if default checksum function name is passed in + if (kDbFileChecksumFuncName == src_checksum_func_name) { + if (src_checksum_str == kUnknownFileChecksum) { + return status_to_io_status( + Status::Aborted("Unknown checksum value for " + fname)); + } + checksum_hex = ChecksumStrToHex(src_checksum_str); + } + + // Step 1: Prepare the relative path to destination + if (shared && shared_checksum) { + if (GetNamingNoFlags() != BackupEngineOptions::kLegacyCrc32cAndFileSize && + file_type != kBlobFile) { + // Prepare db_session_id to add to the file name + // Ignore the returned status + // In the failed cases, db_id and db_session_id will be empty + GetFileDbIdentities(db_env_, src_env_options, src_path, src_temperature, + rate_limiter, &db_id, &db_session_id) + .PermitUncheckedError(); + } + // Calculate checksum if checksum and db session id are not available. + // If db session id is available, we will not calculate the checksum + // since the session id should suffice to avoid file name collision in + // the shared_checksum directory. 
+ if (checksum_hex.empty() && db_session_id.empty()) { + IOStatus io_s = ReadFileAndComputeChecksum( + src_path, db_fs_, src_env_options, size_limit, &checksum_hex, + src_temperature); + if (!io_s.ok()) { + return io_s; + } + } + if (size_bytes == std::numeric_limits::max()) { + return IOStatus::NotFound("File missing: " + src_path); + } + // dst_relative depends on the following conditions: + // 1) the naming scheme is kUseDbSessionId, + // 2) db_session_id is not empty, + // 3) checksum is available in the DB manifest. + // If 1,2,3) are satisfied, then dst_relative will be of the form: + // shared_checksum/__.sst + // If 1,2) are satisfied, then dst_relative will be of the form: + // shared_checksum/_.sst + // Otherwise, dst_relative is of the form + // shared_checksum/__.sst + // + // For blob files, db_session_id is not supported with the blob file format. + // It uses original/legacy naming scheme. + // dst_relative will be of the form: + // shared_checksum/__.blob + dst_relative = GetSharedFileWithChecksum(fname, checksum_hex, size_bytes, + db_session_id); + dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true); + dst_relative = GetSharedFileWithChecksumRel(dst_relative, false); + } else if (shared) { + dst_relative_tmp = GetSharedFileRel(fname, true); + dst_relative = GetSharedFileRel(fname, false); + } else { + dst_relative = GetPrivateFileRel(backup_id, false, fname); + } + + // We copy into `temp_dest_path` and, once finished, rename it to + // `final_dest_path`. This allows files to atomically appear at + // `final_dest_path`. We can copy directly to the final path when atomicity + // is unnecessary, like for files in private backup directories. 
+ const std::string* copy_dest_path; + std::string temp_dest_path; + std::string final_dest_path = GetAbsolutePath(dst_relative); + if (!dst_relative_tmp.empty()) { + temp_dest_path = GetAbsolutePath(dst_relative_tmp); + copy_dest_path = &temp_dest_path; + } else { + copy_dest_path = &final_dest_path; + } + + // Step 2: Determine whether to copy or not + // if it's shared, we also need to check if it exists -- if it does, no need + // to copy it again. + bool need_to_copy = true; + // true if final_dest_path is the same path as another live file + const bool same_path = + live_dst_paths.find(final_dest_path) != live_dst_paths.end(); + + bool file_exists = false; + if (shared && !same_path) { + // Should be in shared directory but not a live path, check existence in + // shared directory + IOStatus exist = + backup_fs_->FileExists(final_dest_path, io_options_, nullptr); + if (exist.ok()) { + file_exists = true; + } else if (exist.IsNotFound()) { + file_exists = false; + } else { + return exist; + } + } + + if (!contents.empty()) { + need_to_copy = false; + } else if (shared && (same_path || file_exists)) { + need_to_copy = false; + auto find_result = backuped_file_infos_.find(dst_relative); + if (find_result == backuped_file_infos_.end() && !same_path) { + // file exists but not referenced + ROCKS_LOG_INFO( + options_.info_log, + "%s already present, but not referenced by any backup. We will " + "overwrite the file.", + fname.c_str()); + need_to_copy = true; + // Defer any failure reporting to when we try to write the file + backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr) + .PermitUncheckedError(); + } else { + // file exists and referenced + if (checksum_hex.empty()) { + // same_path should not happen for a standard DB, so OK to + // read file contents to check for checksum mismatch between + // two files from same DB getting same name. 
+ // For compatibility with future meta file that might not have + // crc32c checksum available, consider it might be empty, but + // we don't currently generate meta file without crc32c checksum. + // Therefore we have to read & compute it if we don't have it. + if (!same_path && !find_result->second->checksum_hex.empty()) { + assert(find_result != backuped_file_infos_.end()); + // Note: to save I/O on incremental backups, we copy prior known + // checksum of the file instead of reading entire file contents + // to recompute it. + checksum_hex = find_result->second->checksum_hex; + // Regarding corruption detection, consider: + // (a) the DB file is corrupt (since previous backup) and the backup + // file is OK: we failed to detect, but the backup is safe. DB can + // be repaired/restored once its corruption is detected. + // (b) the backup file is corrupt (since previous backup) and the + // db file is OK: we failed to detect, but the backup is corrupt. + // CreateNewBackup should support fast incremental backups and + // there's no way to support that without reading all the files. + // We might add an option for extra checks on incremental backup, + // but until then, use VerifyBackups to check existing backup data. + // (c) file name collision with legitimately different content. + // This is almost inconceivable with a well-generated DB session + // ID, but even in that case, we double check the file sizes in + // BackupMeta::AddFile. 
+ } else { + IOStatus io_s = ReadFileAndComputeChecksum( + src_path, db_fs_, src_env_options, size_limit, &checksum_hex, + src_temperature); + if (!io_s.ok()) { + return io_s; + } + } + } + if (!db_session_id.empty()) { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %s, size %" PRIu64 + " and DB session identity %s", + fname.c_str(), checksum_hex.c_str(), size_bytes, + db_session_id.c_str()); + } else { + ROCKS_LOG_INFO(options_.info_log, + "%s already present, with checksum %s and size %" PRIu64, + fname.c_str(), checksum_hex.c_str(), size_bytes); + } + } + } + live_dst_paths.insert(final_dest_path); + + // Step 3: Add work item + if (!contents.empty() || need_to_copy) { + ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(), + copy_dest_path->c_str()); + CopyOrCreateWorkItem copy_or_create_work_item( + src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature, + Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_, + src_env_options, options_.sync, rate_limiter, size_limit, stats, + progress_callback, src_checksum_func_name, checksum_hex, db_id, + db_session_id); + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + copy_or_create_work_item.result.get_future(), shared, need_to_copy, + backup_env_, temp_dest_path, final_dest_path, dst_relative); + files_to_copy_or_create_.write(std::move(copy_or_create_work_item)); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + } else { + std::promise promise_result; + BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( + promise_result.get_future(), shared, need_to_copy, backup_env_, + temp_dest_path, final_dest_path, dst_relative); + backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item)); + CopyOrCreateResult result; + result.io_status = IOStatus::OK(); + result.size = size_bytes; + result.checksum_hex = std::move(checksum_hex); + result.db_id = std::move(db_id); + result.db_session_id = 
std::move(db_session_id); + promise_result.set_value(std::move(result)); + } + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::ReadFileAndComputeChecksum( + const std::string& src, const std::shared_ptr& src_fs, + const EnvOptions& src_env_options, uint64_t size_limit, + std::string* checksum_hex, const Temperature src_temperature) const { + if (checksum_hex == nullptr) { + return status_to_io_status(Status::Aborted("Checksum pointer is null")); + } + uint32_t checksum_value = 0; + if (size_limit == 0) { + size_limit = std::numeric_limits::max(); + } + + std::unique_ptr src_reader; + auto file_options = FileOptions(src_env_options); + file_options.temperature = src_temperature; + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + IOStatus io_s = SequentialFileReader::Create( + src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter); + if (io_s.IsPathNotFound() && src_temperature != Temperature::kUnknown) { + // Retry without temperature hint in case the FileSystem is strict with + // non-kUnknown temperature option + file_options.temperature = Temperature::kUnknown; + io_s = SequentialFileReader::Create(src_fs, src, file_options, &src_reader, + nullptr /* dbg */, rate_limiter); + } + if (!io_s.ok()) { + return io_s; + } + + size_t buf_size = kDefaultCopyFileBufferSize; + std::unique_ptr buf(new char[buf_size]); + Slice data; + + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return status_to_io_status(Status::Incomplete("Backup stopped")); + } + size_t buffer_to_read = + (buf_size < size_limit) ? 
buf_size : static_cast(size_limit); + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); + if (!io_s.ok()) { + return io_s; + } + + size_limit -= data.size(); + checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); + } while (data.size() > 0 && size_limit > 0); + + checksum_hex->assign(ChecksumInt32ToHex(checksum_value)); + + return io_s; +} + +Status BackupEngineImpl::GetFileDbIdentities( + Env* src_env, const EnvOptions& src_env_options, + const std::string& file_path, Temperature file_temp, + RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) { + assert(db_id != nullptr || db_session_id != nullptr); + + Options options; + options.env = src_env; + SstFileDumper sst_reader(options, file_path, file_temp, + 2 * 1024 * 1024 + /* readahead_size */, + false /* verify_checksum */, false /* output_hex */, + false /* decode_blob_index */, src_env_options, + true /* silent */); + + const TableProperties* table_properties = nullptr; + std::shared_ptr tp; + Status s = sst_reader.getStatus(); + + if (s.ok()) { + // Try to get table properties from the table reader of sst_reader + if (!sst_reader.ReadTableProperties(&tp).ok()) { + // Try to use table properites from the initialization of sst_reader + table_properties = sst_reader.GetInitTableProperties(); + } else { + table_properties = tp.get(); + if (table_properties != nullptr && rate_limiter != nullptr) { + // sizeof(*table_properties) is a sufficent but far-from-exact + // approximation of read bytes due to metaindex block, std::string + // properties and varint compression + LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter, + Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); + } + } + } else { + ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s", + file_path.c_str(), s.ToString().c_str()); + return s; + } + + if (table_properties != nullptr) { + if (db_id != nullptr) { + 
db_id->assign(table_properties->db_id); + } + if (db_session_id != nullptr) { + db_session_id->assign(table_properties->db_session_id); + if (db_session_id->empty()) { + s = Status::NotFound("DB session identity not found in " + file_path); + ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str()); + return s; + } + } + return Status::OK(); + } else { + s = Status::Corruption("Table properties missing in " + file_path); + ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str()); + return s; + } +} + +void BackupEngineImpl::LoopRateLimitRequestHelper( + const size_t total_bytes_to_request, RateLimiter* rate_limiter, + const Env::IOPriority pri, Statistics* stats, + const RateLimiter::OpType op_type) { + assert(rate_limiter != nullptr); + size_t remaining_bytes = total_bytes_to_request; + size_t request_bytes = 0; + while (remaining_bytes > 0) { + request_bytes = + std::min(static_cast(rate_limiter->GetSingleBurstBytes()), + remaining_bytes); + rate_limiter->Request(request_bytes, pri, stats, op_type); + remaining_bytes -= request_bytes; + } +} + +void BackupEngineImpl::DeleteChildren(const std::string& dir, + uint32_t file_type_filter) const { + std::vector children; + db_fs_->GetChildren(dir, io_options_, &children, nullptr) + .PermitUncheckedError(); // ignore errors + + for (const auto& f : children) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (ok && (file_type_filter & (1 << type))) { + // don't delete this file + continue; + } + db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr) + .PermitUncheckedError(); // ignore errors + } +} + +IOStatus BackupEngineImpl::ReadChildFileCurrentSizes( + const std::string& dir, const std::shared_ptr& fs, + std::unordered_map* result) const { + assert(result != nullptr); + std::vector files_attrs; + IOStatus io_status = fs->FileExists(dir, io_options_, nullptr); + if (io_status.ok()) { + io_status = + fs->GetChildrenFileAttributes(dir, io_options_, &files_attrs, 
nullptr); + } else if (io_status.IsNotFound()) { + // Insert no entries can be considered success + io_status = IOStatus::OK(); + } + const bool slash_needed = dir.empty() || dir.back() != '/'; + for (const auto& file_attrs : files_attrs) { + result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name, + file_attrs.size_bytes); + } + return io_status; +} + +IOStatus BackupEngineImpl::GarbageCollect() { + assert(!read_only_); + + // We will make a best effort to remove all garbage even in the presence + // of inconsistencies or I/O failures that inhibit finding garbage. + IOStatus overall_status = IOStatus::OK(); + // If all goes well, we don't need another auto-GC this session + might_need_garbage_collect_ = false; + + ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection"); + + // delete obsolete shared files + for (bool with_checksum : {false, true}) { + std::vector shared_children; + { + std::string shared_path; + if (with_checksum) { + shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel()); + } else { + shared_path = GetAbsolutePath(GetSharedFileRel()); + } + IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr); + if (io_s.ok()) { + io_s = backup_fs_->GetChildren(shared_path, io_options_, + &shared_children, nullptr); + } else if (io_s.IsNotFound()) { + io_s = IOStatus::OK(); + } + if (!io_s.ok()) { + overall_status = io_s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : shared_children) { + std::string rel_fname; + if (with_checksum) { + rel_fname = GetSharedFileWithChecksumRel(child); + } else { + rel_fname = GetSharedFileRel(child); + } + auto child_itr = backuped_file_infos_.find(rel_fname); + // if it's not refcounted, delete it + if (child_itr == backuped_file_infos_.end() || + child_itr->second->refs == 0) { + // this might be a directory, but DeleteFile will just fail in that + // case, so we're good + IOStatus io_s = 
backup_fs_->DeleteFile(GetAbsolutePath(rel_fname), + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + rel_fname.c_str(), io_s.ToString().c_str()); + backuped_file_infos_.erase(rel_fname); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + } + + // delete obsolete private files + std::vector private_children; + { + IOStatus io_s = + backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_, + &private_children, nullptr); + if (!io_s.ok()) { + overall_status = io_s; + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + for (auto& child : private_children) { + BackupID backup_id = 0; + bool tmp_dir = child.find(".tmp") != std::string::npos; + sscanf(child.c_str(), "%u", &backup_id); + if (!tmp_dir && // if it's tmp_dir, delete it + (backup_id == 0 || backups_.find(backup_id) != backups_.end())) { + // it's either not a number or it's still alive. continue + continue; + } + // here we have to delete the dir and all its children + std::string full_private_path = + GetAbsolutePath(GetPrivateFileRel(backup_id)); + std::vector subchildren; + if (backup_fs_ + ->GetChildren(full_private_path, io_options_, &subchildren, nullptr) + .ok()) { + for (auto& subchild : subchildren) { + IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild, + io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", + (full_private_path + subchild).c_str(), + io_s.ToString().c_str()); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + } + // finally delete the private dir + IOStatus io_s = + backup_fs_->DeleteDir(full_private_path, io_options_, nullptr); + ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s", + full_private_path.c_str(), io_s.ToString().c_str()); + if (!io_s.ok()) { + // Trying again later might work + might_need_garbage_collect_ = true; + } + } + + 
assert(overall_status.ok() || might_need_garbage_collect_); + return overall_status; +} + +// ------- BackupMeta class -------- + +IOStatus BackupEngineImpl::BackupMeta::AddFile( + std::shared_ptr file_info) { + auto itr = file_infos_->find(file_info->filename); + if (itr == file_infos_->end()) { + auto ret = file_infos_->insert({file_info->filename, file_info}); + if (ret.second) { + itr = ret.first; + itr->second->refs = 1; + } else { + // if this happens, something is seriously wrong + return IOStatus::Corruption("In memory metadata insertion error"); + } + } else { + // Compare sizes, because we scanned that off the filesystem on both + // ends. This is like a check in VerifyBackup. + if (itr->second->size != file_info->size) { + std::string msg = "Size mismatch for existing backup file: "; + msg.append(file_info->filename); + msg.append(" Size in backup is " + std::to_string(itr->second->size) + + " while size in DB is " + std::to_string(file_info->size)); + msg.append( + " If this DB file checks as not corrupt, try deleting old" + " backups or backing up to a different backup directory."); + return IOStatus::Corruption(msg); + } + if (file_info->checksum_hex.empty()) { + // No checksum available to check + } else if (itr->second->checksum_hex.empty()) { + // Remember checksum if newly acquired + itr->second->checksum_hex = file_info->checksum_hex; + } else if (itr->second->checksum_hex != file_info->checksum_hex) { + // Note: to save I/O, these will be equal trivially on already backed + // up files that don't have the checksum in their name. And it should + // never fail for files that do have checksum in their name. + + // Should never reach here, but produce an appropriate corruption + // message in case we do in a release build. 
+ assert(false); + std::string msg = "Checksum mismatch for existing backup file: "; + msg.append(file_info->filename); + msg.append(" Expected checksum is " + itr->second->checksum_hex + + " while computed checksum is " + file_info->checksum_hex); + msg.append( + " If this DB file checks as not corrupt, try deleting old" + " backups or backing up to a different backup directory."); + return IOStatus::Corruption(msg); + } + ++itr->second->refs; // increase refcount if already present + } + + size_ += file_info->size; + files_.push_back(itr->second); + + return IOStatus::OK(); +} + +IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) { + IOStatus io_s; + for (const auto& file : files_) { + --file->refs; // decrease refcount + } + files_.clear(); + // delete meta file + if (delete_meta) { + io_s = fs_->FileExists(meta_filename_, iooptions_, nullptr); + if (io_s.ok()) { + io_s = fs_->DeleteFile(meta_filename_, iooptions_, nullptr); + } else if (io_s.IsNotFound()) { + io_s = IOStatus::OK(); // nothing to delete + } + } + timestamp_ = 0; + return io_s; +} + +// Constants for backup meta file schema (see LoadFromFile) +const std::string kSchemaVersionPrefix{"schema_version "}; +const std::string kFooterMarker{"// FOOTER"}; + +const std::string kAppMetaDataFieldName{"metadata"}; + +// WART: The checksums are crc32c but named "crc32" +const std::string kFileCrc32cFieldName{"crc32"}; +const std::string kFileSizeFieldName{"size"}; +const std::string kTemperatureFieldName{"temp"}; + +// Marks a (future) field that should cause failure if not recognized. +// Other fields are assumed to be ignorable. For example, in the future +// we might add +// ni::file_name_escape uri_percent +// to indicate all file names have had spaces and special characters +// escaped using a URI percent encoding. 
+const std::string kNonIgnorableFieldPrefix{"ni::"}; + +// Each backup meta file is of the format (schema version 1): +//---------------------------------------------------------- +// +// +// metadata (optional) +// +// crc32 +// crc32 +// ... +//---------------------------------------------------------- +// +// For schema version 2.x (not in public APIs, but +// forward-compatibility started): +//---------------------------------------------------------- +// schema_version +// +// +// [ ] +// ... +// +// ( )* +// ( )* +// ... +// [// FOOTER] +// [ ] +// ... +//---------------------------------------------------------- +// where +// ::= [0-9]+([.][0-9]+) +// ::= [A-Za-z_][A-Za-z_0-9.]+ +// is anything but newline +// is anything but space and newline +// Although "// FOOTER" wouldn't strictly be required as a delimiter +// given the number of files is included, it is there for parsing +// sanity in case of corruption. It is only required if followed +// by footer fields, such as a checksum of the meta file (so far). +// Unrecognized fields are ignored, to support schema evolution on +// non-critical features with forward compatibility. Update schema +// major version for breaking changes. Schema minor versions are indicated +// only for diagnostic/debugging purposes. 
+// +// Fields in schema version 2.0: +// * Top-level meta fields: +// * Only "metadata" as in schema version 1 +// * File meta fields: +// * "crc32" - a crc32c checksum as in schema version 1 +// * "size" - the size of the file (new) +// * Footer meta fields: +// * None yet (future use for meta file checksum anticipated) +// +IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( + const std::string& backup_dir, + const std::unordered_map& abs_path_to_size, + RateLimiter* rate_limiter, Logger* info_log, + std::unordered_set* reported_ignored_fields) { + assert(reported_ignored_fields); + assert(Empty()); + + std::unique_ptr backup_meta_reader; + { + IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(), + &backup_meta_reader, + nullptr /* dbg */, rate_limiter); + if (!io_s.ok()) { + return io_s; + } + } + + // If we don't read an explicit schema_version, that implies version 1, + // which is what we call the original backup meta schema. + int schema_major_version = 1; + + // Failures handled at the end + std::string line; + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { + if (StartsWith(line, kSchemaVersionPrefix)) { + std::string ver = line.substr(kSchemaVersionPrefix.size()); + if (ver == "2" || StartsWith(ver, "2.")) { + schema_major_version = 2; + } else { + return IOStatus::NotSupported( + "Unsupported/unrecognized schema version: " + ver); + } + line.clear(); + } else if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + } + if (!line.empty()) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } else if (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { + sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); + } + uint32_t num_files = UINT32_MAX; + 
while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + // Number -> number of files -> exit loop reading optional meta fields + if (line[0] >= '0' && line[0] <= '9') { + num_files = static_cast(strtoul(line.c_str(), nullptr, 10)); + break; + } + // else, must be a meta field assignment + auto space_pos = line.find_first_of(' '); + if (space_pos == std::string::npos) { + return IOStatus::Corruption("Expected number of files or meta field"); + } + std::string field_name = line.substr(0, space_pos); + std::string field_data = line.substr(space_pos + 1); + if (field_name == kAppMetaDataFieldName) { + // app metadata present + bool decode_success = Slice(field_data).DecodeHex(&app_metadata_); + if (!decode_success) { + return IOStatus::Corruption( + "Failed to decode stored hex encoded app metadata"); + } + } else if (schema_major_version < 2) { + return IOStatus::Corruption("Expected number of files or \"" + + kAppMetaDataFieldName + "\" field"); + } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable meta field " + + field_name + " (from future version?)"); + } else { + // Warn the first time we see any particular unrecognized meta field + if (reported_ignored_fields->insert("meta:" + field_name).second) { + ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta field %s", + field_name.c_str()); + } + } + } + std::vector> files; + bool footer_present = false; + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + std::vector components = StringSplit(line, ' '); + + if (components.size() < 1) { + return IOStatus::Corruption("Empty line instead of file entry."); + } + if (schema_major_version >= 2 && components.size() == 2 && + line == kFooterMarker) { + footer_present = true; + break; + } + + const std::string& filename = components[0]; + + 
uint64_t actual_size; + const std::shared_ptr file_info = GetFile(filename); + if (file_info) { + actual_size = file_info->size; + } else { + std::string abs_path = backup_dir + "/" + filename; + auto e = abs_path_to_size.find(abs_path); + if (e == abs_path_to_size.end()) { + return IOStatus::Corruption( + "Pathname in meta file not found on disk: " + abs_path); + } + actual_size = e->second; + } + + if (schema_major_version >= 2) { + if (components.size() % 2 != 1) { + return IOStatus::Corruption( + "Bad number of line components for file entry."); + } + } else { + // Check restricted original schema + if (components.size() < 3) { + return IOStatus::Corruption("File checksum is missing for " + filename + + " in " + meta_filename_); + } + if (components[1] != kFileCrc32cFieldName) { + return IOStatus::Corruption("Unknown checksum type for " + filename + + " in " + meta_filename_); + } + if (components.size() > 3) { + return IOStatus::Corruption("Extra data for entry " + filename + + " in " + meta_filename_); + } + } + + std::string checksum_hex; + Temperature temp = Temperature::kUnknown; + for (unsigned i = 1; i < components.size(); i += 2) { + const std::string& field_name = components[i]; + const std::string& field_data = components[i + 1]; + + if (field_name == kFileCrc32cFieldName) { + uint32_t checksum_value = + static_cast(strtoul(field_data.c_str(), nullptr, 10)); + if (field_data != std::to_string(checksum_value)) { + return IOStatus::Corruption("Invalid checksum value for " + filename + + " in " + meta_filename_); + } + checksum_hex = ChecksumInt32ToHex(checksum_value); + } else if (field_name == kFileSizeFieldName) { + uint64_t ex_size = + std::strtoull(field_data.c_str(), nullptr, /*base*/ 10); + if (ex_size != actual_size) { + return IOStatus::Corruption( + "For file " + filename + " expected size " + + std::to_string(ex_size) + " but found size" + + std::to_string(actual_size)); + } + } else if (field_name == kTemperatureFieldName) { + auto iter = 
temperature_string_map.find(field_data); + if (iter != temperature_string_map.end()) { + temp = iter->second; + } else { + // Could report corruption, but in case of new temperatures added + // in future, letting those map to kUnknown which should generally + // be safe. + temp = Temperature::kUnknown; + } + } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable file field " + + field_name + " (from future version?)"); + } else { + // Warn the first time we see any particular unrecognized file field + if (reported_ignored_fields->insert("file:" + field_name).second) { + ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup file field %s", + field_name.c_str()); + } + } + } + + files.emplace_back(new FileInfo(filename, actual_size, checksum_hex, + /*id*/ "", /*sid*/ "", temp)); + } + + if (footer_present) { + assert(schema_major_version >= 2); + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { + if (line.empty()) { + return IOStatus::Corruption("Unexpected empty line"); + } + auto space_pos = line.find_first_of(' '); + if (space_pos == std::string::npos) { + return IOStatus::Corruption("Expected footer field"); + } + std::string field_name = line.substr(0, space_pos); + std::string field_data = line.substr(space_pos + 1); + if (StartsWith(field_name, kNonIgnorableFieldPrefix)) { + return IOStatus::NotSupported("Unrecognized non-ignorable field " + + field_name + " (from future version?)"); + } else if (reported_ignored_fields->insert("footer:" + field_name) + .second) { + // Warn the first time we see any particular unrecognized footer field + ROCKS_LOG_WARN(info_log, + "Ignoring unrecognized backup meta footer field %s", + field_name.c_str()); + } + } + } + + { + IOStatus io_s = backup_meta_reader->GetStatus(); + if (!io_s.ok()) { + return io_s; + } + } + + if (num_files != files.size()) { + return IOStatus::Corruption( + "Inconsistent number of files or 
missing/incomplete header in " + + meta_filename_); + } + + files_.reserve(files.size()); + for (const auto& file_info : files) { + IOStatus io_s = AddFile(file_info); + if (!io_s.ok()) { + return io_s; + } + } + + return IOStatus::OK(); +} + +const std::vector minor_version_strings{ + "", // invalid major version 0 + "", // implicit major version 1 + "2.0", +}; + +IOStatus BackupEngineImpl::BackupMeta::StoreToFile( + bool sync, int schema_version, + const TEST_BackupMetaSchemaOptions* schema_test_options) { + if (schema_version < 1) { + return IOStatus::InvalidArgument( + "BackupEngineOptions::schema_version must be >= 1"); + } + if (schema_version > static_cast(minor_version_strings.size() - 1)) { + return IOStatus::NotSupported( + "Only BackupEngineOptions::schema_version <= " + + std::to_string(minor_version_strings.size() - 1) + " is supported"); + } + std::string ver = minor_version_strings[schema_version]; + + // Need schema_version >= 2 for TEST_BackupMetaSchemaOptions + assert(schema_version >= 2 || schema_test_options == nullptr); + + IOStatus io_s; + std::unique_ptr backup_meta_file; + FileOptions file_options; + file_options.use_mmap_writes = false; + file_options.use_direct_writes = false; + io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options, + &backup_meta_file, nullptr); + if (!io_s.ok()) { + return io_s; + } + + std::ostringstream buf; + if (schema_test_options) { + // override for testing + ver = schema_test_options->version; + } + if (!ver.empty()) { + assert(schema_version >= 2); + buf << kSchemaVersionPrefix << ver << "\n"; + } + buf << static_cast(timestamp_) << "\n"; + buf << sequence_number_ << "\n"; + + if (!app_metadata_.empty()) { + std::string hex_encoded_metadata = + Slice(app_metadata_).ToString(/* hex */ true); + buf << kAppMetaDataFieldName << " " << hex_encoded_metadata << "\n"; + } + if (schema_test_options) { + for (auto& e : schema_test_options->meta_fields) { + buf << e.first << " " << e.second << "\n"; + } + } + buf << 
files_.size() << "\n"; + + for (const auto& file : files_) { + buf << file->filename; + if (schema_test_options == nullptr || + schema_test_options->crc32c_checksums) { + // use crc32c for now, switch to something else if needed + buf << " " << kFileCrc32cFieldName << " " + << ChecksumHexToInt32(file->checksum_hex); + } + if (schema_version >= 2 && file->temp != Temperature::kUnknown) { + buf << " " << kTemperatureFieldName << " " + << temperature_to_string[file->temp]; + } + if (schema_test_options && schema_test_options->file_sizes) { + buf << " " << kFileSizeFieldName << " " << std::to_string(file->size); + } + if (schema_test_options) { + for (auto& e : schema_test_options->file_fields) { + buf << " " << e.first << " " << e.second; + } + } + buf << "\n"; + } + + if (schema_test_options && !schema_test_options->footer_fields.empty()) { + buf << kFooterMarker << "\n"; + for (auto& e : schema_test_options->footer_fields) { + buf << e.first << " " << e.second << "\n"; + } + } + + io_s = backup_meta_file->Append(Slice(buf.str()), iooptions_, nullptr); + IOSTATS_ADD(bytes_written, buf.str().size()); + if (io_s.ok() && sync) { + io_s = backup_meta_file->Sync(iooptions_, nullptr); + } + if (io_s.ok()) { + io_s = backup_meta_file->Close(iooptions_, nullptr); + } + if (io_s.ok()) { + io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_, + nullptr); + } + return io_s; +} +} // namespace + +IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options, + Env* env, + BackupEngineReadOnly** backup_engine_ptr) { + if (options.destroy_old_data) { + return IOStatus::InvalidArgument( + "Can't destroy old data with ReadOnly BackupEngine"); + } + std::unique_ptr backup_engine( + new BackupEngineImplThreadSafe(options, env, true /*read_only*/)); + auto s = backup_engine->Initialize(); + if (!s.ok()) { + *backup_engine_ptr = nullptr; + return s; + } + *backup_engine_ptr = backup_engine.release(); + return IOStatus::OK(); +} + +void 
TEST_SetBackupMetaSchemaOptions( + BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options) { + BackupEngineImplThreadSafe* impl = + static_cast_with_check(engine); + impl->TEST_SetBackupMetaSchemaOptions(options); +} + +void TEST_SetDefaultRateLimitersClock( + BackupEngine* engine, + const std::shared_ptr& backup_rate_limiter_clock, + const std::shared_ptr& restore_rate_limiter_clock) { + BackupEngineImplThreadSafe* impl = + static_cast_with_check(engine); + impl->TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock, + restore_rate_limiter_clock); +} +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE -- cgit v1.2.3