summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/backup
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/utilities/backup')
-rw-r--r--src/rocksdb/utilities/backup/backup_engine.cc3181
-rw-r--r--src/rocksdb/utilities/backup/backup_engine_impl.h36
-rw-r--r--src/rocksdb/utilities/backup/backup_engine_test.cc4219
3 files changed, 7436 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/backup/backup_engine.cc b/src/rocksdb/utilities/backup/backup_engine.cc
new file mode 100644
index 000000000..81b4a6629
--- /dev/null
+++ b/src/rocksdb/utilities/backup/backup_engine.cc
@@ -0,0 +1,3181 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_LITE
+
+#include <algorithm>
+#include <atomic>
+#include <cinttypes>
+#include <cstdlib>
+#include <functional>
+#include <future>
+#include <limits>
+#include <map>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "env/fs_readonly.h"
+#include "env/fs_remap.h"
+#include "file/filename.h"
+#include "file/line_file_reader.h"
+#include "file/sequence_file_reader.h"
+#include "file/writable_file_writer.h"
+#include "logging/logging.h"
+#include "monitoring/iostats_context_imp.h"
+#include "options/options_helper.h"
+#include "port/port.h"
+#include "rocksdb/advanced_options.h"
+#include "rocksdb/env.h"
+#include "rocksdb/rate_limiter.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/transaction_log.h"
+#include "table/sst_file_dumper.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
+#include "util/channel.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/math.h"
+#include "util/rate_limiter.h"
+#include "util/string_util.h"
+#include "utilities/backup/backup_engine_impl.h"
+#include "utilities/checkpoint/checkpoint_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming;
+
+constexpr BackupID kLatestBackupIDMarker = static_cast<BackupID>(-2);
+
+// Converts a hex-encoded checksum into a 32-bit integer: the hex text is
+// decoded to raw bytes, read as a fixed32, and the result is endian-swapped.
+inline uint32_t ChecksumHexToInt32(const std::string& checksum_hex) {
+  std::string raw_bytes;
+  Slice hex_view(checksum_hex);
+  hex_view.DecodeHex(&raw_bytes);
+  const uint32_t fixed_value = DecodeFixed32(raw_bytes.c_str());
+  return EndianSwapValue(fixed_value);
+}
+// Returns the hex rendering of the bytes in `checksum_str`, produced by
+// Slice::ToString(true).
+inline std::string ChecksumStrToHex(const std::string& checksum_str) {
+  Slice raw_view(checksum_str);
+  return raw_view.ToString(true);
+}
+// Converts a 32-bit checksum value to hex text: the value is endian-swapped,
+// appended as a fixed32, and the resulting bytes are hex-encoded.
+inline std::string ChecksumInt32ToHex(const uint32_t& checksum_value) {
+  const uint32_t swapped = EndianSwapValue(checksum_value);
+  std::string encoded;
+  PutFixed32(&encoded, swapped);
+  return ChecksumStrToHex(encoded);
+}
+
+// Names of the sub-directories that appear at the start of backup-relative
+// file paths ("private/...", "meta/...", "shared/...", "shared_checksum/...").
+const std::string kPrivateDirName = "private";
+const std::string kMetaDirName = "meta";
+const std::string kSharedDirName = "shared";
+const std::string kSharedChecksumDirName = "shared_checksum";
+// The same names with a trailing '/', used for prefix checks and for building
+// relative paths.
+const std::string kPrivateDirSlash = kPrivateDirName + "/";
+const std::string kMetaDirSlash = kMetaDirName + "/";
+const std::string kSharedDirSlash = kSharedDirName + "/";
+const std::string kSharedChecksumDirSlash = kSharedChecksumDirName + "/";
+
+} // namespace
+
+// Adds one to number_success_backup.
+void BackupStatistics::IncrementNumberSuccessBackup() {
+  ++number_success_backup;
+}
+// Adds one to number_fail_backup.
+void BackupStatistics::IncrementNumberFailBackup() {
+  ++number_fail_backup;
+}
+
+// Returns the current value of number_success_backup.
+uint32_t BackupStatistics::GetNumberSuccessBackup() const {
+  const uint32_t success_count = number_success_backup;
+  return success_count;
+}
+// Returns the current value of number_fail_backup.
+uint32_t BackupStatistics::GetNumberFailBackup() const {
+  const uint32_t fail_count = number_fail_backup;
+  return fail_count;
+}
+
+// Formats both counters as
+// "# success backup: <n>, # fail backup: <m>".
+std::string BackupStatistics::ToString() const {
+  // The fixed text is 35 characters and each uint32_t prints as at most 10
+  // digits, so the worst case needs 56 bytes including the terminating NUL.
+  // 64 bytes guarantees snprintf never truncates the output.
+  char result[64];
+  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
+           GetNumberSuccessBackup(), GetNumberFailBackup());
+  return result;
+}
+
+// Writes the option values listed below to `logger` with ROCKS_LOG_INFO, one
+// log line per option. Boolean options are cast to int for "%d"; the Env and
+// info_log pointers are printed with "%p".
+void BackupEngineOptions::Dump(Logger* logger) const {
+ ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
+ backup_dir.c_str());
+ ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
+ ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
+ static_cast<int>(share_table_files));
+ ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
+ ROCKS_LOG_INFO(logger, " Options.sync: %d",
+ static_cast<int>(sync));
+ ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
+ static_cast<int>(destroy_old_data));
+ ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
+ static_cast<int>(backup_log_files));
+ ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
+ backup_rate_limit);
+ ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
+ restore_rate_limit);
+ ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
+ max_background_operations);
+}
+
+namespace {
+// -------- BackupEngineImpl class ---------
+class BackupEngineImpl {
+ public:
+ BackupEngineImpl(const BackupEngineOptions& options, Env* db_env,
+ bool read_only = false);
+ ~BackupEngineImpl();
+
+ IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
+ DB* db, const std::string& app_metadata,
+ BackupID* new_backup_id_ptr);
+
+ IOStatus PurgeOldBackups(uint32_t num_backups_to_keep);
+
+ IOStatus DeleteBackup(BackupID backup_id);
+
+  // Sets the stop_backup_ flag to true with a release store.
+  // NOTE(review): the code that reads this flag is not visible here.
+  void StopBackup() {
+    stop_backup_.store(true, std::memory_order_release);
+  }
+
+ IOStatus GarbageCollect();
+
+ // The returned BackupInfos are in chronological order, which means the
+ // latest backup comes last.
+ void GetBackupInfo(std::vector<BackupInfo>* backup_info,
+ bool include_file_details) const;
+
+ Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info,
+ bool include_file_details = false) const;
+
+ void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const;
+
+ IOStatus RestoreDBFromBackup(const RestoreOptions& options,
+ BackupID backup_id, const std::string& db_dir,
+ const std::string& wal_dir) const;
+
+  // Forwards to RestoreDBFromBackup() using the kLatestBackupIDMarker
+  // sentinel in place of a concrete backup ID.
+  IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options,
+                                     const std::string& db_dir,
+                                     const std::string& wal_dir) const {
+    // Note: latest_valid_backup_id_ must not be read outside of the lock, so
+    // the marker is passed along rather than the ID itself.
+    const BackupID latest_marker = kLatestBackupIDMarker;
+    return RestoreDBFromBackup(options, latest_marker, db_dir, wal_dir);
+  }
+
+ IOStatus VerifyBackup(BackupID backup_id,
+ bool verify_with_checksum = false) const;
+
+ IOStatus Initialize();
+
+  // Returns share_files_with_checksum_naming masked with kMaskNoNamingFlags.
+  ShareFilesNaming GetNamingNoFlags() const {
+    const auto& naming = options_.share_files_with_checksum_naming;
+    return naming & BackupEngineOptions::kMaskNoNamingFlags;
+  }
+  // Returns share_files_with_checksum_naming masked with kMaskNamingFlags.
+  ShareFilesNaming GetNamingFlags() const {
+    const auto& naming = options_.share_files_with_checksum_naming;
+    return naming & BackupEngineOptions::kMaskNamingFlags;
+  }
+
+  // Test hook: installs the given clocks on the backup and/or restore rate
+  // limiters held in options_. A null clock leaves that limiter untouched.
+  // Each limiter is cast to GenericRateLimiter without a type check.
+  void TEST_SetDefaultRateLimitersClock(
+      const std::shared_ptr<SystemClock>& backup_rate_limiter_clock,
+      const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) {
+    if (backup_rate_limiter_clock) {
+      auto* backup_limiter =
+          static_cast<GenericRateLimiter*>(options_.backup_rate_limiter.get());
+      backup_limiter->TEST_SetClock(backup_rate_limiter_clock);
+    }
+
+    if (restore_rate_limiter_clock) {
+      auto* restore_limiter = static_cast<GenericRateLimiter*>(
+          options_.restore_rate_limiter.get());
+      restore_limiter->TEST_SetClock(restore_rate_limiter_clock);
+    }
+  }
+
+ private:
+ void DeleteChildren(const std::string& dir,
+ uint32_t file_type_filter = 0) const;
+ IOStatus DeleteBackupNoGC(BackupID backup_id);
+
+ // Extends the "result" map with pathname->size mappings for the contents of
+ // "dir" in "env". Pathnames are prefixed with "dir".
+ IOStatus ReadChildFileCurrentSizes(
+ const std::string& dir, const std::shared_ptr<FileSystem>&,
+ std::unordered_map<std::string, uint64_t>* result) const;
+
+  // Per-file record for a file stored in the backup directory. Not copyable.
+  struct FileInfo {
+    FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
+             const std::string& id, const std::string& sid, Temperature _temp)
+        : refs(0),
+          filename(fname),
+          size(sz),
+          checksum_hex(checksum),
+          db_id(id),
+          db_session_id(sid),
+          temp(_temp) {}
+
+    FileInfo(const FileInfo&) = delete;
+    FileInfo& operator=(const FileInfo&) = delete;
+
+    // Reference count; starts at zero.
+    int refs;
+    // Path relative to the backup directory.
+    const std::string filename;
+    const uint64_t size;
+    // crc32c checksum as hex. empty == unknown / unavailable
+    std::string checksum_hex;
+    // DB identities
+    // db_id is obtained for potential usage in the future but not used
+    // currently
+    const std::string db_id;
+    // db_session_id appears in the backup SST filename if the table naming
+    // option is kUseDbSessionId
+    const std::string db_session_id;
+    Temperature temp;
+
+    // Returns the file name as it would appear in the DB directory.
+    //
+    // `filename` is one of shared/<file>, shared_checksum/<file_crc32c_size>,
+    // shared_checksum/<file_session>, shared_checksum/<file_crc32c_session>,
+    // or private/<number>/<file>.
+    std::string GetDbFileName() {
+      const size_t last_slash = filename.find_last_of('/');
+      assert(last_slash != std::string::npos);
+      const std::string dir_part = filename.substr(0, last_slash);
+      std::string base_name = filename.substr(last_slash + 1);
+
+      // Files in shared_checksum are named <number>_<checksum>_<size>.<type>,
+      // <number>_<session>.<type>, or <number>_<checksum>_<session>.<type>;
+      // strip the decoration to recover the real file name.
+      if (dir_part == kSharedChecksumDirName) {
+        return GetFileFromChecksumFile(base_name);
+      }
+      return base_name;
+    }
+  };
+
+ // TODO: deprecate this function once we migrate all BackupEngine's rate
+ // limiting to lower-level ones (i.e, ones in file access wrapper level like
+ // `WritableFileWriter`)
+ static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
+ RateLimiter* rate_limiter,
+ const Env::IOPriority pri,
+ Statistics* stats,
+ const RateLimiter::OpType op_type);
+
+  // Returns `path` with one trailing '/' removed. A path that is empty or
+  // does not end in '/' is returned unchanged.
+  static inline std::string WithoutTrailingSlash(const std::string& path) {
+    if (path.empty() || path.back() != '/') {
+      return path;
+    }
+    // Keep everything before the final character. Note that
+    // substr(size() - 1) would instead yield only the trailing "/" itself.
+    return path.substr(0, path.size() - 1);
+  }
+
+  // Returns `path` ending in '/': a slash is appended unless the path
+  // already ends with one. An empty path becomes "/".
+  static inline std::string WithTrailingSlash(const std::string& path) {
+    const bool ends_with_slash = !path.empty() && path.back() == '/';
+    if (ends_with_slash) {
+      return path;
+    }
+    return path + '/';
+  }
+
+  // FileSystem wrapper that presents shared backup files as if they lived in
+  // the private backup directory (dst_dir), which lets that private directory
+  // be opened as a read-only DB.
+  class RemapSharedFileSystem : public RemapFileSystem {
+   public:
+    RemapSharedFileSystem(const std::shared_ptr<FileSystem>& base,
+                          const std::string& dst_dir,
+                          const std::string& src_base_dir,
+                          const std::vector<std::shared_ptr<FileInfo>>& files)
+        : RemapFileSystem(base),
+          dst_dir_(WithoutTrailingSlash(dst_dir)),
+          dst_dir_slash_(WithTrailingSlash(dst_dir)),
+          src_base_dir_(WithTrailingSlash(src_base_dir)) {
+      for (auto& file : files) {
+        if (StartsWith(file->filename, kPrivateDirSlash)) {
+          // Only files outside "private/" get a remap entry.
+          continue;
+        }
+        assert(StartsWith(file->filename, kSharedDirSlash) ||
+               StartsWith(file->filename, kSharedChecksumDirSlash));
+        remaps_[file->GetDbFileName()] = file;
+      }
+    }
+
+    const char* Name() const override {
+      return "BackupEngineImpl::RemapSharedFileSystem";
+    }
+
+    // Directory listings are sometimes needed while opening a DB. When the
+    // listed directory is dst_dir, the remapped names are appended to the
+    // base listing.
+    IOStatus GetChildren(const std::string& dir, const IOOptions& options,
+                         std::vector<std::string>* result,
+                         IODebugContext* dbg) override {
+      IOStatus status = RemapFileSystem::GetChildren(dir, options, result, dbg);
+      if (!status.ok()) {
+        return status;
+      }
+      if (dir == dst_dir_ || dir == dst_dir_slash_) {
+        // Assume remapped files exist
+        for (auto& entry : remaps_) {
+          result->push_back(entry.first);
+        }
+      }
+      return status;
+    }
+
+    // Same as GetChildren(), but each appended entry also carries the size
+    // recorded in its FileInfo.
+    IOStatus GetChildrenFileAttributes(const std::string& dir,
+                                       const IOOptions& options,
+                                       std::vector<FileAttributes>* result,
+                                       IODebugContext* dbg) override {
+      IOStatus status =
+          RemapFileSystem::GetChildrenFileAttributes(dir, options, result, dbg);
+      if (!status.ok()) {
+        return status;
+      }
+      if (dir == dst_dir_ || dir == dst_dir_slash_) {
+        // Assume remapped files exist with recorded size
+        for (auto& entry : remaps_) {
+          result->emplace_back();  // clean up with C++20
+          FileAttributes& added = result->back();
+          added.name = entry.first;
+          added.size_bytes = entry.second->size;
+        }
+      }
+      return status;
+    }
+
+   protected:
+    // Translates a requested path. Paths that are not absolute are rejected
+    // with InvalidArgument. A path under dst_dir whose remainder has a remap
+    // entry is redirected to the corresponding file under src_base_dir; any
+    // other path is returned as given.
+    std::pair<IOStatus, std::string> EncodePath(
+        const std::string& path) override {
+      if (path.empty() || path[0] != '/') {
+        return {IOStatus::InvalidArgument(path, "Not an absolute path"), ""};
+      }
+      std::string mapped = path;
+      if (StartsWith(path, dst_dir_slash_)) {
+        const std::string relative = path.substr(dst_dir_slash_.size());
+        auto found = remaps_.find(relative);
+        if (found != remaps_.end()) {
+          mapped = src_base_dir_ + found->second->filename;
+        }
+      }
+      return {IOStatus(), std::move(mapped)};
+    }
+
+   private:
+    // Absolute path to a directory that some extra files will be mapped into.
+    const std::string dst_dir_;
+    // Includes a trailing slash.
+    const std::string dst_dir_slash_;
+    // Absolute path to a directory containing some files to be mapped into
+    // dst_dir_. Includes a trailing slash.
+    const std::string src_base_dir_;
+    // If remaps_[x] exists, attempt to read dst_dir_ / x should instead read
+    // src_base_dir_ / remaps_[x]->filename. FileInfo is used to maximize
+    // sharing with other backup data in memory.
+    std::unordered_map<std::string, std::shared_ptr<FileInfo>> remaps_;
+  };
+
+  // In-memory metadata for one backup: timestamp, sequence number, total
+  // size, application metadata and the list of files it references.
+  // Not copyable.
+  class BackupMeta {
+   public:
+    BackupMeta(
+        const std::string& meta_filename, const std::string& meta_tmp_filename,
+        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
+        Env* env, const std::shared_ptr<FileSystem>& fs)
+        : timestamp_(0),
+          sequence_number_(0),
+          size_(0),
+          meta_filename_(meta_filename),
+          meta_tmp_filename_(meta_tmp_filename),
+          file_infos_(file_infos),
+          env_(env),
+          fs_(fs) {}
+
+    BackupMeta(const BackupMeta&) = delete;
+    BackupMeta& operator=(const BackupMeta&) = delete;
+
+    ~BackupMeta() {}
+
+    // Stores the current time from env_ in timestamp_ (best effort).
+    void RecordTimestamp() {
+      if (!env_->GetCurrentTime(&timestamp_).ok()) {
+        // Fall back to something clearly fabricated.
+        timestamp_ = 1;
+      }
+    }
+    int64_t GetTimestamp() const { return timestamp_; }
+    uint64_t GetSize() const { return size_; }
+    uint32_t GetNumberFiles() const {
+      const size_t file_count = files_.size();
+      return static_cast<uint32_t>(file_count);
+    }
+    void SetSequenceNumber(uint64_t sequence_number) {
+      sequence_number_ = sequence_number;
+    }
+    uint64_t GetSequenceNumber() const { return sequence_number_; }
+
+    const std::string& GetAppMetadata() const { return app_metadata_; }
+
+    void SetAppMetadata(const std::string& app_metadata) {
+      app_metadata_ = app_metadata;
+    }
+
+    IOStatus AddFile(std::shared_ptr<FileInfo> file_info);
+
+    IOStatus Delete(bool delete_meta = true);
+
+    bool Empty() const { return files_.empty(); }
+
+    // Looks `filename` up in the shared file_infos_ map; returns nullptr when
+    // there is no entry.
+    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
+      auto found = file_infos_->find(filename);
+      if (found != file_infos_->end()) {
+        return found->second;
+      }
+      return nullptr;
+    }
+
+    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() const {
+      return files_;
+    }
+
+    // @param abs_path_to_size Pre-fetched file sizes (bytes).
+    IOStatus LoadFromFile(
+        const std::string& backup_dir,
+        const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
+        RateLimiter* rate_limiter, Logger* info_log,
+        std::unordered_set<std::string>* reported_ignored_fields);
+    IOStatus StoreToFile(
+        bool sync, int schema_version,
+        const TEST_BackupMetaSchemaOptions* schema_test_options);
+
+    // Builds a multi-line description: timestamp, total size, then one line
+    // per file with its name, size and reference count.
+    std::string GetInfoString() {
+      char size_text[16];
+      std::ostringstream out;
+      out << "Timestamp: " << timestamp_ << "\n";
+      AppendHumanBytes(size_, size_text, sizeof(size_text));
+      out << "Size: " << size_text << "\n";
+      out << "Files:" << "\n";
+      for (size_t i = 0; i < files_.size(); ++i) {
+        const std::shared_ptr<FileInfo>& entry = files_[i];
+        AppendHumanBytes(entry->size, size_text, sizeof(size_text));
+        out << entry->filename << ", size " << size_text << ", refs "
+            << entry->refs << "\n";
+      }
+      return out.str();
+    }
+
+    // Returns an Env for opening this backup's private directory as a DB.
+    // It is created on first use as a CompositeEnvWrapper over a read-only
+    // RemapSharedFileSystem, then cached in env_for_open_.
+    const std::shared_ptr<Env>& GetEnvForOpen() const {
+      if (env_for_open_) {
+        return env_for_open_;
+      }
+      // Derive the directories from the meta file path: everything before
+      // "meta/" is the source base dir; swapping "meta/" for "private/"
+      // yields the private dir of this backup.
+      const auto meta_pos = meta_filename_.rfind(kMetaDirSlash);
+      assert(meta_pos != std::string::npos);
+      const std::string backup_root = meta_filename_.substr(0, meta_pos);
+      std::string private_dir = meta_filename_;
+      private_dir.replace(meta_pos, kMetaDirSlash.size(), kPrivateDirSlash);
+      // Make the RemapSharedFileSystem
+      std::shared_ptr<FileSystem> wrapped_fs =
+          std::make_shared<RemapSharedFileSystem>(fs_, private_dir, backup_root,
+                                                  files_);
+      // Make it read-only for safety
+      wrapped_fs = std::make_shared<ReadOnlyFileSystem>(wrapped_fs);
+      // Make an Env wrapper
+      env_for_open_ = std::make_shared<CompositeEnvWrapper>(env_, wrapped_fs);
+      return env_for_open_;
+    }
+
+   private:
+    int64_t timestamp_;
+    // sequence number is only approximate, should not be used
+    // by clients
+    uint64_t sequence_number_;
+    uint64_t size_;
+    std::string app_metadata_;
+    std::string const meta_filename_;
+    std::string const meta_tmp_filename_;
+    // files with relative paths (without "/" prefix!!)
+    std::vector<std::shared_ptr<FileInfo>> files_;
+    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
+    Env* env_;
+    mutable std::shared_ptr<Env> env_for_open_;
+    std::shared_ptr<FileSystem> fs_;
+    IOOptions iooptions_ = IOOptions();
+  };  // BackupMeta
+
+ void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta,
+ BackupInfo* backup_info,
+ bool include_file_details) const;
+
+  // Returns options_.backup_dir + "/" + relative_path. `relative_path` must
+  // not begin with '/'.
+  inline std::string GetAbsolutePath(
+      const std::string& relative_path = "") const {
+    assert(relative_path.empty() || relative_path.front() != '/');
+    std::string abs_path = options_.backup_dir;
+    abs_path += "/";
+    abs_path += relative_path;
+    return abs_path;
+  }
+  // Returns "private/<backup_id>/<file>", or "private/<backup_id>.tmp/<file>"
+  // when `tmp` is true. `file` must not begin with '/'.
+  inline std::string GetPrivateFileRel(BackupID backup_id, bool tmp = false,
+                                       const std::string& file = "") const {
+    assert(file.empty() || file.front() != '/');
+    std::string rel_path = kPrivateDirSlash + std::to_string(backup_id);
+    if (tmp) {
+      rel_path += ".tmp";
+    }
+    rel_path += "/";
+    rel_path += file;
+    return rel_path;
+  }
+  // Returns "shared/<file>", or "shared/.<file>.tmp" when `tmp` is true.
+  // `file` must not begin with '/'.
+  inline std::string GetSharedFileRel(const std::string& file = "",
+                                      bool tmp = false) const {
+    assert(file.empty() || file.front() != '/');
+    if (!tmp) {
+      return kSharedDirSlash + file;
+    }
+    return kSharedDirSlash + "." + file + ".tmp";
+  }
+  // Returns "shared_checksum/<file>", or "shared_checksum/.<file>.tmp" when
+  // `tmp` is true. `file` must not begin with '/'.
+  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
+                                                  bool tmp = false) const {
+    assert(file.empty() || file.front() != '/');
+    if (!tmp) {
+      return kSharedChecksumDirSlash + file;
+    }
+    return kSharedChecksumDirSlash + "." + file + ".tmp";
+  }
+  // True when the session ID `sid` is empty or the configured naming scheme
+  // (ignoring flags) is kLegacyCrc32cAndFileSize.
+  inline bool UseLegacyNaming(const std::string& sid) const {
+    if (sid.empty()) {
+      return true;
+    }
+    return GetNamingNoFlags() == BackupEngineOptions::kLegacyCrc32cAndFileSize;
+  }
+  // Builds the shared_checksum file name for `file` by inserting a suffix
+  // just before the last '.':
+  //   legacy naming:  _<crc32c as decimal>_<file_size>
+  //   session naming: _s<db_session_id>, followed by _<file_size> when
+  //                   kFlagIncludeFileSize is set.
+  inline std::string GetSharedFileWithChecksum(
+      const std::string& file, const std::string& checksum_hex,
+      const uint64_t file_size, const std::string& db_session_id) const {
+    assert(file.empty() || file.front() != '/');
+    std::string suffix;
+    if (UseLegacyNaming(db_session_id)) {
+      assert(!checksum_hex.empty());
+      suffix = "_" + std::to_string(ChecksumHexToInt32(checksum_hex)) + "_" +
+               std::to_string(file_size);
+    } else {
+      suffix = "_s" + db_session_id;
+      if (GetNamingFlags() & BackupEngineOptions::kFlagIncludeFileSize) {
+        suffix += "_" + std::to_string(file_size);
+      }
+    }
+    std::string decorated = file;
+    decorated.insert(decorated.find_last_of('.'), suffix);
+    return decorated;
+  }
+  // Recovers the plain file name from a shared_checksum file name by removing
+  // everything from the first '_' up to (not including) the last '.'.
+  static inline std::string GetFileFromChecksumFile(const std::string& file) {
+    assert(file.empty() || file.front() != '/');
+    const size_t underscore_pos = file.find_first_of('_');
+    const size_t dot_pos = file.find_last_of('.');
+    std::string plain_name = file;
+    plain_name.erase(underscore_pos, dot_pos - underscore_pos);
+    return plain_name;
+  }
+  // Returns the absolute path of the meta file for `backup_id`:
+  // "<backup_dir>/meta/<backup_id>", or "<backup_dir>/meta/.<backup_id>.tmp"
+  // when `tmp` is true.
+  inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
+    std::string meta_path = GetAbsolutePath(kMetaDirName);
+    meta_path += "/";
+    if (tmp) {
+      meta_path += ".";
+    }
+    meta_path += std::to_string(backup_id);
+    if (tmp) {
+      meta_path += ".tmp";
+    }
+    return meta_path;
+  }
+
+ // If size_limit == 0, there is no size limit, copy everything.
+ //
+ // Exactly one of src and contents must be non-empty.
+ //
+ // @param src If non-empty, the file is copied from this pathname.
+ // @param contents If non-empty, the file will be created with these contents.
+ // @param src_temperature Pass in expected temperature of src, return back
+ // temperature reported by FileSystem
+ IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst,
+ const std::string& contents, uint64_t size_limit,
+ Env* src_env, Env* dst_env,
+ const EnvOptions& src_env_options, bool sync,
+ RateLimiter* rate_limiter,
+ std::function<void()> progress_callback,
+ Temperature* src_temperature,
+ Temperature dst_temperature,
+ uint64_t* bytes_toward_next_callback,
+ uint64_t* size, std::string* checksum_hex);
+
+ IOStatus ReadFileAndComputeChecksum(const std::string& src,
+ const std::shared_ptr<FileSystem>& src_fs,
+ const EnvOptions& src_env_options,
+ uint64_t size_limit,
+ std::string* checksum_hex,
+ const Temperature src_temperature) const;
+
+ // Obtain db_id and db_session_id from the table properties of file_path
+ Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
+ const std::string& file_path,
+ Temperature file_temp, RateLimiter* rate_limiter,
+ std::string* db_id, std::string* db_session_id);
+
+  // Result of one copy-or-create job, delivered through a
+  // std::promise/std::future pair.
+  struct CopyOrCreateResult {
+    ~CopyOrCreateResult() {
+      // The Status needs to be ignored here for two reasons.
+      // First, if the BackupEngineImpl shuts down with jobs outstanding, then
+      // it is possible that the Status in the future/promise is never read,
+      // resulting in an unchecked Status. Second, if there are items in the
+      // channel when the BackupEngineImpl is shutdown, these will also have
+      // Status that have not been checked.
+      // TODO: Fix those issues so that the Status no longer needs to be
+      // ignored here.
+      io_status.PermitUncheckedError();
+    }
+    // Zero-initialized so a result that was never filled in does not expose
+    // an indeterminate value.
+    uint64_t size = 0;
+    std::string checksum_hex;
+    std::string db_id;
+    std::string db_session_id;
+    IOStatus io_status;
+    Temperature expected_src_temperature = Temperature::kUnknown;
+    Temperature current_src_temperature = Temperature::kUnknown;
+  };
+
+  // Description of one file to copy or create, queued for the worker threads.
+  //
+  // Exactly one of src_path and contents must be non-empty. If src_path is
+  // non-empty, the file is copied from this pathname. Otherwise, if contents is
+  // non-empty, the file will be created at dst_path with these contents.
+  //
+  // Move-only: the embedded std::promise delivers the CopyOrCreateResult.
+  struct CopyOrCreateWorkItem {
+    std::string src_path;
+    std::string dst_path;
+    Temperature src_temperature;
+    Temperature dst_temperature;
+    std::string contents;
+    Env* src_env;
+    Env* dst_env;
+    EnvOptions src_env_options;
+    bool sync;
+    RateLimiter* rate_limiter;
+    uint64_t size_limit;
+    Statistics* stats;
+    std::promise<CopyOrCreateResult> result;
+    std::function<void()> progress_callback;
+    std::string src_checksum_func_name;
+    std::string src_checksum_hex;
+    std::string db_id;
+    std::string db_session_id;
+
+    // String members default-construct to empty, so only the remaining
+    // members need explicit initial values.
+    CopyOrCreateWorkItem()
+        : src_temperature(Temperature::kUnknown),
+          dst_temperature(Temperature::kUnknown),
+          src_env(nullptr),
+          dst_env(nullptr),
+          src_env_options(),
+          sync(false),
+          rate_limiter(nullptr),
+          size_limit(0),
+          stats(nullptr),
+          src_checksum_func_name(kUnknownFileChecksumFuncName) {}
+
+    CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
+    CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
+
+    // Move construction is implemented in terms of move assignment.
+    CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) noexcept {
+      *this = std::move(o);
+    }
+
+    CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) noexcept {
+      if (this == &o) {
+        // Self-move: leave the members untouched.
+        return *this;
+      }
+      src_path = std::move(o.src_path);
+      dst_path = std::move(o.dst_path);
+      // Temperature and the raw pointers/scalars are trivially copyable, so
+      // they are copied; each member is assigned exactly once.
+      src_temperature = o.src_temperature;
+      dst_temperature = o.dst_temperature;
+      contents = std::move(o.contents);
+      src_env = o.src_env;
+      dst_env = o.dst_env;
+      src_env_options = std::move(o.src_env_options);
+      sync = o.sync;
+      rate_limiter = o.rate_limiter;
+      size_limit = o.size_limit;
+      stats = o.stats;
+      result = std::move(o.result);
+      progress_callback = std::move(o.progress_callback);
+      src_checksum_func_name = std::move(o.src_checksum_func_name);
+      src_checksum_hex = std::move(o.src_checksum_hex);
+      db_id = std::move(o.db_id);
+      db_session_id = std::move(o.db_session_id);
+      return *this;
+    }
+
+    CopyOrCreateWorkItem(
+        std::string _src_path, std::string _dst_path,
+        const Temperature _src_temperature, const Temperature _dst_temperature,
+        std::string _contents, Env* _src_env, Env* _dst_env,
+        EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter,
+        uint64_t _size_limit, Statistics* _stats,
+        std::function<void()> _progress_callback = []() {},
+        const std::string& _src_checksum_func_name =
+            kUnknownFileChecksumFuncName,
+        const std::string& _src_checksum_hex = "",
+        const std::string& _db_id = "", const std::string& _db_session_id = "")
+        : src_path(std::move(_src_path)),
+          dst_path(std::move(_dst_path)),
+          src_temperature(_src_temperature),
+          dst_temperature(_dst_temperature),
+          contents(std::move(_contents)),
+          src_env(_src_env),
+          dst_env(_dst_env),
+          src_env_options(std::move(_src_env_options)),
+          sync(_sync),
+          rate_limiter(_rate_limiter),
+          size_limit(_size_limit),
+          stats(_stats),
+          // Taken by value, so move it into place instead of copying again.
+          progress_callback(std::move(_progress_callback)),
+          src_checksum_func_name(_src_checksum_func_name),
+          src_checksum_hex(_src_checksum_hex),
+          db_id(_db_id),
+          db_session_id(_db_session_id) {}
+  };
+
+  // Pairs the future of a queued copy-or-create job with the destination
+  // paths and flags recorded for it. Movable; the std::future member makes it
+  // non-copyable.
+  struct BackupAfterCopyOrCreateWorkItem {
+    std::future<CopyOrCreateResult> result;
+    bool shared;
+    bool needed_to_copy;
+    Env* backup_env;
+    std::string dst_path_tmp;
+    std::string dst_path;
+    std::string dst_relative;
+
+    // Flags start false, the Env pointer null and all paths empty.
+    BackupAfterCopyOrCreateWorkItem()
+        : shared(false), needed_to_copy(false), backup_env(nullptr) {}
+
+    BackupAfterCopyOrCreateWorkItem(
+        BackupAfterCopyOrCreateWorkItem&& o) noexcept
+        : result(std::move(o.result)),
+          shared(o.shared),
+          needed_to_copy(o.needed_to_copy),
+          backup_env(o.backup_env),
+          dst_path_tmp(std::move(o.dst_path_tmp)),
+          dst_path(std::move(o.dst_path)),
+          dst_relative(std::move(o.dst_relative)) {}
+
+    BackupAfterCopyOrCreateWorkItem& operator=(
+        BackupAfterCopyOrCreateWorkItem&& o) noexcept {
+      // Strings and the future are moved; scalars and the pointer are copied.
+      dst_relative = std::move(o.dst_relative);
+      dst_path = std::move(o.dst_path);
+      dst_path_tmp = std::move(o.dst_path_tmp);
+      backup_env = o.backup_env;
+      needed_to_copy = o.needed_to_copy;
+      shared = o.shared;
+      result = std::move(o.result);
+      return *this;
+    }
+
+    BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
+                                    bool _shared, bool _needed_to_copy,
+                                    Env* _backup_env, std::string _dst_path_tmp,
+                                    std::string _dst_path,
+                                    std::string _dst_relative)
+        : result(std::move(_result)),
+          shared(_shared),
+          needed_to_copy(_needed_to_copy),
+          backup_env(_backup_env),
+          dst_path_tmp(std::move(_dst_path_tmp)),
+          dst_path(std::move(_dst_path)),
+          dst_relative(std::move(_dst_relative)) {}
+  };
+
+  // Pairs the future of a queued copy-or-create job with the file names and
+  // checksum recorded for it. Movable; the std::future member makes it
+  // non-copyable.
+  // NOTE(review): from_file/to_file look like the source and destination
+  // paths of the file being restored -- confirm against the call site.
+  struct RestoreAfterCopyOrCreateWorkItem {
+    std::future<CopyOrCreateResult> result;
+    std::string from_file;
+    std::string to_file;
+    std::string checksum_hex;
+    RestoreAfterCopyOrCreateWorkItem() : checksum_hex("") {}
+    RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
+                                     const std::string& _from_file,
+                                     const std::string& _to_file,
+                                     const std::string& _checksum_hex)
+        : result(std::move(_result)),
+          from_file(_from_file),
+          to_file(_to_file),
+          checksum_hex(_checksum_hex) {}
+    // Move construction is implemented in terms of move assignment.
+    RestoreAfterCopyOrCreateWorkItem(
+        RestoreAfterCopyOrCreateWorkItem&& o) noexcept {
+      *this = std::move(o);
+    }
+
+    RestoreAfterCopyOrCreateWorkItem& operator=(
+        RestoreAfterCopyOrCreateWorkItem&& o) noexcept {
+      result = std::move(o.result);
+      // Every member must be transferred: leaving out the file names would
+      // hand back an item with empty from_file/to_file after any move, e.g.
+      // when a std::vector of these items reallocates.
+      from_file = std::move(o.from_file);
+      to_file = std::move(o.to_file);
+      checksum_hex = std::move(o.checksum_hex);
+      return *this;
+    }
+  };
+
+ bool initialized_;
+ std::mutex byte_report_mutex_;
+ mutable channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
+ std::vector<port::Thread> threads_;
+ std::atomic<CpuPriority> threads_cpu_priority_;
+
+ // Certain operations like PurgeOldBackups and DeleteBackup will trigger
+ // automatic GarbageCollect (true) unless we've already done one in this
+ // session and have not failed to delete backup files since then (false).
+ bool might_need_garbage_collect_ = true;
+
+ // Adds a file to the backup work queue to be copied or created if it doesn't
+ // already exist.
+ //
+ // Exactly one of src_dir and contents must be non-empty.
+ //
+ // @param src_dir If non-empty, the file in this directory named fname will be
+ // copied.
+ // @param fname Name of destination file and, in case of copy, source file.
+ // @param contents If non-empty, the file will be created with these contents.
+ IOStatus AddBackupFileWorkItem(
+ std::unordered_set<std::string>& live_dst_paths,
+ std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
+ BackupID backup_id, bool shared, const std::string& src_dir,
+ const std::string& fname, // starts with "/"
+ const EnvOptions& src_env_options, RateLimiter* rate_limiter,
+ FileType file_type, uint64_t size_bytes, Statistics* stats,
+ uint64_t size_limit = 0, bool shared_checksum = false,
+ std::function<void()> progress_callback = []() {},
+ const std::string& contents = std::string(),
+ const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
+ const std::string& src_checksum_str = kUnknownFileChecksum,
+ const Temperature src_temperature = Temperature::kUnknown);
+
+ // backup state data
+ BackupID latest_backup_id_;
+ BackupID latest_valid_backup_id_;
+ std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
+ std::map<BackupID, std::pair<IOStatus, std::unique_ptr<BackupMeta>>>
+ corrupt_backups_;
+ std::unordered_map<std::string, std::shared_ptr<FileInfo>>
+ backuped_file_infos_;
+ std::atomic<bool> stop_backup_;
+
+ // options data
+ BackupEngineOptions options_;
+ Env* db_env_;
+ Env* backup_env_;
+
+ // directories
+ std::unique_ptr<FSDirectory> backup_directory_;
+ std::unique_ptr<FSDirectory> shared_directory_;
+ std::unique_ptr<FSDirectory> meta_directory_;
+ std::unique_ptr<FSDirectory> private_directory_;
+
+ static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
+ bool read_only_;
+ BackupStatistics backup_statistics_;
+ std::unordered_set<std::string> reported_ignored_fields_;
+ static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
+ std::shared_ptr<FileSystem> db_fs_;
+ std::shared_ptr<FileSystem> backup_fs_;
+ IOOptions io_options_ = IOOptions();
+
+ public:
+ std::unique_ptr<TEST_BackupMetaSchemaOptions> schema_test_options_;
+};
+
+// -------- BackupEngineImplThreadSafe class ---------
+// Locking layer that makes the public API thread safe. It is kept separate
+// from BackupEngineImpl to prevent accidental recursive locking with RWMutex,
+// which is UB. Mutating calls take a WriteLock on mutex_, read-only calls
+// take a ReadLock, and every call is then forwarded to impl_.
+// Note: BackupEngineReadOnlyBase inherited twice, but has no fields
+class BackupEngineImplThreadSafe : public BackupEngine,
+                                   public BackupEngineReadOnly {
+ public:
+  BackupEngineImplThreadSafe(const BackupEngineOptions& options, Env* db_env,
+                             bool read_only = false)
+      : impl_(options, db_env, read_only) {}
+  ~BackupEngineImplThreadSafe() override {}
+
+  // ---- Mutating operations: exclusive lock ----
+
+  using BackupEngine::CreateNewBackupWithMetadata;
+  IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
+                                       DB* db, const std::string& app_metadata,
+                                       BackupID* new_backup_id) override {
+    WriteLock guard(&mutex_);
+    return impl_.CreateNewBackupWithMetadata(options, db, app_metadata,
+                                             new_backup_id);
+  }
+
+  IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override {
+    WriteLock guard(&mutex_);
+    return impl_.PurgeOldBackups(num_backups_to_keep);
+  }
+
+  IOStatus DeleteBackup(BackupID backup_id) override {
+    WriteLock guard(&mutex_);
+    return impl_.DeleteBackup(backup_id);
+  }
+
+  // Forwarded without taking mutex_.
+  void StopBackup() override {
+    // No locking needed
+    impl_.StopBackup();
+  }
+
+  IOStatus GarbageCollect() override {
+    WriteLock guard(&mutex_);
+    return impl_.GarbageCollect();
+  }
+
+  // ---- Read-only operations: shared lock ----
+
+  // Queries the implementation with the kLatestBackupIDMarker sentinel.
+  Status GetLatestBackupInfo(BackupInfo* backup_info,
+                             bool include_file_details = false) const override {
+    ReadLock guard(&mutex_);
+    return impl_.GetBackupInfo(kLatestBackupIDMarker, backup_info,
+                               include_file_details);
+  }
+
+  Status GetBackupInfo(BackupID backup_id, BackupInfo* backup_info,
+                       bool include_file_details = false) const override {
+    ReadLock guard(&mutex_);
+    return impl_.GetBackupInfo(backup_id, backup_info, include_file_details);
+  }
+
+  void GetBackupInfo(std::vector<BackupInfo>* backup_info,
+                     bool include_file_details) const override {
+    ReadLock guard(&mutex_);
+    impl_.GetBackupInfo(backup_info, include_file_details);
+  }
+
+  void GetCorruptedBackups(
+      std::vector<BackupID>* corrupt_backup_ids) const override {
+    ReadLock guard(&mutex_);
+    impl_.GetCorruptedBackups(corrupt_backup_ids);
+  }
+
+  using BackupEngine::RestoreDBFromBackup;
+  IOStatus RestoreDBFromBackup(const RestoreOptions& options,
+                               BackupID backup_id, const std::string& db_dir,
+                               const std::string& wal_dir) const override {
+    ReadLock guard(&mutex_);
+    return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
+  }
+
+  using BackupEngine::RestoreDBFromLatestBackup;
+  IOStatus RestoreDBFromLatestBackup(
+      const RestoreOptions& options, const std::string& db_dir,
+      const std::string& wal_dir) const override {
+    // Takes no lock itself: it defers to RestoreDBFromBackup() above, which
+    // locks.
+    return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
+  }
+
+  IOStatus VerifyBackup(BackupID backup_id,
+                        bool verify_with_checksum = false) const override {
+    ReadLock guard(&mutex_);
+    return impl_.VerifyBackup(backup_id, verify_with_checksum);
+  }
+
+  // ---- Non-public helpers ----
+
+  // Not public API but needed
+  IOStatus Initialize() {
+    // No locking needed
+    return impl_.Initialize();
+  }
+
+  // Not public API but used in testing. Stores a copy of `options` in
+  // impl_.schema_test_options_.
+  void TEST_SetBackupMetaSchemaOptions(
+      const TEST_BackupMetaSchemaOptions& options) {
+    std::unique_ptr<TEST_BackupMetaSchemaOptions> options_copy(
+        new TEST_BackupMetaSchemaOptions(options));
+    impl_.schema_test_options_ = std::move(options_copy);
+  }
+
+  // Not public API but used in testing
+  void TEST_SetDefaultRateLimitersClock(
+      const std::shared_ptr<SystemClock>& backup_rate_limiter_clock = nullptr,
+      const std::shared_ptr<SystemClock>& restore_rate_limiter_clock =
+          nullptr) {
+    impl_.TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock,
+                                           restore_rate_limiter_clock);
+  }
+
+ private:
+  mutable port::RWMutex mutex_;
+  BackupEngineImpl impl_;
+};
+} // namespace
+
+// Creates a non-read-only backup engine for `options` / `env` and initializes
+// it. On success the caller receives ownership through `*backup_engine_ptr`;
+// on failure `*backup_engine_ptr` is set to nullptr and the status returned by
+// Initialize() is passed back.
+IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env,
+                            BackupEngine** backup_engine_ptr) {
+  std::unique_ptr<BackupEngineImplThreadSafe> engine(
+      new BackupEngineImplThreadSafe(options, env));
+  IOStatus init_status = engine->Initialize();
+  if (init_status.ok()) {
+    *backup_engine_ptr = engine.release();
+    return IOStatus::OK();
+  }
+  // `engine` is destroyed on return, releasing the half-initialized object.
+  *backup_engine_ptr = nullptr;
+  return init_status;
+}
+
+namespace {
+// Stores the options and environments. The backup env falls back to `db_env`
+// when options.backup_env is null. For backup and restore alike, a generic
+// rate limiter is created when only a positive numeric limit was given and no
+// limiter object was supplied. The file systems of both envs are cached.
+BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
+                                   Env* db_env, bool read_only)
+    : initialized_(false),
+      threads_cpu_priority_(),
+      latest_backup_id_(0),
+      latest_valid_backup_id_(0),
+      stop_backup_(false),
+      options_(options),
+      db_env_(db_env),
+      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
+      read_only_(read_only) {
+  const bool need_default_backup_limiter =
+      options_.backup_rate_limit > 0 && options_.backup_rate_limiter == nullptr;
+  if (need_default_backup_limiter) {
+    options_.backup_rate_limiter.reset(
+        NewGenericRateLimiter(options_.backup_rate_limit));
+  }
+
+  const bool need_default_restore_limiter =
+      options_.restore_rate_limit > 0 &&
+      options_.restore_rate_limiter == nullptr;
+  if (need_default_restore_limiter) {
+    options_.restore_rate_limiter.reset(
+        NewGenericRateLimiter(options_.restore_rate_limit));
+  }
+
+  db_fs_ = db_env_->GetFileSystem();
+  backup_fs_ = backup_env_->GetFileSystem();
+}
+
+// Shuts down the background workers and flushes the info log.
+BackupEngineImpl::~BackupEngineImpl() {
+  // Send EOF on the work channel (presumably this ends each worker's read()
+  // loop -- see the thread body in Initialize()), then wait for every worker.
+  files_to_copy_or_create_.sendEof();
+  for (auto& worker : threads_) {
+    worker.join();
+  }
+  LogFlush(options_.info_log);
+  // The statuses remembered for corrupt backups may never have been inspected;
+  // mark them as intentionally unchecked.
+  for (const auto& entry : corrupt_backups_) {
+    entry.second.first.PermitUncheckedError();
+  }
+}
+
+// One-time setup; asserts that it has not run before. In order:
+//  1. For a non-read-only engine: create (if missing) and open the backup
+//     root, shared, private and meta directories.
+//  2. List the meta directory and register an (unloaded) BackupMeta for every
+//     file whose name is exactly a positive decimal backup id.
+//  3. Either delete everything (options_.destroy_old_data) or load the backup
+//     metadata from the highest id downwards, moving backups that fail with
+//     Corruption / NotSupported into corrupt_backups_.
+//  4. Start options_.max_background_operations worker threads that consume
+//     work items from files_to_copy_or_create_.
+// Returns a non-OK IOStatus for file system errors. A corrupt individual
+// backup does not make initialization fail.
+IOStatus BackupEngineImpl::Initialize() {
+ assert(!initialized_);
+ initialized_ = true;
+ if (read_only_) {
+ ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
+ }
+ options_.Dump(options_.info_log);
+
+ auto meta_path = GetAbsolutePath(kMetaDirName);
+
+ if (!read_only_) {
+ // we might need to clean up from previous crash or I/O errors
+ might_need_garbage_collect_ = true;
+
+ // max_valid_backups_to_open is only honored for read-only engines; reset
+ // it to the default (and warn) otherwise.
+ if (options_.max_valid_backups_to_open !=
+ std::numeric_limits<int32_t>::max()) {
+ options_.max_valid_backups_to_open = std::numeric_limits<int32_t>::max();
+ ROCKS_LOG_WARN(
+ options_.info_log,
+ "`max_valid_backups_to_open` is not set to the default value. "
+ "Ignoring its value since BackupEngine is not read-only.");
+ }
+
+ // gather the list of directories that we need to create
+ // (each entry pairs an absolute path with the member that will hold the
+ // opened directory handle)
+ std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>>
+ directories;
+ directories.emplace_back(GetAbsolutePath(), &backup_directory_);
+ if (options_.share_table_files) {
+ if (options_.share_files_with_checksum) {
+ directories.emplace_back(
+ GetAbsolutePath(GetSharedFileWithChecksumRel()),
+ &shared_directory_);
+ } else {
+ directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
+ &shared_directory_);
+ }
+ }
+ directories.emplace_back(GetAbsolutePath(kPrivateDirName),
+ &private_directory_);
+ directories.emplace_back(meta_path, &meta_directory_);
+ // create all the dirs we need
+ for (const auto& d : directories) {
+ IOStatus io_s =
+ backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr);
+ if (io_s.ok()) {
+ io_s =
+ backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr);
+ }
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ }
+
+ // Enumerate the meta directory; a missing directory is reported as NotFound
+ // with the path in the message.
+ std::vector<std::string> backup_meta_files;
+ {
+ IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_,
+ &backup_meta_files, nullptr);
+ if (io_s.IsNotFound()) {
+ return IOStatus::NotFound(meta_path + " is missing");
+ } else if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ // create backups_ structure
+ for (auto& file : backup_meta_files) {
+ ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
+ BackupID backup_id = 0;
+ sscanf(file.c_str(), "%u", &backup_id);
+ // Accept only names that round-trip exactly through the parsed id; this
+ // rejects id 0, non-numeric names and names with extra characters.
+ if (backup_id == 0 || file != std::to_string(backup_id)) {
+ // Invalid file name, will be deleted with auto-GC when user
+ // initiates an append or write operation. (Behave as read-only until
+ // then.)
+ ROCKS_LOG_INFO(options_.info_log, "Skipping unrecognized meta file %s",
+ file.c_str());
+ continue;
+ }
+ assert(backups_.find(backup_id) == backups_.end());
+ // Insert all the (backup_id, BackupMeta) that will be loaded later
+ // The loading performed later will check whether there are corrupt backups
+ // and move the corrupt backups to corrupt_backups_
+ backups_.insert(std::make_pair(
+ backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
+ GetBackupMetaFile(backup_id, false /* tmp */),
+ GetBackupMetaFile(backup_id, true /* tmp */),
+ &backuped_file_infos_, backup_env_, backup_fs_))));
+ }
+
+ latest_backup_id_ = 0;
+ latest_valid_backup_id_ = 0;
+ if (options_.destroy_old_data) { // Destroy old data
+ assert(!read_only_);
+ ROCKS_LOG_INFO(
+ options_.info_log,
+ "Backup Engine started with destroy_old_data == true, deleting all "
+ "backups");
+ IOStatus io_s = PurgeOldBackups(0);
+ if (io_s.ok()) {
+ io_s = GarbageCollect();
+ }
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ } else { // Load data from storage
+ // abs_path_to_size: maps absolute paths of files in backup directory to
+ // their corresponding sizes
+ std::unordered_map<std::string, uint64_t> abs_path_to_size;
+ // Insert files and their sizes in backup sub-directories (shared and
+ // shared_checksum) to abs_path_to_size
+ for (const auto& rel_dir :
+ {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
+ const auto abs_dir = GetAbsolutePath(rel_dir);
+ IOStatus io_s =
+ ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size);
+ if (!io_s.ok()) {
+ // I/O error likely impacting all backups
+ return io_s;
+ }
+ }
+ // load the backups if any, until valid_backups_to_open of the latest
+ // non-corrupted backups have been successfully opened.
+ int valid_backups_to_open = options_.max_valid_backups_to_open;
+ // Reverse iteration visits backups from the highest id to the lowest.
+ for (auto backup_iter = backups_.rbegin(); backup_iter != backups_.rend();
+ ++backup_iter) {
+ assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
+ // The first (highest) id seen is the latest backup, valid or not.
+ if (latest_backup_id_ == 0) {
+ latest_backup_id_ = backup_iter->first;
+ }
+ if (valid_backups_to_open == 0) {
+ break;
+ }
+
+ // Insert files and their sizes in backup sub-directories
+ // (private/backup_id) to abs_path_to_size
+ IOStatus io_s = ReadChildFileCurrentSizes(
+ GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_,
+ &abs_path_to_size);
+ if (io_s.ok()) {
+ io_s = backup_iter->second->LoadFromFile(
+ options_.backup_dir, abs_path_to_size,
+ options_.backup_rate_limiter.get(), options_.info_log,
+ &reported_ignored_fields_);
+ }
+ if (io_s.IsCorruption() || io_s.IsNotSupported()) {
+ ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
+ backup_iter->first, io_s.ToString().c_str());
+ // Ownership of the BackupMeta moves to corrupt_backups_; the now-null
+ // entry is removed from backups_ after the loop.
+ corrupt_backups_.insert(std::make_pair(
+ backup_iter->first,
+ std::make_pair(io_s, std::move(backup_iter->second))));
+ } else if (!io_s.ok()) {
+ // Distinguish corruption errors from errors in the backup Env.
+ // Errors in the backup Env (i.e., this code path) will cause Open() to
+ // fail, whereas corruption errors would not cause Open() failures.
+ return io_s;
+ } else {
+ ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
+ backup_iter->first,
+ backup_iter->second->GetInfoString().c_str());
+ assert(latest_valid_backup_id_ == 0 ||
+ latest_valid_backup_id_ > backup_iter->first);
+ if (latest_valid_backup_id_ == 0) {
+ latest_valid_backup_id_ = backup_iter->first;
+ }
+ --valid_backups_to_open;
+ }
+ }
+
+ // Drop the entries whose BackupMeta was moved into corrupt_backups_.
+ for (const auto& corrupt : corrupt_backups_) {
+ backups_.erase(backups_.find(corrupt.first));
+ }
+ // erase the backups before max_valid_backups_to_open
+ // NOTE(review): with a limit of 0 the load loop above breaks before
+ // loading anything and nothing is erased here, so backups_ keeps its
+ // unloaded entries -- confirm this is intended.
+ int num_unopened_backups;
+ if (options_.max_valid_backups_to_open == 0) {
+ num_unopened_backups = 0;
+ } else {
+ num_unopened_backups =
+ std::max(0, static_cast<int>(backups_.size()) -
+ options_.max_valid_backups_to_open);
+ }
+ for (int i = 0; i < num_unopened_backups; ++i) {
+ assert(backups_.begin()->second->Empty());
+ backups_.erase(backups_.begin());
+ }
+ }
+
+ ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
+ ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
+ latest_valid_backup_id_);
+
+ // set up threads perform copies from files_to_copy_or_create_ in the
+ // background
+ threads_cpu_priority_ = CpuPriority::kNormal;
+ threads_.reserve(options_.max_background_operations);
+ for (int t = 0; t < options_.max_background_operations; t++) {
+ // Each worker captures `this`; the destructor joins all workers.
+ threads_.emplace_back([this]() {
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+ pthread_setname_np(pthread_self(), "backup_engine");
+#endif
+#endif
+ CpuPriority current_priority = CpuPriority::kNormal;
+ CopyOrCreateWorkItem work_item;
+ uint64_t bytes_toward_next_callback = 0;
+ // Process work items until read() reports that no more will arrive.
+ while (files_to_copy_or_create_.read(work_item)) {
+ // Apply a CPU priority change requested via threads_cpu_priority_
+ // before handling the next item.
+ CpuPriority priority = threads_cpu_priority_;
+ if (current_priority != priority) {
+ TEST_SYNC_POINT_CALLBACK(
+ "BackupEngineImpl::Initialize:SetCpuPriority", &priority);
+ port::SetCpuPriority(0, priority);
+ current_priority = priority;
+ }
+ // `bytes_read` and `bytes_written` stats are enabled based on
+ // compile-time support and cannot be dynamically toggled. So we do not
+ // need to worry about `PerfLevel` here, unlike many other
+ // `IOStatsContext` / `PerfContext` stats.
+ uint64_t prev_bytes_read = IOSTATS(bytes_read);
+ uint64_t prev_bytes_written = IOSTATS(bytes_written);
+
+ CopyOrCreateResult result;
+ // `temp` starts as the expected source temperature and is passed by
+ // pointer so CopyOrCreateFile can report back through it.
+ Temperature temp = work_item.src_temperature;
+ result.io_status = CopyOrCreateFile(
+ work_item.src_path, work_item.dst_path, work_item.contents,
+ work_item.size_limit, work_item.src_env, work_item.dst_env,
+ work_item.src_env_options, work_item.sync, work_item.rate_limiter,
+ work_item.progress_callback, &temp, work_item.dst_temperature,
+ &bytes_toward_next_callback, &result.size, &result.checksum_hex);
+
+ // Report the I/O performed for this item as the delta of the counters.
+ RecordTick(work_item.stats, BACKUP_READ_BYTES,
+ IOSTATS(bytes_read) - prev_bytes_read);
+ RecordTick(work_item.stats, BACKUP_WRITE_BYTES,
+ IOSTATS(bytes_written) - prev_bytes_written);
+
+ result.db_id = work_item.db_id;
+ result.db_session_id = work_item.db_session_id;
+ result.expected_src_temperature = work_item.src_temperature;
+ result.current_src_temperature = temp;
+ if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
+ // unknown checksum function name implies no db table file checksum in
+ // db manifest; work_item.src_checksum_hex not empty means
+ // backup engine has calculated its crc32c checksum for the table
+ // file; therefore, we are able to compare the checksums.
+ if (work_item.src_checksum_func_name ==
+ kUnknownFileChecksumFuncName ||
+ work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
+ if (work_item.src_checksum_hex != result.checksum_hex) {
+ std::string checksum_info(
+ "Expected checksum is " + work_item.src_checksum_hex +
+ " while computed checksum is " + result.checksum_hex);
+ result.io_status = IOStatus::Corruption(
+ "Checksum mismatch after copying to " + work_item.dst_path +
+ ": " + checksum_info);
+ }
+ } else {
+ // FIXME(peterd): dead code?
+ std::string checksum_function_info(
+ "Existing checksum function is " +
+ work_item.src_checksum_func_name +
+ " while provided checksum function is " +
+ kBackupFileChecksumFuncName);
+ ROCKS_LOG_INFO(
+ options_.info_log,
+ "Unable to verify checksum after copying to %s: %s\n",
+ work_item.dst_path.c_str(), checksum_function_info.c_str());
+ }
+ }
+ // Hand the outcome to whoever waits on the work item's result.
+ work_item.result.set_value(std::move(result));
+ }
+ });
+ }
+ ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
+ return IOStatus::OK();
+}
+
+// Creates backup number latest_backup_id_ + 1 from the live files of `db`.
+//
+// options:           per-backup settings (flush, progress callback, CPU
+//                    priority of the background threads).
+// db:                database to back up; its file deletions are disabled
+//                    while files are copied.
+// app_metadata:      stored with the backup; InvalidArgument is returned when
+//                    it is larger than kMaxAppMetaSize.
+// new_backup_id_ptr: if non-null, receives the new id on success.
+//
+// On failure the partially created backup is deleted (errors from that
+// cleanup are ignored) and the failure status is returned. Must not be called
+// on a read-only or uninitialized engine (asserted).
+IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
+ const CreateBackupOptions& options, DB* db, const std::string& app_metadata,
+ BackupID* new_backup_id_ptr) {
+ assert(initialized_);
+ assert(!read_only_);
+ if (app_metadata.size() > kMaxAppMetaSize) {
+ return IOStatus::InvalidArgument("App metadata too large");
+ }
+
+ // The shared thread priority is only ever moved to a smaller value here;
+ // workers pick the change up before their next work item.
+ if (options.decrease_background_thread_cpu_priority) {
+ if (options.background_thread_cpu_priority < threads_cpu_priority_) {
+ threads_cpu_priority_.store(options.background_thread_cpu_priority);
+ }
+ }
+
+ BackupID new_backup_id = latest_backup_id_ + 1;
+
+ // `bytes_read` and `bytes_written` stats are enabled based on compile-time
+ // support and cannot be dynamically toggled. So we do not need to worry about
+ // `PerfLevel` here, unlike many other `IOStatsContext` / `PerfContext` stats.
+ uint64_t prev_bytes_read = IOSTATS(bytes_read);
+ uint64_t prev_bytes_written = IOSTATS(bytes_written);
+
+ assert(backups_.find(new_backup_id) == backups_.end());
+
+ auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
+ IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr);
+ if (io_s.ok()) {
+ // maybe last backup failed and left partial state behind, clean it up.
+ // need to do this before updating backups_ such that a private dir
+ // named after new_backup_id will be cleaned up.
+ // (If an incomplete new backup is followed by an incomplete delete
+ // of the latest full backup, then there could be more than one next
+ // id with a private dir, the last thing to be deleted in delete
+ // backup, but all will be cleaned up with a GarbageCollect.)
+ io_s = GarbageCollect();
+ } else if (io_s.IsNotFound()) {
+ // normal case, the new backup's private dir doesn't exist yet
+ io_s = IOStatus::OK();
+ }
+
+ // Register the new (still empty) backup. This happens even if io_s is
+ // already an error; the failure path at the end deletes it again.
+ auto ret = backups_.insert(std::make_pair(
+ new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
+ GetBackupMetaFile(new_backup_id, false /* tmp */),
+ GetBackupMetaFile(new_backup_id, true /* tmp */),
+ &backuped_file_infos_, backup_env_, backup_fs_))));
+ assert(ret.second == true);
+ auto& new_backup = ret.first->second;
+ new_backup->RecordTimestamp();
+ new_backup->SetAppMetadata(app_metadata);
+
+ auto start_backup = backup_env_->NowMicros();
+
+ ROCKS_LOG_INFO(options_.info_log,
+ "Started the backup process -- creating backup %u",
+ new_backup_id);
+
+ if (options_.share_table_files && !options_.share_files_with_checksum) {
+ ROCKS_LOG_WARN(options_.info_log,
+ "BackupEngineOptions::share_files_with_checksum=false is "
+ "DEPRECATED and could lead to data loss.");
+ }
+
+ if (io_s.ok()) {
+ io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr);
+ }
+
+ // A set into which we will insert the dst_paths that are calculated for live
+ // files and live WAL files.
+ // This is used to check whether a live files shares a dst_path with another
+ // live file.
+ std::unordered_set<std::string> live_dst_paths;
+
+ std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
+ // Add a CopyOrCreateWorkItem to the channel for each live file
+ // (file deletions are disabled regardless of io_s; they are re-enabled below
+ // only if this call succeeded)
+ Status disabled = db->DisableFileDeletions();
+ DBOptions db_options = db->GetDBOptions();
+ Statistics* stats = db_options.statistics.get();
+ if (io_s.ok()) {
+ CheckpointImpl checkpoint(db);
+ uint64_t sequence_number = 0;
+ FileChecksumGenFactory* db_checksum_factory =
+ db_options.file_checksum_gen_factory.get();
+ const std::string kFileChecksumGenFactoryName =
+ "FileChecksumGenCrc32cFactory";
+ // Checksums are requested from the checkpoint only when the DB's checksum
+ // factory is the crc32c one named above.
+ bool compare_checksum =
+ db_checksum_factory != nullptr &&
+ db_checksum_factory->Name() == kFileChecksumGenFactoryName
+ ? true
+ : false;
+ EnvOptions src_raw_env_options(db_options);
+ RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
+ // The three callbacks below are, in order: link_file_cb (always declines),
+ // copy_file_cb (queues a copy of an existing file) and create_file_cb
+ // (queues creation of a file from in-memory contents).
+ io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint(
+ [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
+ FileType) {
+ // custom checkpoint will switch to calling copy_file_cb after it sees
+ // NotSupported returned from link_file_cb.
+ return IOStatus::NotSupported();
+ } /* link_file_cb */,
+ [&](const std::string& src_dirname, const std::string& fname,
+ uint64_t size_limit_bytes, FileType type,
+ const std::string& checksum_func_name,
+ const std::string& checksum_val,
+ const Temperature src_temperature) {
+ // WAL files are skipped entirely unless backup_log_files is set.
+ if (type == kWalFile && !options_.backup_log_files) {
+ return IOStatus::OK();
+ }
+ Log(options_.info_log, "add file for backup %s", fname.c_str());
+ uint64_t size_bytes = 0;
+ IOStatus io_st;
+ // The current size is only looked up for table and blob files; for
+ // all other types size_bytes stays 0.
+ if (type == kTableFile || type == kBlobFile) {
+ io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_,
+ &size_bytes, nullptr);
+ if (!io_st.ok()) {
+ Log(options_.info_log, "GetFileSize is failed: %s",
+ io_st.ToString().c_str());
+ return io_st;
+ }
+ }
+ // Choose read options matching how the live DB reads this file type.
+ EnvOptions src_env_options;
+ switch (type) {
+ case kWalFile:
+ src_env_options =
+ db_env_->OptimizeForLogRead(src_raw_env_options);
+ break;
+ case kTableFile:
+ src_env_options = db_env_->OptimizeForCompactionTableRead(
+ src_raw_env_options, ImmutableDBOptions(db_options));
+ break;
+ case kDescriptorFile:
+ src_env_options =
+ db_env_->OptimizeForManifestRead(src_raw_env_options);
+ break;
+ case kBlobFile:
+ src_env_options = db_env_->OptimizeForBlobFileRead(
+ src_raw_env_options, ImmutableDBOptions(db_options));
+ break;
+ default:
+ // Other backed up files (like options file) are not read by live
+ // DB, so don't need to worry about avoiding mixing buffered and
+ // direct I/O. Just use plain defaults.
+ src_env_options = src_raw_env_options;
+ break;
+ }
+ // Only table and blob files are eligible for the shared directories.
+ io_st = AddBackupFileWorkItem(
+ live_dst_paths, backup_items_to_finish, new_backup_id,
+ options_.share_table_files &&
+ (type == kTableFile || type == kBlobFile),
+ src_dirname, fname, src_env_options, rate_limiter, type,
+ size_bytes, db_options.statistics.get(), size_limit_bytes,
+ options_.share_files_with_checksum &&
+ (type == kTableFile || type == kBlobFile),
+ options.progress_callback, "" /* contents */, checksum_func_name,
+ checksum_val, src_temperature);
+ return io_st;
+ } /* copy_file_cb */,
+ [&](const std::string& fname, const std::string& contents,
+ FileType type) {
+ Log(options_.info_log, "add file for backup %s", fname.c_str());
+ return AddBackupFileWorkItem(
+ live_dst_paths, backup_items_to_finish, new_backup_id,
+ false /* shared */, "" /* src_dir */, fname,
+ EnvOptions() /* src_env_options */, rate_limiter, type,
+ contents.size(), db_options.statistics.get(), 0 /* size_limit */,
+ false /* shared_checksum */, options.progress_callback, contents);
+ } /* create_file_cb */,
+ &sequence_number,
+ options.flush_before_backup ? 0 : std::numeric_limits<uint64_t>::max(),
+ compare_checksum));
+ if (io_s.ok()) {
+ new_backup->SetSequenceNumber(sequence_number);
+ }
+ }
+ ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
+ // Wait for every queued item, even after a failure. If several items fail,
+ // io_s ends up holding the status of the last failing one.
+ IOStatus item_io_status;
+ for (auto& item : backup_items_to_finish) {
+ item.result.wait();
+ auto result = item.result.get();
+ item_io_status = result.io_status;
+ // Use the temperature observed during the copy when it is known and
+ // either the expected one is unknown or the override option is set.
+ Temperature temp = result.expected_src_temperature;
+ if (result.current_src_temperature != Temperature::kUnknown &&
+ (temp == Temperature::kUnknown ||
+ options_.current_temperatures_override_manifest)) {
+ temp = result.current_src_temperature;
+ }
+ // Shared files that were actually copied are moved from their temporary
+ // name to the final name.
+ if (item_io_status.ok() && item.shared && item.needed_to_copy) {
+ item_io_status = item.backup_env->GetFileSystem()->RenameFile(
+ item.dst_path_tmp, item.dst_path, io_options_, nullptr);
+ }
+ if (item_io_status.ok()) {
+ item_io_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
+ item.dst_relative, result.size, result.checksum_hex, result.db_id,
+ result.db_session_id, temp));
+ }
+ if (!item_io_status.ok()) {
+ io_s = item_io_status;
+ }
+ }
+
+ // we copied all the files, enable file deletions
+ if (disabled.ok()) { // If we successfully disabled file deletions
+ db->EnableFileDeletions(false).PermitUncheckedError();
+ }
+ auto backup_time = backup_env_->NowMicros() - start_backup;
+
+ if (io_s.ok()) {
+ // persist the backup metadata on the disk
+ io_s = new_backup->StoreToFile(options_.sync, options_.schema_version,
+ schema_test_options_.get());
+ }
+ // With sync enabled, fsync the new private directory and then each
+ // directory handle opened in Initialize(), stopping at the first error.
+ if (io_s.ok() && options_.sync) {
+ std::unique_ptr<FSDirectory> backup_private_directory;
+ // Failure to open the directory is tolerated; the fsync of it is then
+ // simply skipped.
+ backup_fs_
+ ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
+ io_options_, &backup_private_directory, nullptr)
+ .PermitUncheckedError();
+ if (backup_private_directory != nullptr) {
+ io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+ if (io_s.ok() && private_directory_ != nullptr) {
+ io_s = private_directory_->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+ if (io_s.ok() && meta_directory_ != nullptr) {
+ io_s = meta_directory_->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+ if (io_s.ok() && shared_directory_ != nullptr) {
+ io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+ if (io_s.ok() && backup_directory_ != nullptr) {
+ io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+ }
+
+ if (io_s.ok()) {
+ backup_statistics_.IncrementNumberSuccessBackup();
+ // here we know that we succeeded and installed the new backup
+ latest_backup_id_ = new_backup_id;
+ latest_valid_backup_id_ = new_backup_id;
+ if (new_backup_id_ptr) {
+ *new_backup_id_ptr = new_backup_id;
+ }
+ ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
+
+ // backup_speed is in MiB per second: bytes per microsecond equals 10^6
+ // bytes per second, and 10^6 / 2^20 = 1 / 1.048576. The division is done in
+ // floating point, so a zero backup_time does not trap.
+ double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
+ ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
+ new_backup->GetNumberFiles());
+ char human_size[16];
+ AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
+ ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
+ ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
+ backup_time);
+ ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
+ ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
+ backup_statistics_.ToString().c_str());
+ } else {
+ backup_statistics_.IncrementNumberFailBackup();
+ // clean all the files we might have created
+ ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
+ io_s.ToString().c_str());
+ ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
+ backup_statistics_.ToString().c_str());
+ // delete files that we might have already written
+ might_need_garbage_collect_ = true;
+ DeleteBackup(new_backup_id).PermitUncheckedError();
+ }
+
+ // Report the I/O done on the calling thread (the workers report their own).
+ RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
+ RecordTick(stats, BACKUP_WRITE_BYTES,
+ IOSTATS(bytes_written) - prev_bytes_written);
+ return io_s;
+}
+
+// Deletes the backups with the lowest ids until at most `num_backups_to_keep`
+// entries remain in backups_. Deletion is best effort: every selected backup
+// is attempted, and if several fail the status of the last failure is
+// returned. A garbage collection runs afterwards when one is pending; its
+// error is reported only if all deletions succeeded.
+IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
+  assert(initialized_);
+  assert(!read_only_);
+
+  ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
+                 num_backups_to_keep);
+
+  // Collect the ids first; deleting while walking backups_ would invalidate
+  // the iterator.
+  const size_t total = backups_.size();
+  size_t surplus = total > num_backups_to_keep ? total - num_backups_to_keep : 0;
+  std::vector<BackupID> victims;
+  for (auto pos = backups_.begin(); surplus > 0; ++pos, --surplus) {
+    victims.push_back(pos->first);
+  }
+
+  IOStatus result = IOStatus::OK();
+  for (BackupID victim : victims) {
+    // GC is deferred until all deletions have been attempted.
+    IOStatus delete_status = DeleteBackupNoGC(victim);
+    if (!delete_status.ok()) {
+      result = delete_status;
+    }
+  }
+
+  // Finish any incomplete deletion, possibly left over from an earlier
+  // session.
+  if (might_need_garbage_collect_) {
+    IOStatus gc_status = GarbageCollect();
+    if (!gc_status.ok() && result.ok()) {
+      result = gc_status;
+    }
+  }
+  return result;
+}
+
+// Deletes one backup and then, if a garbage collection is pending, runs it.
+// The deletion status takes precedence: the GC status is returned only when
+// the deletion itself succeeded.
+IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) {
+  IOStatus delete_status = DeleteBackupNoGC(backup_id);
+
+  // Finish any incomplete deletion, possibly left over from an earlier
+  // session.
+  IOStatus gc_status = IOStatus::OK();
+  if (might_need_garbage_collect_) {
+    gc_status = GarbageCollect();
+  }
+
+  if (delete_status.ok()) {
+    return gc_status;
+  }
+  // The primary failure wins; the secondary status is dropped on purpose.
+  gc_status.PermitUncheckedError();
+  return delete_status;
+}
+
+// Does not auto-GarbageCollect nor lock
+// Deletes the backup `backup_id`, whether it is tracked as valid or corrupt.
+// Returns NotFound when the id is unknown, or the error from deleting the
+// backup's meta data, in which case nothing else is touched. Once the meta
+// data is gone, removal of unreferenced files and of the private directory is
+// best effort: failures only set might_need_garbage_collect_ and OK is
+// returned.
+IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
+  assert(initialized_);
+  assert(!read_only_);
+
+  ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
+  auto valid_pos = backups_.find(backup_id);
+  if (valid_pos == backups_.end()) {
+    // Not a valid backup; it may still be tracked as a corrupt one.
+    auto corrupt_pos = corrupt_backups_.find(backup_id);
+    if (corrupt_pos == corrupt_backups_.end()) {
+      return IOStatus::NotFound("Backup not found");
+    }
+    IOStatus meta_status = corrupt_pos->second.second->Delete();
+    if (!meta_status.ok()) {
+      return meta_status;
+    }
+    // The remembered corruption status is discarded together with the entry.
+    corrupt_pos->second.first.PermitUncheckedError();
+    corrupt_backups_.erase(corrupt_pos);
+  } else {
+    IOStatus meta_status = valid_pos->second->Delete();
+    if (!meta_status.ok()) {
+      return meta_status;
+    }
+    backups_.erase(valid_pos);
+  }
+
+  // The meta file is gone, so from here on deletion is best effort even when
+  // errors occur. (Had the meta file not been deletable, no other file would
+  // have been removed.)
+  std::vector<std::string> unreferenced;
+  for (auto& file_entry : backuped_file_infos_) {
+    if (file_entry.second->refs != 0) {
+      continue;
+    }
+    IOStatus file_status = backup_fs_->DeleteFile(
+        GetAbsolutePath(file_entry.first), io_options_, nullptr);
+    ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
+                   file_entry.first.c_str(), file_status.ToString().c_str());
+    // The bookkeeping entry is dropped even if the file could not be deleted.
+    unreferenced.push_back(file_entry.first);
+    if (!file_status.ok()) {
+      // A later attempt might succeed
+      might_need_garbage_collect_ = true;
+    }
+  }
+  // Erase after the walk so the iteration above is not disturbed.
+  for (auto& name : unreferenced) {
+    backuped_file_infos_.erase(name);
+  }
+
+  // Remove the private directory. If it is not empty, GarbageCollect() will
+  // deal with it.
+  std::string private_rel = GetPrivateFileRel(backup_id);
+  IOStatus dir_status =
+      backup_fs_->DeleteDir(GetAbsolutePath(private_rel), io_options_, nullptr);
+  ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
+                 private_rel.c_str(), dir_status.ToString().c_str());
+  if (!dir_status.ok()) {
+    // A full GC or a later attempt might succeed
+    might_need_garbage_collect_ = true;
+  }
+  return IOStatus::OK();
+}
+
+// Overwrites `*backup_info` with the summary of backup `id` taken from `meta`
+// (timestamp, size, file count, app metadata). When `include_file_details` is
+// set, it additionally fills one BackupFileInfo per file plus name_for_open
+// and env_for_open.
+void BackupEngineImpl::SetBackupInfoFromBackupMeta(
+    BackupID id, const BackupMeta& meta, BackupInfo* backup_info,
+    bool include_file_details) const {
+  *backup_info = BackupInfo(id, meta.GetTimestamp(), meta.GetSize(),
+                            meta.GetNumberFiles(), meta.GetAppMetadata());
+  if (!include_file_details) {
+    return;
+  }
+
+  // Directory reported for every file of this backup.
+  const std::string private_dir =
+      options_.backup_dir + "/" + kPrivateDirSlash + std::to_string(id);
+
+  auto& details = backup_info->file_details;
+  details.reserve(meta.GetFiles().size());
+  for (const auto& file : meta.GetFiles()) {
+    details.emplace_back();
+    BackupFileInfo& entry = details.back();
+    entry.relative_filename = file->filename;
+    entry.size = file->size;
+    entry.directory = private_dir;
+    // Number and type are only filled in when the file name parses.
+    uint64_t parsed_number;
+    FileType parsed_type;
+    if (ParseFileName(file->filename, &parsed_number, &parsed_type)) {
+      entry.file_number = parsed_number;
+      entry.file_type = parsed_type;
+    }
+    // TODO: temperature, file_checksum, file_checksum_func_name
+  }
+
+  backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id));
+  backup_info->name_for_open.pop_back();  // remove trailing '/'
+  backup_info->env_for_open = meta.GetEnvForOpen();
+}
+
+// Looks up a single backup. kLatestBackupIDMarker selects the latest valid
+// backup. Returns Corruption (carrying the recorded error text) for a backup
+// tracked as corrupt, NotFound for an unknown or empty backup, and otherwise
+// fills `*backup_info` and returns OK.
+Status BackupEngineImpl::GetBackupInfo(BackupID backup_id,
+                                       BackupInfo* backup_info,
+                                       bool include_file_details) const {
+  assert(initialized_);
+  // Note: latest_valid_backup_id_ is read here, i.e. inside of the lock
+  const BackupID wanted_id = (backup_id == kLatestBackupIDMarker)
+                                 ? latest_valid_backup_id_
+                                 : backup_id;
+
+  // Corrupt backups are checked first.
+  auto corrupt_pos = corrupt_backups_.find(wanted_id);
+  if (corrupt_pos != corrupt_backups_.end()) {
+    return Status::Corruption(corrupt_pos->second.first.ToString());
+  }
+
+  auto valid_pos = backups_.find(wanted_id);
+  if (valid_pos == backups_.end() || valid_pos->second->Empty()) {
+    return Status::NotFound("Backup not found");
+  }
+
+  SetBackupInfoFromBackupMeta(wanted_id, *valid_pos->second, backup_info,
+                              include_file_details);
+  return Status::OK();
+}
+
+// Replaces the contents of `*backup_info` with one entry per non-empty backup
+// in backups_, in the iteration order of backups_. File details are included
+// when `include_file_details` is set.
+//
+// Entries are appended only for backups that are actually reported. Sizing the
+// vector to backups_.size() up front would leave default-constructed trailing
+// entries whenever backups_ contains empty metas (for example when no backup
+// was loaded because max_valid_backups_to_open is 0).
+void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info,
+                                     bool include_file_details) const {
+  assert(initialized_);
+  backup_info->clear();
+  backup_info->reserve(backups_.size());
+  for (const auto& backup : backups_) {
+    const BackupMeta& meta = *backup.second;
+    if (meta.Empty()) {
+      continue;
+    }
+    backup_info->emplace_back();
+    SetBackupInfoFromBackupMeta(backup.first, meta, &backup_info->back(),
+                                include_file_details);
+  }
+}
+
+// Appends the id of every backup tracked in corrupt_backups_ to
+// `*corrupt_backup_ids`. Existing elements of the vector are kept.
+void BackupEngineImpl::GetCorruptedBackups(
+    std::vector<BackupID>* corrupt_backup_ids) const {
+  assert(initialized_);
+  std::vector<BackupID>& ids = *corrupt_backup_ids;
+  ids.reserve(corrupt_backups_.size());
+  for (auto& corrupt_entry : corrupt_backups_) {
+    ids.push_back(corrupt_entry.first);
+  }
+}
+
+ IOStatus BackupEngineImpl::RestoreDBFromBackup(  // Restores the files of one backup into db_dir; files parsed as kWalFile go to wal_dir
+ const RestoreOptions& options, BackupID backup_id,  // backup_id may be kLatestBackupIDMarker to select latest_valid_backup_id_
+ const std::string& db_dir, const std::string& wal_dir) const {  // Returns OK, or the status that stopped the restore
+ assert(initialized_);
+ if (backup_id == kLatestBackupIDMarker) {
+ // Note: Read latest_valid_backup_id_ inside of lock
+ backup_id = latest_valid_backup_id_;
+ }
+ auto corrupt_itr = corrupt_backups_.find(backup_id);  // a backup recorded as corrupt fails with the status stored for it
+ if (corrupt_itr != corrupt_backups_.end()) {
+ return corrupt_itr->second.first;
+ }
+ auto backup_itr = backups_.find(backup_id);
+ if (backup_itr == backups_.end()) {
+ return IOStatus::NotFound("Backup not found");
+ }
+ auto& backup = backup_itr->second;
+ if (backup->Empty()) {  // an empty backup is reported exactly like a missing one
+ return IOStatus::NotFound("Backup not found");
+ }
+
+ ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
+ ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
+ static_cast<int>(options.keep_log_files));
+
+ // just in case. Ignore errors
+ db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr)
+ .PermitUncheckedError();
+ db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr)
+ .PermitUncheckedError();
+ // Step 1: clear out the destination directories before anything is copied
+ if (options.keep_log_files) {
+ // delete files in db_dir, but keep all the log files
+ DeleteChildren(db_dir, 1 << kWalFile);  // the kWalFile bit in the filter makes DeleteChildren skip files that parse as WAL files
+ // move all the files from archive dir to wal_dir
+ std::string archive_dir = ArchivalDirectory(wal_dir);
+ std::vector<std::string> archive_files;
+ db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr)
+ .PermitUncheckedError(); // ignore errors
+ for (const auto& f : archive_files) {
+ uint64_t number;
+ FileType type;
+ bool ok = ParseFileName(f, &number, &type);
+ if (ok && type == kWalFile) {  // entries that do not parse as WAL files stay in the archive directory
+ ROCKS_LOG_INFO(options_.info_log,
+ "Moving log file from archive/ to wal_dir: %s",
+ f.c_str());
+ IOStatus io_s = db_fs_->RenameFile(
+ archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr);
+ if (!io_s.ok()) {
+ // if we can't move log file from archive_dir to wal_dir,
+ // we should fail, since it might mean data loss
+ return io_s;
+ }
+ }
+ }
+ } else {  // no logs to keep: empty wal_dir, its archive directory and db_dir
+ DeleteChildren(wal_dir);
+ DeleteChildren(ArchivalDirectory(wal_dir));
+ DeleteChildren(db_dir);
+ }
+ // Step 2: queue one copy per backed-up file; results are collected in Step 3
+ IOStatus io_s;
+ std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;  // one entry per queued copy, waited on after the loop
+ std::string temporary_current_file;  // stays empty unless the backup contains a CURRENT file
+ std::string final_current_file;
+ std::unique_ptr<FSDirectory> db_dir_for_fsync;  // opened on first use, and only when options_.sync is set
+ std::unique_ptr<FSDirectory> wal_dir_for_fsync;  // same, for wal_dir
+
+ for (const auto& file_info : backup->GetFiles()) {
+ const std::string& file = file_info->filename;
+ // 1. get DB filename
+ std::string dst = file_info->GetDbFileName();
+
+ // 2. find the filetype
+ uint64_t number;
+ FileType type;
+ bool ok = ParseFileName(dst, &number, &type);
+ if (!ok) {
+ return IOStatus::Corruption("Backup corrupted: Fail to parse filename " +
+ dst);
+ }
+ // 3. Construct the final path
+ // kWalFile lives in wal_dir and all the rest live in db_dir
+ if (type == kWalFile) {
+ dst = wal_dir + "/" + dst;
+ if (options_.sync && !wal_dir_for_fsync) {
+ io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync,
+ nullptr);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ } else {
+ dst = db_dir + "/" + dst;
+ if (options_.sync && !db_dir_for_fsync) {
+ io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync,
+ nullptr);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ }
+ // For atomicity, initially restore CURRENT file to a temporary name.
+ // This is useful even without options_.sync e.g. in case the restore
+ // process is interrupted.
+ if (type == kCurrentFile) {
+ final_current_file = dst;
+ dst = temporary_current_file = dst + ".tmp";
+ }
+
+ ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
+ dst.c_str());
+ CopyOrCreateWorkItem copy_or_create_work_item(
+ GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */,
+ file_info->temp, "" /* contents */, backup_env_, db_env_,
+ EnvOptions() /* src_env_options */, options_.sync,
+ options_.restore_rate_limiter.get(), file_info->size,
+ nullptr /* stats */);
+ RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
+ copy_or_create_work_item.result.get_future(), file, dst,
+ file_info->checksum_hex);  // keeps the future plus the checksum recorded in the backup, for the comparison in Step 3
+ files_to_copy_or_create_.write(std::move(copy_or_create_work_item));  // hand the copy off; its outcome arrives through the future captured above
+ restore_items_to_finish.push_back(
+ std::move(after_copy_or_create_work_item));
+ }
+ IOStatus item_io_status;  // Step 3: wait for the queued copies, stopping at the first failure
+ for (auto& item : restore_items_to_finish) {
+ item.result.wait();
+ auto result = item.result.get();
+ item_io_status = result.io_status;
+ // Note: It is possible that both of the following bad-status cases occur
+ // during copying. But, we only return one status.
+ if (!item_io_status.ok()) {
+ io_s = item_io_status;
+ break;
+ } else if (!item.checksum_hex.empty() &&
+ item.checksum_hex != result.checksum_hex) {  // compared only when the backup recorded a checksum for this file
+ io_s = IOStatus::Corruption(
+ "While restoring " + item.from_file + " -> " + item.to_file +
+ ": expected checksum is " + item.checksum_hex +
+ " while computed checksum is " + result.checksum_hex);
+ break;
+ }
+ }
+
+ // When enabled, the first FsyncWithDirOptions is to ensure all files are
+ // fully persisted before renaming CURRENT.tmp
+ if (io_s.ok() && db_dir_for_fsync) {
+ ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
+ io_s = db_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+
+ if (io_s.ok() && wal_dir_for_fsync) {
+ io_s = wal_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
+ DirFsyncOptions());
+ }
+
+ if (io_s.ok() && !temporary_current_file.empty()) {  // skipped on any earlier failure, so CURRENT.tmp is never renamed after an error
+ ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n");
+ assert(!final_current_file.empty());
+ io_s = db_fs_->RenameFile(temporary_current_file, final_current_file,
+ io_options_, nullptr);
+ }
+
+ if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
+ // Second FsyncWithDirOptions is to ensure the final atomic rename of DB
+ // restore is fully persisted even if power goes out right after restore
+ // operation returns success
+ assert(db_dir_for_fsync);  // already guaranteed by the enclosing condition
+ io_s = db_dir_for_fsync->FsyncWithDirOptions(
+ io_options_, nullptr, DirFsyncOptions(final_current_file));
+ }
+
+ ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
+ io_s.ToString().c_str());
+ return io_s;
+ }
+
+ // Checks that every file registered in backup `backup_id` is present in the
+ // backup directory with the recorded size and, when `verify_with_checksum`
+ // is set and a checksum was recorded, with the recorded crc32c checksum.
+ //
+ // Returns the stored status for a backup known to be corrupt, NotFound for
+ // an unknown or empty backup or a missing file, Corruption for a size or
+ // checksum mismatch, and OK otherwise.
+ IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
+                                         bool verify_with_checksum) const {
+   assert(initialized_);
+
+   // A backup already recorded as corrupt reports the status kept for it.
+   const auto corrupt_entry = corrupt_backups_.find(backup_id);
+   if (corrupt_entry != corrupt_backups_.end()) {
+     return corrupt_entry->second.first;
+   }
+
+   // Unknown and empty backups are both reported as NotFound.
+   const auto registered = backups_.find(backup_id);
+   if (registered == backups_.end() || registered->second->Empty()) {
+     return IOStatus::NotFound();
+   }
+   auto& meta = registered->second;
+
+   ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
+
+   // Collect the current on-disk size of everything in the directories this
+   // backup may reference.
+   std::unordered_map<std::string, uint64_t> on_disk_sizes;
+   for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
+                               GetSharedFileWithChecksumRel()}) {
+     // Shared directories are allowed to be missing in some cases; any file
+     // that was expected but is absent is reported by the loop below.
+     ReadChildFileCurrentSizes(GetAbsolutePath(rel_dir), backup_fs_,
+                               &on_disk_sizes)
+         .PermitUncheckedError();
+   }
+
+   // Check each registered file against what was found on disk.
+   for (const auto& file_info : meta->GetFiles()) {
+     const auto abs_path = GetAbsolutePath(file_info->filename);
+
+     // Existence
+     const auto on_disk = on_disk_sizes.find(abs_path);
+     if (on_disk == on_disk_sizes.end()) {
+       return IOStatus::NotFound("File missing: " + abs_path);
+     }
+
+     // Size
+     const uint64_t found_size = on_disk->second;
+     if (file_info->size != found_size) {
+       const std::string size_info = "Expected file size is " +
+                                     std::to_string(file_info->size) +
+                                     " while found file size is " +
+                                     std::to_string(found_size);
+       return IOStatus::Corruption("File corrupted: File size mismatch for " +
+                                   abs_path + ": " + size_info);
+     }
+
+     // Checksum, only on request and only when one was recorded
+     if (!verify_with_checksum || file_info->checksum_hex.empty()) {
+       continue;
+     }
+     ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
+                    abs_path.c_str());
+     std::string computed_hex;
+     IOStatus io_s = ReadFileAndComputeChecksum(
+         abs_path, backup_fs_, EnvOptions(), 0 /* size_limit */, &computed_hex,
+         Temperature::kUnknown);
+     if (!io_s.ok()) {
+       return io_s;
+     }
+     if (file_info->checksum_hex != computed_hex) {
+       const std::string checksum_info =
+           "Expected checksum is " + file_info->checksum_hex +
+           " while computed checksum is " + computed_hex;
+       return IOStatus::Corruption("File corrupted: Checksum mismatch for " +
+                                   abs_path + ": " + checksum_info);
+     }
+   }
+   return IOStatus::OK();
+ }
+
+ // Writes the file `dst`, either by copying `src` or, when `src` is empty, by
+ // storing `contents` in it. Exactly one of `src` / `contents` is non-empty.
+ //
+ // size_limit: maximum number of bytes to copy from `src`; 0 means no limit.
+ // src_env / dst_env: environments whose file systems open `src` / `dst`.
+ // sync: when true, `dst` is synced before it is closed.
+ // rate_limiter: optional. When set, its single-burst size is used as the
+ //   copy buffer size, it is passed to the source reader, and it is charged
+ //   for the bytes written.
+ // progress_callback: called, while holding byte_report_mutex_, each time
+ //   *bytes_toward_next_callback reaches
+ //   options_.callback_trigger_interval_size.
+ // src_temperature: in/out, read only when `src` is non-empty. Used as the
+ //   hint for opening `src`, then overwritten with the temperature reported
+ //   by the opened file.
+ // dst_temperature: temperature set in the options used to create `dst`.
+ // bytes_toward_next_callback: in/out running count of bytes read from
+ //   `src`; always dereferenced, so it must not be null.
+ // size / checksum_hex: optional outputs receiving the number of bytes
+ //   processed and their crc32c checksum in hex.
+ //
+ // Returns the first I/O error encountered, Incomplete("Backup stopped") if
+ // stop_backup_ was set, or OK.
+ IOStatus BackupEngineImpl::CopyOrCreateFile(
+     const std::string& src, const std::string& dst, const std::string& contents,
+     uint64_t size_limit, Env* src_env, Env* dst_env,
+     const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
+     std::function<void()> progress_callback, Temperature* src_temperature,
+     Temperature dst_temperature, uint64_t* bytes_toward_next_callback,
+     uint64_t* size, std::string* checksum_hex) {
+   assert(src.empty() != contents.empty());
+   IOStatus io_s;
+   std::unique_ptr<FSWritableFile> dst_file;
+   std::unique_ptr<FSSequentialFile> src_file;
+   FileOptions dst_file_options;
+   dst_file_options.use_mmap_writes = false;
+   dst_file_options.temperature = dst_temperature;
+   // TODO:(gzh) maybe use direct reads/writes here if possible
+   if (size != nullptr) {
+     *size = 0;
+   }
+   uint32_t checksum_value = 0;
+
+   // Check if size limit is set. if not, set it to very big number
+   if (size_limit == 0) {
+     size_limit = std::numeric_limits<uint64_t>::max();
+   }
+
+   io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
+                                                    &dst_file, nullptr);
+   if (io_s.ok() && !src.empty()) {
+     auto src_file_options = FileOptions(src_env_options);
+     src_file_options.temperature = *src_temperature;
+     io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
+                                                        &src_file, nullptr);
+     // The retry is deliberately nested here so that it only reacts to a
+     // failure of the source open above. A PathNotFound from creating `dst`
+     // must be returned as is: retrying in that case could replace the error
+     // with a successful source open and leave `dst_file` null.
+     if (io_s.IsPathNotFound() && *src_temperature != Temperature::kUnknown) {
+       // Retry without temperature hint in case the FileSystem is strict with
+       // non-kUnknown temperature option
+       io_s = src_env->GetFileSystem()->NewSequentialFile(
+           src, FileOptions(src_env_options), &src_file, nullptr);
+     }
+   }
+   if (!io_s.ok()) {
+     return io_s;
+   }
+
+   size_t buf_size =
+       rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+                    : kDefaultCopyFileBufferSize;
+
+   std::unique_ptr<WritableFileWriter> dest_writer(
+       new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
+   std::unique_ptr<SequentialFileReader> src_reader;
+   std::unique_ptr<char[]> buf;
+   if (!src.empty()) {
+     // Return back current temperature in FileSystem
+     *src_temperature = src_file->GetTemperature();
+
+     src_reader.reset(new SequentialFileReader(
+         std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter));
+     buf.reset(new char[buf_size]);
+   }
+
+   // In copy mode the loop runs until the source is exhausted or the size
+   // limit is reached; in create mode it runs exactly once with `contents`.
+   Slice data;
+   do {
+     if (stop_backup_.load(std::memory_order_acquire)) {
+       return status_to_io_status(Status::Incomplete("Backup stopped"));
+     }
+     if (!src.empty()) {
+       size_t buffer_to_read =
+           (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
+       io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
+                               Env::IO_LOW /* rate_limiter_priority */);
+       *bytes_toward_next_callback += data.size();
+     } else {
+       data = contents;
+     }
+     size_limit -= data.size();
+     TEST_SYNC_POINT_CALLBACK(
+         "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
+         (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
+                                                                     : nullptr);
+
+     if (!io_s.ok()) {
+       return io_s;
+     }
+
+     if (size != nullptr) {
+       *size += data.size();
+     }
+     if (checksum_hex != nullptr) {
+       checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
+     }
+     io_s = dest_writer->Append(data);
+
+     if (rate_limiter != nullptr) {
+       if (!src.empty()) {
+         // A chunk read from `src` is at most buf_size bytes, which was taken
+         // from the limiter's single-burst size, so one request suffices.
+         rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+                               RateLimiter::OpType::kWrite);
+       } else {
+         // `contents` can be larger than one burst; request it in pieces.
+         LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW,
+                                    nullptr /* stats */,
+                                    RateLimiter::OpType::kWrite);
+       }
+     }
+     while (*bytes_toward_next_callback >=
+            options_.callback_trigger_interval_size) {
+       *bytes_toward_next_callback -= options_.callback_trigger_interval_size;
+       std::lock_guard<std::mutex> lock(byte_report_mutex_);
+       progress_callback();
+     }
+   } while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
+
+   // Convert uint32_t checksum to hex checksum
+   if (checksum_hex != nullptr) {
+     checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
+   }
+
+   if (io_s.ok() && sync) {
+     io_s = dest_writer->Sync(false);
+   }
+   if (io_s.ok()) {
+     io_s = dest_writer->Close();
+   }
+   return io_s;
+ }
+
+ // Registers one file for the backup being created: picks its destination path, decides whether a copy is needed, and queues the work. fname will always start with "/"
+ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
+ std::unordered_set<std::string>& live_dst_paths,  // in/out: absolute destination paths already used by files added so far
+ std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,  // out: one follow-up item is appended whenever OK is returned
+ BackupID backup_id, bool shared, const std::string& src_dir,  // shared selects a shared directory over the backup's private one
+ const std::string& fname, const EnvOptions& src_env_options,
+ RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes,  // size_bytes == uint64_t max is reported as "File missing" on the shared_checksum path
+ Statistics* stats, uint64_t size_limit, bool shared_checksum,
+ std::function<void()> progress_callback, const std::string& contents,  // exactly one of contents / src_dir is non-empty (asserted below)
+ const std::string& src_checksum_func_name,
+ const std::string& src_checksum_str, const Temperature src_temperature) {
+ assert(contents.empty() != src_dir.empty());
+
+ std::string src_path = src_dir + "/" + fname;
+ std::string dst_relative;
+ std::string dst_relative_tmp;  // stays empty for private files, which are written directly to their final path
+ std::string db_id;
+ std::string db_session_id;
+ // crc32c checksum in hex. empty == unavailable / unknown
+ std::string checksum_hex;
+
+ // Whenever a default checksum function name is passed in, we will compare
+ // the corresponding checksum values after copying. Note that only table and
+ // blob files may have a known checksum function name passed in.
+ //
+ // If no default checksum function name is passed in and db session id is not
+ // available, we will calculate the checksum *before* copying in two cases
+ // (we always calculate checksums when copying or creating for any file types):
+ // a) share_files_with_checksum is true and file type is table;
+ // b) share_table_files is true and the file exists already.
+ //
+ // Step 0: Check if default checksum function name is passed in
+ if (kDbFileChecksumFuncName == src_checksum_func_name) {
+ if (src_checksum_str == kUnknownFileChecksum) {
+ return status_to_io_status(
+ Status::Aborted("Unknown checksum value for " + fname));
+ }
+ checksum_hex = ChecksumStrToHex(src_checksum_str);
+ }
+
+ // Step 1: Prepare the relative path to destination
+ if (shared && shared_checksum) {
+ if (GetNamingNoFlags() != BackupEngineOptions::kLegacyCrc32cAndFileSize &&
+ file_type != kBlobFile) {
+ // Prepare db_session_id to add to the file name
+ // Ignore the returned status
+ // In the failed cases, db_id and db_session_id will be empty
+ GetFileDbIdentities(db_env_, src_env_options, src_path, src_temperature,
+ rate_limiter, &db_id, &db_session_id)
+ .PermitUncheckedError();
+ }
+ // Calculate checksum if checksum and db session id are not available.
+ // If db session id is available, we will not calculate the checksum
+ // since the session id should suffice to avoid file name collision in
+ // the shared_checksum directory.
+ if (checksum_hex.empty() && db_session_id.empty()) {
+ IOStatus io_s = ReadFileAndComputeChecksum(
+ src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
+ src_temperature);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ if (size_bytes == std::numeric_limits<uint64_t>::max()) {
+ return IOStatus::NotFound("File missing: " + src_path);
+ }
+ // dst_relative depends on the following conditions:
+ // 1) the naming scheme is kUseDbSessionId,
+ // 2) db_session_id is not empty,
+ // 3) checksum is available in the DB manifest.
+ // If 1,2,3) are satisfied, then dst_relative will be of the form:
+ // shared_checksum/<file_number>_<checksum>_<db_session_id>.sst
+ // If 1,2) are satisfied, then dst_relative will be of the form:
+ // shared_checksum/<file_number>_<db_session_id>.sst
+ // Otherwise, dst_relative is of the form
+ // shared_checksum/<file_number>_<checksum>_<size>.sst
+ //
+ // For blob files, db_session_id is not supported with the blob file format.
+ // It uses original/legacy naming scheme.
+ // dst_relative will be of the form:
+ // shared_checksum/<file_number>_<checksum>_<size>.blob
+ dst_relative = GetSharedFileWithChecksum(fname, checksum_hex, size_bytes,
+ db_session_id);
+ dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
+ dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
+ } else if (shared) {
+ dst_relative_tmp = GetSharedFileRel(fname, true);
+ dst_relative = GetSharedFileRel(fname, false);
+ } else {
+ dst_relative = GetPrivateFileRel(backup_id, false, fname);
+ }
+
+ // We copy into `temp_dest_path` and, once finished, rename it to
+ // `final_dest_path`. This allows files to atomically appear at
+ // `final_dest_path`. We can copy directly to the final path when atomicity
+ // is unnecessary, like for files in private backup directories.
+ const std::string* copy_dest_path;
+ std::string temp_dest_path;
+ std::string final_dest_path = GetAbsolutePath(dst_relative);
+ if (!dst_relative_tmp.empty()) {
+ temp_dest_path = GetAbsolutePath(dst_relative_tmp);
+ copy_dest_path = &temp_dest_path;
+ } else {
+ copy_dest_path = &final_dest_path;
+ }
+
+ // Step 2: Determine whether to copy or not
+ // if it's shared, we also need to check if it exists -- if it does, no need
+ // to copy it again.
+ bool need_to_copy = true;
+ // true if final_dest_path is the same path as another live file
+ const bool same_path =
+ live_dst_paths.find(final_dest_path) != live_dst_paths.end();
+
+ bool file_exists = false;
+ if (shared && !same_path) {
+ // Should be in shared directory but not a live path, check existence in
+ // shared directory
+ IOStatus exist =
+ backup_fs_->FileExists(final_dest_path, io_options_, nullptr);
+ if (exist.ok()) {
+ file_exists = true;
+ } else if (exist.IsNotFound()) {
+ file_exists = false;
+ } else {
+ return exist;
+ }
+ }
+
+ if (!contents.empty()) {
+ need_to_copy = false;  // nothing to copy: the file is created from `contents` in Step 3
+ } else if (shared && (same_path || file_exists)) {
+ need_to_copy = false;
+ auto find_result = backuped_file_infos_.find(dst_relative);
+ if (find_result == backuped_file_infos_.end() && !same_path) {
+ // file exists but not referenced
+ ROCKS_LOG_INFO(
+ options_.info_log,
+ "%s already present, but not referenced by any backup. We will "
+ "overwrite the file.",
+ fname.c_str());
+ need_to_copy = true;
+ // Defer any failure reporting to when we try to write the file
+ backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr)
+ .PermitUncheckedError();
+ } else {
+ // file exists and referenced
+ if (checksum_hex.empty()) {
+ // same_path should not happen for a standard DB, so OK to
+ // read file contents to check for checksum mismatch between
+ // two files from same DB getting same name.
+ // For compatibility with future meta file that might not have
+ // crc32c checksum available, consider it might be empty, but
+ // we don't currently generate meta file without crc32c checksum.
+ // Therefore we have to read & compute it if we don't have it.
+ if (!same_path && !find_result->second->checksum_hex.empty()) {  // find_result is only dereferenced when !same_path, which in this branch implies it is not end()
+ assert(find_result != backuped_file_infos_.end());
+ // Note: to save I/O on incremental backups, we copy prior known
+ // checksum of the file instead of reading entire file contents
+ // to recompute it.
+ checksum_hex = find_result->second->checksum_hex;
+ // Regarding corruption detection, consider:
+ // (a) the DB file is corrupt (since previous backup) and the backup
+ // file is OK: we failed to detect, but the backup is safe. DB can
+ // be repaired/restored once its corruption is detected.
+ // (b) the backup file is corrupt (since previous backup) and the
+ // db file is OK: we failed to detect, but the backup is corrupt.
+ // CreateNewBackup should support fast incremental backups and
+ // there's no way to support that without reading all the files.
+ // We might add an option for extra checks on incremental backup,
+ // but until then, use VerifyBackups to check existing backup data.
+ // (c) file name collision with legitimately different content.
+ // This is almost inconceivable with a well-generated DB session
+ // ID, but even in that case, we double check the file sizes in
+ // BackupMeta::AddFile.
+ } else {
+ IOStatus io_s = ReadFileAndComputeChecksum(
+ src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
+ src_temperature);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+ }
+ if (!db_session_id.empty()) {
+ ROCKS_LOG_INFO(options_.info_log,
+ "%s already present, with checksum %s, size %" PRIu64
+ " and DB session identity %s",
+ fname.c_str(), checksum_hex.c_str(), size_bytes,
+ db_session_id.c_str());
+ } else {
+ ROCKS_LOG_INFO(options_.info_log,
+ "%s already present, with checksum %s and size %" PRIu64,
+ fname.c_str(), checksum_hex.c_str(), size_bytes);
+ }
+ }
+ }
+ live_dst_paths.insert(final_dest_path);  // later calls that map to the same destination will see same_path == true
+
+ // Step 3: Add work item
+ if (!contents.empty() || need_to_copy) {
+ ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
+ copy_dest_path->c_str());
+ CopyOrCreateWorkItem copy_or_create_work_item(
+ src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature,
+ Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_,
+ src_env_options, options_.sync, rate_limiter, size_limit, stats,
+ progress_callback, src_checksum_func_name, checksum_hex, db_id,
+ db_session_id);
+ BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
+ copy_or_create_work_item.result.get_future(), shared, need_to_copy,
+ backup_env_, temp_dest_path, final_dest_path, dst_relative);
+ files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
+ backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
+ } else {  // no I/O needed: fulfil the promise right away with the values already known
+ std::promise<CopyOrCreateResult> promise_result;
+ BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
+ promise_result.get_future(), shared, need_to_copy, backup_env_,
+ temp_dest_path, final_dest_path, dst_relative);
+ backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
+ CopyOrCreateResult result;
+ result.io_status = IOStatus::OK();
+ result.size = size_bytes;
+ result.checksum_hex = std::move(checksum_hex);
+ result.db_id = std::move(db_id);
+ result.db_session_id = std::move(db_session_id);
+ promise_result.set_value(std::move(result));
+ }
+ return IOStatus::OK();
+ }
+
+ // Reads up to `size_limit` bytes of `src` (the whole file when size_limit is
+ // 0) through `src_fs` and stores the crc32c checksum of what was read, in
+ // hex, into `*checksum_hex`.
+ //
+ // The file is first opened with `src_temperature` as a hint; if that fails
+ // with PathNotFound and the hint was not kUnknown, the open is retried
+ // without the hint. Reads go through options_.backup_rate_limiter.
+ //
+ // Returns Aborted when `checksum_hex` is null, Incomplete when stop_backup_
+ // is set, the failing status of an open or read, or OK.
+ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
+     const std::string& src, const std::shared_ptr<FileSystem>& src_fs,
+     const EnvOptions& src_env_options, uint64_t size_limit,
+     std::string* checksum_hex, const Temperature src_temperature) const {
+   if (checksum_hex == nullptr) {
+     return status_to_io_status(Status::Aborted("Checksum pointer is null"));
+   }
+
+   // A limit of 0 stands for "no limit".
+   uint64_t bytes_left =
+       (size_limit == 0) ? std::numeric_limits<uint64_t>::max() : size_limit;
+
+   RateLimiter* const limiter = options_.backup_rate_limiter.get();
+   FileOptions open_options(src_env_options);
+   open_options.temperature = src_temperature;
+
+   std::unique_ptr<SequentialFileReader> reader;
+   IOStatus io_s = SequentialFileReader::Create(
+       src_fs, src, open_options, &reader, nullptr /* dbg */, limiter);
+   if (io_s.IsPathNotFound() && src_temperature != Temperature::kUnknown) {
+     // Some FileSystems are strict about non-kUnknown temperature hints, so
+     // try once more without one.
+     open_options.temperature = Temperature::kUnknown;
+     io_s = SequentialFileReader::Create(src_fs, src, open_options, &reader,
+                                         nullptr /* dbg */, limiter);
+   }
+   if (!io_s.ok()) {
+     return io_s;
+   }
+
+   const size_t buf_size = kDefaultCopyFileBufferSize;
+   std::unique_ptr<char[]> buf(new char[buf_size]);
+   uint32_t crc = 0;
+   Slice chunk;
+
+   while (true) {
+     if (stop_backup_.load(std::memory_order_acquire)) {
+       return status_to_io_status(Status::Incomplete("Backup stopped"));
+     }
+     // Never ask for more than the remaining budget.
+     const size_t want =
+         static_cast<size_t>(std::min<uint64_t>(buf_size, bytes_left));
+     io_s = reader->Read(want, &chunk, buf.get(),
+                         Env::IO_LOW /* rate_limiter_priority */);
+     if (!io_s.ok()) {
+       return io_s;
+     }
+
+     bytes_left -= chunk.size();
+     crc = crc32c::Extend(crc, chunk.data(), chunk.size());
+
+     // Stop at end of file (empty read) or once the budget is used up.
+     if (chunk.size() == 0 || bytes_left == 0) {
+       break;
+     }
+   }
+
+   checksum_hex->assign(ChecksumInt32ToHex(crc));
+   return io_s;
+ }
+
+ // Reads the table properties of the SST file `file_path` and copies its DB
+ // id and DB session id into `*db_id` / `*db_session_id` (each is optional,
+ // but at least one must be provided).
+ //
+ // Returns the reader's status if the file cannot be opened, Corruption if no
+ // table properties are available, NotFound if a session id was requested
+ // but the file carries none, and OK otherwise. Failures are logged.
+ Status BackupEngineImpl::GetFileDbIdentities(
+     Env* src_env, const EnvOptions& src_env_options,
+     const std::string& file_path, Temperature file_temp,
+     RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) {
+   assert(db_id != nullptr || db_session_id != nullptr);
+
+   Options options;
+   options.env = src_env;
+   SstFileDumper sst_reader(options, file_path, file_temp,
+                            2 * 1024 * 1024
+                            /* readahead_size */,
+                            false /* verify_checksum */, false /* output_hex */,
+                            false /* decode_blob_index */, src_env_options,
+                            true /* silent */);
+
+   Status s = sst_reader.getStatus();
+   if (!s.ok()) {
+     ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
+                    file_path.c_str(), s.ToString().c_str());
+     return s;
+   }
+
+   // Prefer the properties served by the table reader; if that read fails,
+   // fall back to the ones captured when sst_reader was initialized.
+   std::shared_ptr<const TableProperties> tp;
+   const TableProperties* table_properties = nullptr;
+   if (sst_reader.ReadTableProperties(&tp).ok()) {
+     table_properties = tp.get();
+     if (table_properties != nullptr && rate_limiter != nullptr) {
+       // sizeof(*table_properties) is a sufficient but far-from-exact
+       // approximation of the bytes read, because of the metaindex block,
+       // std::string properties and varint compression.
+       LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter,
+                                  Env::IO_LOW, nullptr /* stats */,
+                                  RateLimiter::OpType::kRead);
+     }
+   } else {
+     table_properties = sst_reader.GetInitTableProperties();
+   }
+
+   if (table_properties == nullptr) {
+     s = Status::Corruption("Table properties missing in " + file_path);
+     ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
+     return s;
+   }
+
+   if (db_id != nullptr) {
+     db_id->assign(table_properties->db_id);
+   }
+   if (db_session_id != nullptr) {
+     db_session_id->assign(table_properties->db_session_id);
+     if (db_session_id->empty()) {
+       s = Status::NotFound("DB session identity not found in " + file_path);
+       ROCKS_LOG_INFO(options_.info_log, "%s", s.ToString().c_str());
+       return s;
+     }
+   }
+   return Status::OK();
+ }
+
+ // Charges `total_bytes_to_request` bytes to `rate_limiter` as a series of
+ // requests, each no larger than the limiter's single-burst size.
+ // `rate_limiter` must not be null; `pri`, `stats` and `op_type` are passed
+ // through to every request.
+ void BackupEngineImpl::LoopRateLimitRequestHelper(
+     const size_t total_bytes_to_request, RateLimiter* rate_limiter,
+     const Env::IOPriority pri, Statistics* stats,
+     const RateLimiter::OpType op_type) {
+   assert(rate_limiter != nullptr);
+   for (size_t bytes_left = total_bytes_to_request; bytes_left > 0;) {
+     // The burst size is queried on every round, as it is not fixed here.
+     const size_t chunk = std::min(
+         static_cast<size_t>(rate_limiter->GetSingleBurstBytes()), bytes_left);
+     rate_limiter->Request(chunk, pri, stats, op_type);
+     bytes_left -= chunk;
+   }
+ }
+
+ // Best-effort removal of the entries of `dir` on the DB file system.
+ // `file_type_filter` is a bit mask indexed by FileType: an entry whose name
+ // parses to a type whose bit is set is kept. Entries that do not parse are
+ // always deleted. Listing and deletion errors are ignored.
+ void BackupEngineImpl::DeleteChildren(const std::string& dir,
+                                       uint32_t file_type_filter) const {
+   std::vector<std::string> entries;
+   db_fs_->GetChildren(dir, io_options_, &entries, nullptr)
+       .PermitUncheckedError();  // ignore errors
+
+   for (const auto& name : entries) {
+     uint64_t number;
+     FileType type;
+     const bool parsed = ParseFileName(name, &number, &type);
+     // `type` is only read when parsing succeeded.
+     const bool keep = parsed && (file_type_filter & (1 << type));
+     if (!keep) {
+       db_fs_->DeleteFile(dir + "/" + name, io_options_, nullptr)
+           .PermitUncheckedError();  // ignore errors
+     }
+   }
+ }
+
+ // Adds to `*result` one entry per child of `dir` on `fs`, keyed by
+ // "<dir>/<child name>" and holding the child's current size in bytes.
+ // A missing `dir` adds nothing and counts as success; any other failure of
+ // the existence check or the listing is returned.
+ IOStatus BackupEngineImpl::ReadChildFileCurrentSizes(
+     const std::string& dir, const std::shared_ptr<FileSystem>& fs,
+     std::unordered_map<std::string, uint64_t>* result) const {
+   assert(result != nullptr);
+
+   std::vector<Env::FileAttributes> children;
+   IOStatus io_status = fs->FileExists(dir, io_options_, nullptr);
+   if (io_status.ok()) {
+     io_status =
+         fs->GetChildrenFileAttributes(dir, io_options_, &children, nullptr);
+   } else if (io_status.IsNotFound()) {
+     // Inserting no entries can be considered success
+     io_status = IOStatus::OK();
+   }
+
+   // Join with exactly one separator, whether or not `dir` already ends in
+   // one.
+   const std::string prefix =
+       (dir.empty() || dir.back() != '/') ? dir + "/" : dir;
+   for (const auto& child : children) {
+     result->emplace(prefix + child.name, child.size_bytes);
+   }
+   return io_status;
+ }
+
+ // Removes files from the backup directory that no backup needs any more:
+ //  - files in the two shared directories that are absent from
+ //    backuped_file_infos_ or have a reference count of 0;
+ //  - private directories that are temporary (name contains ".tmp") or whose
+ //    numeric ID is not a registered backup.
+ //
+ // This is best effort. Only failures to list a directory are returned (the
+ // last one wins); failed deletions are logged. Either kind of failure sets
+ // might_need_garbage_collect_ so that a later attempt can be made.
+ // Must not be called on a read-only engine.
+ IOStatus BackupEngineImpl::GarbageCollect() {
+   assert(!read_only_);
+
+   // We will make a best effort to remove all garbage even in the presence
+   // of inconsistencies or I/O failures that inhibit finding garbage.
+   IOStatus overall_status = IOStatus::OK();
+   // If all goes well, we don't need another auto-GC this session
+   might_need_garbage_collect_ = false;
+
+   ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
+
+   // delete obsolete shared files
+   for (bool with_checksum : {false, true}) {
+     std::vector<std::string> shared_children;
+     {
+       std::string shared_path;
+       if (with_checksum) {
+         shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
+       } else {
+         shared_path = GetAbsolutePath(GetSharedFileRel());
+       }
+       // A shared directory that does not exist simply has nothing to clean.
+       IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr);
+       if (io_s.ok()) {
+         io_s = backup_fs_->GetChildren(shared_path, io_options_,
+                                        &shared_children, nullptr);
+       } else if (io_s.IsNotFound()) {
+         io_s = IOStatus::OK();
+       }
+       if (!io_s.ok()) {
+         overall_status = io_s;
+         // Trying again later might work
+         might_need_garbage_collect_ = true;
+       }
+     }
+     for (auto& child : shared_children) {
+       std::string rel_fname;
+       if (with_checksum) {
+         rel_fname = GetSharedFileWithChecksumRel(child);
+       } else {
+         rel_fname = GetSharedFileRel(child);
+       }
+       auto child_itr = backuped_file_infos_.find(rel_fname);
+       // if it's not refcounted, delete it
+       if (child_itr == backuped_file_infos_.end() ||
+           child_itr->second->refs == 0) {
+         // this might be a directory, but DeleteFile will just fail in that
+         // case, so we're good
+         IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(rel_fname),
+                                                io_options_, nullptr);
+         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
+                        rel_fname.c_str(), io_s.ToString().c_str());
+         backuped_file_infos_.erase(rel_fname);
+         if (!io_s.ok()) {
+           // Trying again later might work
+           might_need_garbage_collect_ = true;
+         }
+       }
+     }
+   }
+
+   // delete obsolete private files
+   std::vector<std::string> private_children;
+   {
+     IOStatus io_s =
+         backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_,
+                                 &private_children, nullptr);
+     if (!io_s.ok()) {
+       overall_status = io_s;
+       // Trying again later might work
+       might_need_garbage_collect_ = true;
+     }
+   }
+   for (auto& child : private_children) {
+     BackupID backup_id = 0;
+     bool tmp_dir = child.find(".tmp") != std::string::npos;
+     // backup_id stays 0 when the name does not start with a number.
+     sscanf(child.c_str(), "%u", &backup_id);
+     if (!tmp_dir &&  // if it's tmp_dir, delete it
+         (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
+       // it's either not a number or it's still alive. continue
+       continue;
+     }
+     // here we have to delete the dir and all its children
+     //
+     // Build the path of the directory that was actually listed: the ".tmp"
+     // variant for a temporary directory, the plain one otherwise. Without
+     // the flag, a leftover "<id>.tmp" entry would make us delete the
+     // non-temporary directory of the same ID, which may belong to a live
+     // backup. (Assumes the second argument of GetPrivateFileRel selects the
+     // ".tmp" name, as its use elsewhere in this file suggests.)
+     std::string full_private_path =
+         GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir));
+     std::vector<std::string> subchildren;
+     if (backup_fs_
+             ->GetChildren(full_private_path, io_options_, &subchildren, nullptr)
+             .ok()) {
+       for (auto& subchild : subchildren) {
+         // NOTE(review): concatenation without a separator presumably relies
+         // on GetPrivateFileRel returning a path that ends in "/" -- confirm.
+         IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild,
+                                                io_options_, nullptr);
+         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
+                        (full_private_path + subchild).c_str(),
+                        io_s.ToString().c_str());
+         if (!io_s.ok()) {
+           // Trying again later might work
+           might_need_garbage_collect_ = true;
+         }
+       }
+     }
+     // finally delete the private dir
+     IOStatus io_s =
+         backup_fs_->DeleteDir(full_private_path, io_options_, nullptr);
+     ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
+                    full_private_path.c_str(), io_s.ToString().c_str());
+     if (!io_s.ok()) {
+       // Trying again later might work
+       might_need_garbage_collect_ = true;
+     }
+   }
+
+   // Any returned error must also have requested another GC attempt.
+   assert(overall_status.ok() || might_need_garbage_collect_);
+   return overall_status;
+ }
+
+// ------- BackupMeta class --------
+
+ // Adds `file_info` to this backup.
+ //
+ // If no file with that name is known yet, `file_info` is registered in
+ // *file_infos_ with a reference count of 1. Otherwise the already
+ // registered entry is reused: its size must match, its checksum is filled
+ // in if it had none, and its reference count is incremented. In both cases
+ // the backup's total size grows by file_info->size and the registered entry
+ // is appended to files_.
+ //
+ // Returns Corruption on an insertion failure or on a size or checksum
+ // mismatch with the registered entry, OK otherwise.
+ IOStatus BackupEngineImpl::BackupMeta::AddFile(
+     std::shared_ptr<FileInfo> file_info) {
+   auto entry = file_infos_->find(file_info->filename);
+
+   if (entry == file_infos_->end()) {
+     // First time this file name is seen.
+     const auto inserted = file_infos_->insert({file_info->filename, file_info});
+     if (!inserted.second) {
+       // if this happens, something is seriously wrong
+       return IOStatus::Corruption("In memory metadata insertion error");
+     }
+     entry = inserted.first;
+     entry->second->refs = 1;
+   } else {
+     const auto& known = entry->second;
+
+     // Both sizes were scanned off the filesystem (backup side and DB side),
+     // so they can be compared, much like the check in VerifyBackup.
+     if (known->size != file_info->size) {
+       std::string msg = "Size mismatch for existing backup file: " +
+                         file_info->filename + " Size in backup is " +
+                         std::to_string(known->size) +
+                         " while size in DB is " +
+                         std::to_string(file_info->size) +
+                         " If this DB file checks as not corrupt, try deleting"
+                         " old backups or backing up to a different backup"
+                         " directory.";
+       return IOStatus::Corruption(msg);
+     }
+
+     if (!file_info->checksum_hex.empty()) {
+       if (known->checksum_hex.empty()) {
+         // Remember checksum if newly acquired
+         known->checksum_hex = file_info->checksum_hex;
+       } else if (known->checksum_hex != file_info->checksum_hex) {
+         // Note: to save I/O, the two are trivially equal for already backed
+         // up files that don't have the checksum in their name, and they
+         // should never differ for files that do.
+         //
+         // Should never reach here, but produce an appropriate corruption
+         // message in case we do in a release build.
+         assert(false);
+         std::string msg = "Checksum mismatch for existing backup file: " +
+                           file_info->filename + " Expected checksum is " +
+                           known->checksum_hex +
+                           " while computed checksum is " +
+                           file_info->checksum_hex +
+                           " If this DB file checks as not corrupt, try"
+                           " deleting old backups or backing up to a different"
+                           " backup directory.";
+         return IOStatus::Corruption(msg);
+       }
+     }
+     // (an empty incoming checksum means there is nothing to check)
+
+     ++known->refs;  // one more backup references this file
+   }
+
+   size_ += file_info->size;
+   files_.push_back(entry->second);
+
+   return IOStatus::OK();
+ }
+
+// Detaches this backup from its files: every listed file loses one
+// reference, the file list is emptied and timestamp_ is reset to 0.
+//
+// When `delete_meta` is true the meta file is also removed; a meta file that
+// does not exist is not an error. Returns the status of that removal (or of
+// the existence probe if it failed for another reason), OK otherwise.
+IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
+  for (const auto& referenced : files_) {
+    --referenced->refs;  // this backup no longer uses the file
+  }
+  files_.clear();
+
+  IOStatus result;
+  if (delete_meta) {
+    result = fs_->FileExists(meta_filename_, iooptions_, nullptr);
+    if (result.IsNotFound()) {
+      result = IOStatus::OK();  // nothing to delete
+    } else if (result.ok()) {
+      result = fs_->DeleteFile(meta_filename_, iooptions_, nullptr);
+    }
+  }
+
+  timestamp_ = 0;
+  return result;
+}
+
+// Constants for backup meta file schema (see LoadFromFile)
+// Prefix of the optional first line that names the schema version.
+const std::string kSchemaVersionPrefix{"schema_version "};
+// Line that separates the file entries from the optional footer fields.
+const std::string kFooterMarker{"// FOOTER"};
+
+// Name of the top-level field holding hex-encoded application metadata.
+const std::string kAppMetaDataFieldName{"metadata"};
+
+// Names of the per-file fields.
+// WART: The checksums are crc32c but named "crc32"
+const std::string kFileCrc32cFieldName{"crc32"};
+const std::string kFileSizeFieldName{"size"};
+const std::string kTemperatureFieldName{"temp"};
+
+// Marks a (future) field that should cause failure if not recognized.
+// Other fields are assumed to be ignorable. For example, in the future
+// we might add
+// ni::file_name_escape uri_percent
+// to indicate all file names have had spaces and special characters
+// escaped using a URI percent encoding.
+const std::string kNonIgnorableFieldPrefix{"ni::"};
+
+// Each backup meta file is of the format (schema version 1):
+//----------------------------------------------------------
+// <timestamp>
+// <seq number>
+// metadata <metadata> (optional)
+// <number of files>
+// <file1> crc32 <crc32c_as_unsigned_decimal>
+// <file2> crc32 <crc32c_as_unsigned_decimal>
+// ...
+//----------------------------------------------------------
+//
+// For schema version 2.x (not in public APIs, but
+// forward-compatibility started):
+//----------------------------------------------------------
+// schema_version <ver>
+// <timestamp>
+// <seq number>
+// [<field name> <field data>]
+// ...
+// <number of files>
+// <file1>( <field name> <field data no spaces>)*
+// <file2>( <field name> <field data no spaces>)*
+// ...
+// [// FOOTER]
+// [<field name> <field data>]
+// ...
+//----------------------------------------------------------
+// where
+// <ver> ::= [0-9]+([.][0-9]+)?
+// <field name> ::= [A-Za-z_][A-Za-z_0-9.]+
+// <field data> is anything but newline
+// <field data no spaces> is anything but space and newline
+// Although "// FOOTER" wouldn't strictly be required as a delimiter
+// given the number of files is included, it is there for parsing
+// sanity in case of corruption. It is only required if followed
+// by footer fields, such as a checksum of the meta file (so far).
+// Unrecognized fields are ignored, to support schema evolution on
+// non-critical features with forward compatibility. Update schema
+// major version for breaking changes. Schema minor versions are indicated
+// only for diagnostic/debugging purposes.
+//
+// Fields in schema version 2.0:
+// * Top-level meta fields:
+// * Only "metadata" as in schema version 1
+// * File meta fields:
+// * "crc32" - a crc32c checksum as in schema version 1
+// * "size" - the size of the file (new)
+// * "temp" - the file temperature, by name (new); unrecognized names
+// are read as unknown
+// * Footer meta fields:
+// * None yet (future use for meta file checksum anticipated)
+//
+// Populates this (empty) BackupMeta from the meta file at meta_filename_,
+// whose layout is described in the schema comment above.
+//
+// Parameters:
+//   backup_dir       - prefix joined with "/" and each listed file name to
+//                      form the key looked up in `abs_path_to_size`.
+//   abs_path_to_size - sizes of files found on disk, keyed by absolute path;
+//                      consulted for files not already known via GetFile().
+//   rate_limiter     - handed to the line reader used for the meta file.
+//   info_log         - receives a warning for each newly seen ignorable
+//                      field.
+//   reported_ignored_fields - in/out set of "meta:", "file:" and "footer:"
+//                      prefixed field names already warned about, so each
+//                      unrecognized field is logged only once.
+//
+// Returns:
+//   NotSupported - unknown schema version, or a field carrying the
+//                  non-ignorable prefix that this code does not recognize.
+//   Corruption   - malformed content, a listed file that is not on disk, a
+//                  size/checksum field that does not validate, or a file
+//                  count that disagrees with the entries read.
+//   Otherwise the reader's own error status, the first AddFile() failure,
+//   or OK.
+IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
+    const std::string& backup_dir,
+    const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
+    RateLimiter* rate_limiter, Logger* info_log,
+    std::unordered_set<std::string>* reported_ignored_fields) {
+  assert(reported_ignored_fields);
+  assert(Empty());
+
+  std::unique_ptr<LineFileReader> backup_meta_reader;
+  {
+    IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
+                                           &backup_meta_reader,
+                                           nullptr /* dbg */, rate_limiter);
+    if (!io_s.ok()) {
+      return io_s;
+    }
+  }
+
+  // If we don't read an explicit schema_version, that implies version 1,
+  // which is what we call the original backup meta schema.
+  int schema_major_version = 1;
+
+  // Failures handled at the end
+  std::string line;
+  if (backup_meta_reader->ReadLine(&line,
+                                   Env::IO_LOW /* rate_limiter_priority */)) {
+    if (StartsWith(line, kSchemaVersionPrefix)) {
+      std::string ver = line.substr(kSchemaVersionPrefix.size());
+      if (ver == "2" || StartsWith(ver, "2.")) {
+        schema_major_version = 2;
+      } else {
+        return IOStatus::NotSupported(
+            "Unsupported/unrecognized schema version: " + ver);
+      }
+      // The version line is consumed; the timestamp is on the next line.
+      line.clear();
+    } else if (line.empty()) {
+      return IOStatus::Corruption("Unexpected empty line");
+    }
+  }
+  // Without a version line, the first line read above is the timestamp.
+  if (!line.empty()) {
+    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
+  } else if (backup_meta_reader->ReadLine(
+                 &line, Env::IO_LOW /* rate_limiter_priority */)) {
+    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
+  }
+  if (backup_meta_reader->ReadLine(&line,
+                                   Env::IO_LOW /* rate_limiter_priority */)) {
+    sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
+  }
+  // Sentinel: stays UINT32_MAX if no file-count line is ever read, which the
+  // count check at the end then reports as a missing/incomplete header.
+  uint32_t num_files = UINT32_MAX;
+  while (backup_meta_reader->ReadLine(
+      &line, Env::IO_LOW /* rate_limiter_priority */)) {
+    if (line.empty()) {
+      return IOStatus::Corruption("Unexpected empty line");
+    }
+    // Number -> number of files -> exit loop reading optional meta fields
+    if (line[0] >= '0' && line[0] <= '9') {
+      num_files = static_cast<uint32_t>(strtoul(line.c_str(), nullptr, 10));
+      break;
+    }
+    // else, must be a meta field assignment
+    auto space_pos = line.find_first_of(' ');
+    if (space_pos == std::string::npos) {
+      return IOStatus::Corruption("Expected number of files or meta field");
+    }
+    std::string field_name = line.substr(0, space_pos);
+    std::string field_data = line.substr(space_pos + 1);
+    if (field_name == kAppMetaDataFieldName) {
+      // app metadata present
+      bool decode_success = Slice(field_data).DecodeHex(&app_metadata_);
+      if (!decode_success) {
+        return IOStatus::Corruption(
+            "Failed to decode stored hex encoded app metadata");
+      }
+    } else if (schema_major_version < 2) {
+      return IOStatus::Corruption("Expected number of files or \"" +
+                                  kAppMetaDataFieldName + "\" field");
+    } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
+      return IOStatus::NotSupported("Unrecognized non-ignorable meta field " +
+                                    field_name + " (from future version?)");
+    } else {
+      // Warn the first time we see any particular unrecognized meta field
+      if (reported_ignored_fields->insert("meta:" + field_name).second) {
+        ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup meta field %s",
+                       field_name.c_str());
+      }
+    }
+  }
+  // Entries are staged here and only committed through AddFile() once the
+  // whole meta file has been read and validated.
+  std::vector<std::shared_ptr<FileInfo>> files;
+  bool footer_present = false;
+  while (backup_meta_reader->ReadLine(
+      &line, Env::IO_LOW /* rate_limiter_priority */)) {
+    std::vector<std::string> components = StringSplit(line, ' ');
+
+    if (components.size() < 1) {
+      return IOStatus::Corruption("Empty line instead of file entry.");
+    }
+    if (schema_major_version >= 2 && components.size() == 2 &&
+        line == kFooterMarker) {
+      footer_present = true;
+      break;
+    }
+
+    const std::string& filename = components[0];
+
+    uint64_t actual_size;
+    const std::shared_ptr<FileInfo> file_info = GetFile(filename);
+    if (file_info) {
+      actual_size = file_info->size;
+    } else {
+      std::string abs_path = backup_dir + "/" + filename;
+      auto e = abs_path_to_size.find(abs_path);
+      if (e == abs_path_to_size.end()) {
+        return IOStatus::Corruption(
+            "Pathname in meta file not found on disk: " + abs_path);
+      }
+      actual_size = e->second;
+    }
+
+    if (schema_major_version >= 2) {
+      // File name plus (name, data) pairs: an odd component count. This also
+      // guarantees components[i + 1] exists in the field loop below.
+      if (components.size() % 2 != 1) {
+        return IOStatus::Corruption(
+            "Bad number of line components for file entry.");
+      }
+    } else {
+      // Check restricted original schema
+      if (components.size() < 3) {
+        return IOStatus::Corruption("File checksum is missing for " + filename +
+                                    " in " + meta_filename_);
+      }
+      if (components[1] != kFileCrc32cFieldName) {
+        return IOStatus::Corruption("Unknown checksum type for " + filename +
+                                    " in " + meta_filename_);
+      }
+      if (components.size() > 3) {
+        return IOStatus::Corruption("Extra data for entry " + filename +
+                                    " in " + meta_filename_);
+      }
+    }
+
+    std::string checksum_hex;
+    Temperature temp = Temperature::kUnknown;
+    for (unsigned i = 1; i < components.size(); i += 2) {
+      const std::string& field_name = components[i];
+      const std::string& field_data = components[i + 1];
+
+      if (field_name == kFileCrc32cFieldName) {
+        uint32_t checksum_value =
+            static_cast<uint32_t>(strtoul(field_data.c_str(), nullptr, 10));
+        // Round-trip through to_string rejects anything that is not exactly
+        // the decimal form of a 32-bit unsigned value.
+        if (field_data != std::to_string(checksum_value)) {
+          return IOStatus::Corruption("Invalid checksum value for " + filename +
+                                      " in " + meta_filename_);
+        }
+        checksum_hex = ChecksumInt32ToHex(checksum_value);
+      } else if (field_name == kFileSizeFieldName) {
+        uint64_t ex_size =
+            std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
+        if (ex_size != actual_size) {
+          return IOStatus::Corruption(
+              "For file " + filename + " expected size " +
+              std::to_string(ex_size) + " but found size " +
+              std::to_string(actual_size));
+        }
+      } else if (field_name == kTemperatureFieldName) {
+        auto iter = temperature_string_map.find(field_data);
+        if (iter != temperature_string_map.end()) {
+          temp = iter->second;
+        } else {
+          // Could report corruption, but in case of new temperatures added
+          // in future, letting those map to kUnknown which should generally
+          // be safe.
+          temp = Temperature::kUnknown;
+        }
+      } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
+        return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
+                                      field_name + " (from future version?)");
+      } else {
+        // Warn the first time we see any particular unrecognized file field
+        if (reported_ignored_fields->insert("file:" + field_name).second) {
+          ROCKS_LOG_WARN(info_log, "Ignoring unrecognized backup file field %s",
+                         field_name.c_str());
+        }
+      }
+    }
+
+    files.emplace_back(new FileInfo(filename, actual_size, checksum_hex,
+                                    /*id*/ "", /*sid*/ "", temp));
+  }
+
+  if (footer_present) {
+    assert(schema_major_version >= 2);
+    while (backup_meta_reader->ReadLine(
+        &line, Env::IO_LOW /* rate_limiter_priority */)) {
+      if (line.empty()) {
+        return IOStatus::Corruption("Unexpected empty line");
+      }
+      auto space_pos = line.find_first_of(' ');
+      if (space_pos == std::string::npos) {
+        return IOStatus::Corruption("Expected footer field");
+      }
+      std::string field_name = line.substr(0, space_pos);
+      std::string field_data = line.substr(space_pos + 1);
+      if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
+        return IOStatus::NotSupported("Unrecognized non-ignorable field " +
+                                      field_name + " (from future version?)");
+      } else if (reported_ignored_fields->insert("footer:" + field_name)
+                     .second) {
+        // Warn the first time we see any particular unrecognized footer field
+        ROCKS_LOG_WARN(info_log,
+                       "Ignoring unrecognized backup meta footer field %s",
+                       field_name.c_str());
+      }
+    }
+  }
+
+  // Any ReadLine() above that returned false because of an I/O error (rather
+  // than end of file) is surfaced here.
+  {
+    IOStatus io_s = backup_meta_reader->GetStatus();
+    if (!io_s.ok()) {
+      return io_s;
+    }
+  }
+
+  if (num_files != files.size()) {
+    return IOStatus::Corruption(
+        "Inconsistent number of files or missing/incomplete header in " +
+        meta_filename_);
+  }
+
+  files_.reserve(files.size());
+  for (const auto& file_info : files) {
+    IOStatus io_s = AddFile(file_info);
+    if (!io_s.ok()) {
+      return io_s;
+    }
+  }
+
+  return IOStatus::OK();
+}
+
+// Version text written after the "schema_version " prefix, indexed by the
+// schema major version passed to StoreToFile. An empty entry means no
+// schema_version line is written for that major version. The last index is
+// the highest schema version StoreToFile accepts.
+const std::vector<std::string> minor_version_strings{
+ "", // invalid major version 0
+ "", // implicit major version 1
+ "2.0",
+};
+
+// Serializes this BackupMeta and installs it as the meta file.
+//
+// The text is written to meta_tmp_filename_, optionally synced, closed, and
+// then renamed to meta_filename_.
+//
+// Parameters:
+//   sync                - sync the temporary file before closing it.
+//   schema_version      - schema major version to write; must be >= 1 and
+//                         at most the last index of minor_version_strings.
+//   schema_test_options - optional test overrides (version string, which
+//                         per-file fields to emit, extra fields); only
+//                         meaningful with schema_version >= 2.
+//
+// Returns InvalidArgument / NotSupported for an out-of-range schema_version,
+// otherwise the first failing file system status, or OK.
+IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
+    bool sync, int schema_version,
+    const TEST_BackupMetaSchemaOptions* schema_test_options) {
+  if (schema_version < 1) {
+    return IOStatus::InvalidArgument(
+        "BackupEngineOptions::schema_version must be >= 1");
+  }
+  if (schema_version > static_cast<int>(minor_version_strings.size() - 1)) {
+    return IOStatus::NotSupported(
+        "Only BackupEngineOptions::schema_version <= " +
+        std::to_string(minor_version_strings.size() - 1) + " is supported");
+  }
+  std::string ver = minor_version_strings[schema_version];
+
+  // Need schema_version >= 2 for TEST_BackupMetaSchemaOptions
+  assert(schema_version >= 2 || schema_test_options == nullptr);
+
+  IOStatus io_s;
+  std::unique_ptr<FSWritableFile> backup_meta_file;
+  FileOptions file_options;
+  file_options.use_mmap_writes = false;
+  file_options.use_direct_writes = false;
+  io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options,
+                              &backup_meta_file, nullptr);
+  if (!io_s.ok()) {
+    return io_s;
+  }
+
+  std::ostringstream buf;
+  if (schema_test_options) {
+    // override for testing
+    ver = schema_test_options->version;
+  }
+  // An empty version string (schema 1) means the version line is omitted.
+  if (!ver.empty()) {
+    assert(schema_version >= 2);
+    buf << kSchemaVersionPrefix << ver << "\n";
+  }
+  buf << static_cast<unsigned long long>(timestamp_) << "\n";
+  buf << sequence_number_ << "\n";
+
+  if (!app_metadata_.empty()) {
+    std::string hex_encoded_metadata =
+        Slice(app_metadata_).ToString(/* hex */ true);
+    buf << kAppMetaDataFieldName << " " << hex_encoded_metadata << "\n";
+  }
+  if (schema_test_options) {
+    for (auto& e : schema_test_options->meta_fields) {
+      buf << e.first << " " << e.second << "\n";
+    }
+  }
+  buf << files_.size() << "\n";
+
+  for (const auto& file : files_) {
+    buf << file->filename;
+    if (schema_test_options == nullptr ||
+        schema_test_options->crc32c_checksums) {
+      // use crc32c for now, switch to something else if needed
+      buf << " " << kFileCrc32cFieldName << " "
+          << ChecksumHexToInt32(file->checksum_hex);
+    }
+    if (schema_version >= 2 && file->temp != Temperature::kUnknown) {
+      buf << " " << kTemperatureFieldName << " "
+          << temperature_to_string[file->temp];
+    }
+    if (schema_test_options && schema_test_options->file_sizes) {
+      buf << " " << kFileSizeFieldName << " " << std::to_string(file->size);
+    }
+    if (schema_test_options) {
+      for (auto& e : schema_test_options->file_fields) {
+        buf << " " << e.first << " " << e.second;
+      }
+    }
+    buf << "\n";
+  }
+
+  if (schema_test_options && !schema_test_options->footer_fields.empty()) {
+    buf << kFooterMarker << "\n";
+    for (auto& e : schema_test_options->footer_fields) {
+      buf << e.first << " " << e.second << "\n";
+    }
+  }
+
+  // Materialize the text once; each buf.str() call copies the whole buffer.
+  const std::string contents = buf.str();
+  io_s = backup_meta_file->Append(Slice(contents), iooptions_, nullptr);
+  IOSTATS_ADD(bytes_written, contents.size());
+  if (io_s.ok() && sync) {
+    io_s = backup_meta_file->Sync(iooptions_, nullptr);
+  }
+  if (io_s.ok()) {
+    io_s = backup_meta_file->Close(iooptions_, nullptr);
+  }
+  // Only a fully written and closed temporary file replaces the real one.
+  if (io_s.ok()) {
+    io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_,
+                           nullptr);
+  }
+  return io_s;
+}
+} // namespace
+
+// Creates and initializes a read-only backup engine.
+//
+// On success `*backup_engine_ptr` receives ownership of the new engine and
+// OK is returned. If initialization fails, `*backup_engine_ptr` is set to
+// nullptr and the failure status is returned. Requesting destroy_old_data is
+// rejected with InvalidArgument before anything is created (the out
+// pointer is left untouched in that case).
+IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options,
+                                    Env* env,
+                                    BackupEngineReadOnly** backup_engine_ptr) {
+  if (options.destroy_old_data) {
+    return IOStatus::InvalidArgument(
+        "Can't destroy old data with ReadOnly BackupEngine");
+  }
+
+  std::unique_ptr<BackupEngineImplThreadSafe> engine(
+      new BackupEngineImplThreadSafe(options, env, true /*read_only*/));
+  auto init_status = engine->Initialize();
+  if (init_status.ok()) {
+    *backup_engine_ptr = engine.release();
+    return IOStatus::OK();
+  }
+
+  // `engine` is destroyed on return; callers never see a half-built engine.
+  *backup_engine_ptr = nullptr;
+  return init_status;
+}
+
+// Forwards `options` to the engine implementation. `engine` is downcast
+// with static_cast_with_check to BackupEngineImplThreadSafe.
+void TEST_SetBackupMetaSchemaOptions(
+    BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options) {
+  static_cast_with_check<BackupEngineImplThreadSafe>(engine)
+      ->TEST_SetBackupMetaSchemaOptions(options);
+}
+
+// Forwards the two clocks to the engine implementation. `engine` is
+// downcast with static_cast_with_check to BackupEngineImplThreadSafe.
+void TEST_SetDefaultRateLimitersClock(
+    BackupEngine* engine,
+    const std::shared_ptr<SystemClock>& backup_rate_limiter_clock,
+    const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) {
+  static_cast_with_check<BackupEngineImplThreadSafe>(engine)
+      ->TEST_SetDefaultRateLimitersClock(backup_rate_limiter_clock,
+                                         restore_rate_limiter_clock);
+}
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/backup/backup_engine_impl.h b/src/rocksdb/utilities/backup/backup_engine_impl.h
new file mode 100644
index 000000000..398f47f27
--- /dev/null
+++ b/src/rocksdb/utilities/backup/backup_engine_impl.h
@@ -0,0 +1,36 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/utilities/backup_engine.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Test-only knobs controlling how a backup meta file is written by
+// BackupMeta::StoreToFile.
+struct TEST_BackupMetaSchemaOptions {
+ // Written after the "schema_version " prefix; an empty string suppresses
+ // the version line.
+ std::string version = "2";
+ // Whether each file line carries a "crc32" field.
+ bool crc32c_checksums = false;
+ // Whether each file line carries a "size" field.
+ bool file_sizes = true;
+ // Extra "<name> <value>" lines written before the file-count line.
+ std::map<std::string, std::string> meta_fields;
+ // Extra "<name> <value>" pairs appended to every file line.
+ std::map<std::string, std::string> file_fields;
+ // Extra "<name> <value>" lines written after the "// FOOTER" marker; the
+ // marker itself is only written when this map is non-empty.
+ std::map<std::string, std::string> footer_fields;
+};
+
+// Modifies the BackupEngine(Impl) to write backup meta files using the
+// unpublished schema version 2. The setting lives on the engine object, for
+// as long as that object exists; it is not a property of backup_dir.
+// TEST_BackupMetaSchemaOptions offers some customization for testing.
+void TEST_SetBackupMetaSchemaOptions(
+ BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options);
+
+// Test speedup hook: makes the BackupEngine(Impl) use the given clocks for
+// the backup and restore rate limiters that it creates by default, i.e.
+// when the user did not specify rate limiters.
+void TEST_SetDefaultRateLimitersClock(
+ BackupEngine* engine,
+ const std::shared_ptr<SystemClock>& backup_rate_limiter_clock = nullptr,
+ const std::shared_ptr<SystemClock>& restore_rate_limiter_clock = nullptr);
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/backup/backup_engine_test.cc b/src/rocksdb/utilities/backup/backup_engine_test.cc
new file mode 100644
index 000000000..d1f74f769
--- /dev/null
+++ b/src/rocksdb/utilities/backup/backup_engine_test.cc
@@ -0,0 +1,4219 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
+
+#include "rocksdb/utilities/backup_engine.h"
+
+#include <algorithm>
+#include <array>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+
+#include "db/db_impl/db_impl.h"
+#include "db/db_test_util.h"
+#include "env/composite_env_wrapper.h"
+#include "env/env_chroot.h"
+#include "file/filename.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "rocksdb/advanced_options.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_checksum.h"
+#include "rocksdb/rate_limiter.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/types.h"
+#include "rocksdb/utilities/options_util.h"
+#include "rocksdb/utilities/stackable_db.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "util/cast_util.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+#include "util/stderr_logger.h"
+#include "util/string_util.h"
+#include "utilities/backup/backup_engine_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+// File-local shorthands for the BackupEngineOptions share-files naming type
+// and constants.
+using ShareFilesNaming = BackupEngineOptions::ShareFilesNaming;
+const auto kLegacyCrc32cAndFileSize =
+ BackupEngineOptions::kLegacyCrc32cAndFileSize;
+const auto kUseDbSessionId = BackupEngineOptions::kUseDbSessionId;
+const auto kFlagIncludeFileSize = BackupEngineOptions::kFlagIncludeFileSize;
+// Combination of kUseDbSessionId and kFlagIncludeFileSize.
+const auto kNamingDefault = kUseDbSessionId | kFlagIncludeFileSize;
+
+// Stand-in DB that wraps no real database (the stacked DB pointer is
+// nullptr). It reports whatever names are placed in live_files_ as live
+// files with fixed sizes, hands out increasing sequence numbers, and checks
+// that file deletions are disabled and re-enabled in strict alternation.
+class DummyDB : public StackableDB {
+ public:
+  /* implicit */
+  DummyDB(const Options& options, const std::string& dbname)
+      : StackableDB(nullptr), options_(options), dbname_(dbname) {}
+
+  // Each call returns a value one larger than the previous call's.
+  SequenceNumber GetLatestSequenceNumber() const override {
+    sequence_number_ += 1;
+    return sequence_number_;
+  }
+
+  const std::string& GetName() const override { return dbname_; }
+
+  Env* GetEnv() const override { return options_.env; }
+
+  using DB::GetOptions;
+  Options GetOptions(ColumnFamilyHandle* /*column_family*/) const override {
+    return options_;
+  }
+
+  DBOptions GetDBOptions() const override { return DBOptions(options_); }
+
+  // Expects deletions to be currently disabled.
+  Status EnableFileDeletions(bool /*force*/) override {
+    EXPECT_TRUE(!deletions_enabled_);
+    deletions_enabled_ = true;
+    return Status::OK();
+  }
+
+  // Expects deletions to be currently enabled.
+  Status DisableFileDeletions() override {
+    EXPECT_TRUE(deletions_enabled_);
+    deletions_enabled_ = false;
+    return Status::OK();
+  }
+
+  ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }
+
+  // Replaces `*files` with one entry per name in live_files_. Fails with
+  // InvalidArgument on the first name ParseFileName() rejects; entries
+  // produced before that name remain in `*files`.
+  Status GetLiveFilesStorageInfo(
+      const LiveFilesStorageInfoOptions& opts,
+      std::vector<LiveFileStorageInfo>* files) override {
+    files->clear();
+    for (const std::string& name : live_files_) {
+      uint64_t file_number;
+      FileType file_type;
+      if (!ParseFileName(name, &file_number, &file_type)) {
+        return Status::InvalidArgument("Bad file name: " + name);
+      }
+
+      files->emplace_back();
+      LiveFileStorageInfo& entry = files->back();
+      entry.relative_filename = name;
+      entry.directory = dbname_;
+      entry.file_number = file_number;
+      entry.file_type = file_type;
+
+      const bool is_manifest = (file_type == kDescriptorFile);
+      const bool is_current = (file_type == kCurrentFile);
+      if (is_manifest) {
+        entry.size = 100;  // See TestFs::GetChildrenFileAttributes below
+      } else if (is_current) {
+        entry.size = 0;
+      } else {
+        entry.size = 200;  // See TestFs::GetChildrenFileAttributes below
+      }
+      if (is_manifest || is_current) {
+        entry.trim_to_size = true;
+      }
+
+      if (opts.include_checksum_info) {
+        entry.file_checksum = kUnknownFileChecksum;
+        entry.file_checksum_func_name = kUnknownFileChecksumFuncName;
+      }
+    }
+    return Status::OK();
+  }
+
+  // To avoid FlushWAL called on stacked db which is nullptr
+  Status FlushWAL(bool /*sync*/) override { return Status::OK(); }
+
+  // Names reported by GetLiveFilesStorageInfo(); filled in by the test.
+  std::vector<std::string> live_files_;
+
+ private:
+  Options options_;
+  std::string dbname_;
+  bool deletions_enabled_ = true;
+  mutable SequenceNumber sequence_number_ = 0;
+};  // DummyDB
+
+// FileSystem wrapper used by the backup tests. On top of forwarding to the
+// wrapped file system it can:
+//  * substitute a synthetic 200-byte sequential file for every read,
+//  * record and limit the files opened for writing,
+//  * limit or fail deletions,
+//  * simulate failures of GetChildren / CreateDirIfMissing / NewDirectory,
+//  * mock file attributes for files that do not exist on disk,
+//  * count successfully opened readers/writers and how many of them used
+//    direct I/O.
+class TestFs : public FileSystemWrapper {
+ public:
+  explicit TestFs(const std::shared_ptr<FileSystem>& t)
+      : FileSystemWrapper(t) {}
+  const char* Name() const override { return "TestFs"; }
+
+  // Sequential file that serves 200 bytes produced by a Random constructed
+  // with 5, or fails every read when `fail_reads` is set.
+  class DummySequentialFile : public FSSequentialFile {
+   public:
+    explicit DummySequentialFile(bool fail_reads)
+        : FSSequentialFile(), rnd_(5), fail_reads_(fail_reads) {}
+    // Produces up to `n` bytes, capped by what is left of the 200.
+    IOStatus Read(size_t n, const IOOptions&, Slice* result, char* scratch,
+                  IODebugContext*) override {
+      if (fail_reads_) {
+        return IOStatus::IOError();
+      }
+      size_t read_size = (n > size_left) ? size_left : n;
+      for (size_t i = 0; i < read_size; ++i) {
+        scratch[i] = rnd_.Next() & 255;
+      }
+      *result = Slice(scratch, read_size);
+      size_left -= read_size;
+      return IOStatus::OK();
+    }
+
+    // Drops `n` bytes from the remainder, stopping at zero. (The previous
+    // form had the two arms swapped: skipping past the end wrapped around
+    // and any in-range skip discarded everything that was left.)
+    IOStatus Skip(uint64_t n) override {
+      size_left = (n > size_left) ? 0 : static_cast<size_t>(size_left - n);
+      return IOStatus::OK();
+    }
+
+   private:
+    size_t size_left = 200;
+    Random rnd_;
+    bool fail_reads_;
+  };
+
+  IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
+                             std::unique_ptr<FSSequentialFile>* r,
+                             IODebugContext* dbg) override {
+    MutexLock l(&mutex_);
+    if (dummy_sequential_file_) {
+      // Synthetic files are not counted as opened readers.
+      r->reset(
+          new TestFs::DummySequentialFile(dummy_sequential_file_fail_reads_));
+      return IOStatus::OK();
+    } else {
+      IOStatus s = FileSystemWrapper::NewSequentialFile(f, file_opts, r, dbg);
+      if (s.ok()) {
+        if ((*r)->use_direct_io()) {
+          ++num_direct_seq_readers_;
+        }
+        ++num_seq_readers_;
+      }
+      return s;
+    }
+  }
+
+  IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
+                           std::unique_ptr<FSWritableFile>* r,
+                           IODebugContext* dbg) override {
+    MutexLock l(&mutex_);
+    // The name is recorded even when the limit below rejects the open.
+    written_files_.push_back(f);
+    if (limit_written_files_ == 0) {
+      return IOStatus::NotSupported("Limit on written files reached");
+    }
+    limit_written_files_--;
+    IOStatus s = FileSystemWrapper::NewWritableFile(f, file_opts, r, dbg);
+    if (s.ok()) {
+      if ((*r)->use_direct_io()) {
+        ++num_direct_writers_;
+      }
+      ++num_writers_;
+    }
+    return s;
+  }
+
+  IOStatus NewRandomAccessFile(const std::string& f,
+                               const FileOptions& file_opts,
+                               std::unique_ptr<FSRandomAccessFile>* r,
+                               IODebugContext* dbg) override {
+    MutexLock l(&mutex_);
+    IOStatus s = FileSystemWrapper::NewRandomAccessFile(f, file_opts, r, dbg);
+    if (s.ok()) {
+      if ((*r)->use_direct_io()) {
+        ++num_direct_rand_readers_;
+      }
+      ++num_rand_readers_;
+    }
+    return s;
+  }
+
+  // Fails when deletion failure is switched on; otherwise expects the
+  // deletion budget not to be exhausted and consumes one unit of it.
+  IOStatus DeleteFile(const std::string& f, const IOOptions& options,
+                      IODebugContext* dbg) override {
+    MutexLock l(&mutex_);
+    if (fail_delete_files_) {
+      return IOStatus::IOError();
+    }
+    EXPECT_GT(limit_delete_files_, 0U);
+    limit_delete_files_--;
+    return FileSystemWrapper::DeleteFile(f, options, dbg);
+  }
+
+  IOStatus DeleteDir(const std::string& d, const IOOptions& options,
+                     IODebugContext* dbg) override {
+    MutexLock l(&mutex_);
+    if (fail_delete_files_) {
+      return IOStatus::IOError();
+    }
+    return FileSystemWrapper::DeleteDir(d, options, dbg);
+  }
+
+  // Sorts both the argument and the recorded list in place, then asserts
+  // they are equal.
+  void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
+    MutexLock l(&mutex_);
+    std::sort(should_have_written.begin(), should_have_written.end());
+    std::sort(written_files_.begin(), written_files_.end());
+
+    ASSERT_EQ(should_have_written, written_files_);
+  }
+
+  void ClearWrittenFiles() {
+    MutexLock l(&mutex_);
+    written_files_.clear();
+  }
+
+  void SetLimitWrittenFiles(uint64_t limit) {
+    MutexLock l(&mutex_);
+    limit_written_files_ = limit;
+  }
+
+  void SetLimitDeleteFiles(uint64_t limit) {
+    MutexLock l(&mutex_);
+    limit_delete_files_ = limit;
+  }
+
+  void SetDeleteFileFailure(bool fail) {
+    MutexLock l(&mutex_);
+    fail_delete_files_ = fail;
+  }
+
+  void SetDummySequentialFile(bool dummy_sequential_file) {
+    MutexLock l(&mutex_);
+    dummy_sequential_file_ = dummy_sequential_file;
+  }
+  void SetDummySequentialFileFailReads(bool dummy_sequential_file_fail_reads) {
+    MutexLock l(&mutex_);
+    dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads;
+  }
+
+  void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; }
+  IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
+                       std::vector<std::string>* r,
+                       IODebugContext* dbg) override {
+    if (get_children_failure_) {
+      return IOStatus::IOError("SimulatedFailure");
+    }
+    return FileSystemWrapper::GetChildren(dir, io_opts, r, dbg);
+  }
+
+  // Some test cases do not actually create the test files (e.g., see
+  // DummyDB::live_files_) - for those cases, we mock those files' attributes
+  // so CreateNewBackup() can get their attributes.
+  void SetFilenamesForMockedAttrs(const std::vector<std::string>& filenames) {
+    filenames_for_mocked_attrs_ = filenames;
+  }
+  // With mocked names set, appends one attribute per mocked name (under
+  // `dir`) to `result` instead of listing the real directory.
+  IOStatus GetChildrenFileAttributes(const std::string& dir,
+                                     const IOOptions& options,
+                                     std::vector<FileAttributes>* result,
+                                     IODebugContext* dbg) override {
+    if (filenames_for_mocked_attrs_.size() > 0) {
+      for (const auto& filename : filenames_for_mocked_attrs_) {
+        uint64_t size_bytes = 200;  // Match TestFs
+        if (filename.find("MANIFEST") == 0) {
+          size_bytes = 100;  // Match DummyDB::GetLiveFiles
+        }
+        result->push_back({dir + "/" + filename, size_bytes});
+      }
+      return IOStatus::OK();
+    }
+    return FileSystemWrapper::GetChildrenFileAttributes(dir, options, result,
+                                                        dbg);
+  }
+
+  // With mocked names set, answers from the mock table (matching on the
+  // last path component) and reports NotFound for anything else.
+  IOStatus GetFileSize(const std::string& f, const IOOptions& options,
+                       uint64_t* s, IODebugContext* dbg) override {
+    if (filenames_for_mocked_attrs_.size() > 0) {
+      auto fname = f.substr(f.find_last_of('/') + 1);
+      auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(),
+                                     filenames_for_mocked_attrs_.end(), fname);
+      if (filename_iter != filenames_for_mocked_attrs_.end()) {
+        *s = 200;  // Match TestFs
+        if (fname.find("MANIFEST") == 0) {
+          *s = 100;  // Match DummyDB::GetLiveFiles
+        }
+        return IOStatus::OK();
+      }
+      return IOStatus::NotFound(fname);
+    }
+    return FileSystemWrapper::GetFileSize(f, options, s, dbg);
+  }
+
+  void SetCreateDirIfMissingFailure(bool fail) {
+    create_dir_if_missing_failure_ = fail;
+  }
+  IOStatus CreateDirIfMissing(const std::string& d, const IOOptions& options,
+                              IODebugContext* dbg) override {
+    if (create_dir_if_missing_failure_) {
+      return IOStatus::IOError("SimulatedFailure");
+    }
+    return FileSystemWrapper::CreateDirIfMissing(d, options, dbg);
+  }
+
+  void SetNewDirectoryFailure(bool fail) { new_directory_failure_ = fail; }
+  IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
+                        std::unique_ptr<FSDirectory>* result,
+                        IODebugContext* dbg) override {
+    if (new_directory_failure_) {
+      return IOStatus::IOError("SimulatedFailure");
+    }
+    return FileSystemWrapper::NewDirectory(name, io_opts, result, dbg);
+  }
+
+  void ClearFileOpenCounters() {
+    MutexLock l(&mutex_);
+    num_rand_readers_ = 0;
+    num_direct_rand_readers_ = 0;
+    num_seq_readers_ = 0;
+    num_direct_seq_readers_ = 0;
+    num_writers_ = 0;
+    num_direct_writers_ = 0;
+  }
+
+  int num_rand_readers() { return num_rand_readers_; }
+  int num_direct_rand_readers() { return num_direct_rand_readers_; }
+  int num_seq_readers() { return num_seq_readers_; }
+  int num_direct_seq_readers() { return num_direct_seq_readers_; }
+  int num_writers() { return num_writers_; }
+  // FIXME(?): unused
+  int num_direct_writers() { return num_direct_writers_; }
+
+ private:
+  port::Mutex mutex_;
+  bool dummy_sequential_file_ = false;
+  bool dummy_sequential_file_fail_reads_ = false;
+  std::vector<std::string> written_files_;
+  std::vector<std::string> filenames_for_mocked_attrs_;
+  uint64_t limit_written_files_ = 1000000;
+  uint64_t limit_delete_files_ = 1000000;
+  bool fail_delete_files_ = false;
+
+  bool get_children_failure_ = false;
+  bool create_dir_if_missing_failure_ = false;
+  bool new_directory_failure_ = false;
+
+  // Keeps track of how many files of each type were successfully opened, and
+  // out of those, how many were opened with direct I/O.
+  std::atomic<int> num_rand_readers_{};
+  std::atomic<int> num_direct_rand_readers_{};
+  std::atomic<int> num_seq_readers_{};
+  std::atomic<int> num_direct_seq_readers_{};
+  std::atomic<int> num_writers_{};
+  std::atomic<int> num_direct_writers_{};
+};  // TestFs
+
+class FileManager : public EnvWrapper {
+ public:
+ // Wraps the Env `t`. rnd_ is constructed with the fixed value 5
+ // (presumably a seed, making the random picks below repeatable across
+ // runs -- TODO confirm against rnd_'s type).
+ explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {}
+ // Name under which this Env wrapper identifies itself.
+ const char* Name() const override { return "FileManager"; }
+
+ // Picks one entry of `dir` at random and reports its full path in `*fname`
+ // and its size in `*fsize`. A listing with two or fewer entries is treated
+ // as an empty directory (NotFound); a failed listing returns its status.
+ Status GetRandomFileInDir(const std::string& dir, std::string* fname,
+                           uint64_t* fsize) {
+   std::vector<FileAttributes> entries;
+   auto list_status = GetChildrenFileAttributes(dir, &entries);
+   if (!list_status.ok()) {
+     return list_status;
+   }
+   if (entries.size() <= 2) {  // . and ..
+     return Status::NotFound("Empty directory: " + dir);
+   }
+   assert(fname != nullptr);
+
+   const FileAttributes& chosen = entries[rnd_.Next() % entries.size()];
+   fname->assign(dir + "/" + chosen.name);
+   *fsize = chosen.size_bytes;
+   return Status::OK();
+ }
+
+ // Deletes one randomly chosen entry of `dir`.
+ //
+ // Returns the listing error if `dir` cannot be listed, NotFound if the
+ // listing is empty (previously this divided by zero), otherwise the
+ // status of the deletion.
+ Status DeleteRandomFileInDir(const std::string& dir) {
+   std::vector<std::string> children;
+   Status s = GetChildren(dir, &children);
+   if (!s.ok()) {
+     return s;
+   }
+   if (children.empty()) {
+     return Status::NotFound("Empty directory: " + dir);
+   }
+   const size_t i = rnd_.Next() % children.size();
+   return DeleteFile(dir + "/" + children[i]);
+ }
+
+ Status AppendToRandomFileInDir(const std::string& dir,
+ const std::string& data) {
+ std::vector<std::string> children;
+ Status s = GetChildren(dir, &children);
+ if (!s.ok()) {
+ return s;
+ }
+ while (true) {
+ int i = rnd_.Next() % children.size();
+ return WriteToFile(dir + "/" + children[i], data);
+ }
+ // should never get here
+ assert(false);
+ return Status::NotFound("");
+ }
+
+ Status CorruptFile(const std::string& fname, uint64_t bytes_to_corrupt) {
+ std::string file_contents;
+ Status s = ReadFileToString(this, fname, &file_contents);
+ if (!s.ok()) {
+ return s;
+ }
+ s = DeleteFile(fname);
+ if (!s.ok()) {
+ return s;
+ }
+
+ for (uint64_t i = 0; i < bytes_to_corrupt; ++i) {
+ std::string tmp = rnd_.RandomString(1);
+ file_contents[rnd_.Next() % file_contents.size()] = tmp[0];
+ }
+ return WriteToFile(fname, file_contents);
+ }
+
+ Status CorruptFileStart(const std::string& fname) {
+ std::string to_xor = "blah";
+ std::string file_contents;
+ Status s = ReadFileToString(this, fname, &file_contents);
+ if (!s.ok()) {
+ return s;
+ }
+ s = DeleteFile(fname);
+ if (!s.ok()) {
+ return s;
+ }
+ for (size_t i = 0; i < to_xor.size(); ++i) {
+ file_contents[i] ^= to_xor[i];
+ }
+ return WriteToFile(fname, file_contents);
+ }
+
+ Status CorruptChecksum(const std::string& fname, bool appear_valid) {
+ std::string metadata;
+ Status s = ReadFileToString(this, fname, &metadata);
+ if (!s.ok()) {
+ return s;
+ }
+ s = DeleteFile(fname);
+ if (!s.ok()) {
+ return s;
+ }
+
+ auto pos = metadata.find("private");
+ if (pos == std::string::npos) {
+ return Status::Corruption("private file is expected");
+ }
+ pos = metadata.find(" crc32 ", pos + 6);
+ if (pos == std::string::npos) {
+ return Status::Corruption("checksum not found");
+ }
+
+ if (metadata.size() < pos + 7) {
+ return Status::Corruption("bad CRC32 checksum value");
+ }
+
+ if (appear_valid) {
+ if (metadata[pos + 8] == '\n') {
+ // single digit value, safe to insert one more digit
+ metadata.insert(pos + 8, 1, '0');
+ } else {
+ metadata.erase(pos + 8, 1);
+ }
+ } else {
+ metadata[pos + 7] = 'a';
+ }
+
+ return WriteToFile(fname, metadata);
+ }
+
+ Status WriteToFile(const std::string& fname, const std::string& data) {
+ std::unique_ptr<WritableFile> file;
+ EnvOptions env_options;
+ env_options.use_mmap_writes = false;
+ Status s = EnvWrapper::NewWritableFile(fname, &file, env_options);
+ if (!s.ok()) {
+ return s;
+ }
+ return file->Append(Slice(data));
+ }
+
+ private:
+ Random rnd_;
+}; // FileManager
+
+// utility functions
+namespace {
+
+// Selects when FillDB asks the DB for an explicit flush.
+enum FillDBFlushAction {
+  kFlushMost,      // flush once, right after writing key `to - 2`
+  kFlushAll,       // flush once, after all keys have been written
+  kAutoFlushOnly,  // never request a flush from FillDB
+};
+
+// Many tests in this file expect FillDB to write at least one sst file,
+// so the default behavior (if not kAutoFlushOnly) of FillDB is to force
+// a flush. But to ensure coverage of the WAL file case, we also (by default)
+// do one Put after the Flush (kFlushMost).
+//
+// Writes "testkey<i>" -> "testvalue<i>" for every i in [from, to) and returns
+// the total number of key and value bytes passed to Put.
+size_t FillDB(DB* db, int from, int to,
+              FillDBFlushAction flush_action = kFlushMost) {
+  size_t bytes_written = 0;
+  for (int i = from; i < to; ++i) {
+    std::string key = "testkey" + std::to_string(i);
+    std::string value = "testvalue" + std::to_string(i);
+    bytes_written += key.size() + value.size();
+
+    EXPECT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
+
+    if (flush_action == kFlushMost && i == to - 2) {
+      EXPECT_OK(db->Flush(FlushOptions()));
+    }
+  }
+  if (flush_action == kFlushAll) {
+    EXPECT_OK(db->Flush(FlushOptions()));
+  }
+  return bytes_written;
+}
+
+// Asserts that every key "testkey<i>" with i in [from, to) can be read and
+// maps to "testvalue<i>".
+void AssertExists(DB* db, int from, int to) {
+  for (int i = from; i < to; ++i) {
+    std::string key = "testkey" + std::to_string(i);
+    std::string value;
+    Status s = db->Get(ReadOptions(), Slice(key), &value);
+    // Check the status as well, so a failed read is reported as such rather
+    // than as a bare value mismatch.
+    ASSERT_OK(s);
+    ASSERT_EQ(value, "testvalue" + std::to_string(i));
+  }
+}
+
+// Asserts that Get reports NotFound for every key "testkey<i>" with i in
+// [from, to).
+void AssertEmpty(DB* db, int from, int to) {
+  for (int i = from; i < to; ++i) {
+    std::string key = "testkey" + std::to_string(i);
+    std::string value = "testvalue" + std::to_string(i);
+
+    Status s = db->Get(ReadOptions(), Slice(key), &value);
+    ASSERT_TRUE(s.IsNotFound());
+  }
+}
+}  // namespace
+
+// Fixture shared by the backup engine tests. The DB and the backup directory
+// live in two separate chroot file systems, each additionally wrapped in a
+// TestFs. The fixture offers helpers to open and close the DB and the
+// BackupEngine, to restore and verify backups, and to locate or corrupt DB
+// files.
+class BackupEngineTest : public testing::Test {
+ public:
+  enum ShareOption {
+    kNoShare,
+    kShareNoChecksum,
+    kShareWithChecksum,
+  };
+
+  const std::vector<ShareOption> kAllShareOptions = {kNoShare, kShareNoChecksum,
+                                                     kShareWithChecksum};
+
+  // Creates the chroot directories, file systems and Envs, fills in the DB
+  // and backup engine options, and removes leftovers from earlier runs.
+  BackupEngineTest() {
+    // set up files
+    std::string db_chroot = test::PerThreadDBPath("db_for_backup");
+    std::string backup_chroot = test::PerThreadDBPath("db_backups");
+    EXPECT_OK(Env::Default()->CreateDirIfMissing(db_chroot));
+    EXPECT_OK(Env::Default()->CreateDirIfMissing(backup_chroot));
+    dbname_ = "/tempdb";
+    backupdir_ = "/tempbk";
+    latest_backup_ = backupdir_ + "/LATEST_BACKUP";
+
+    // set up FileSystem & Envs
+    db_chroot_fs_ = NewChrootFileSystem(FileSystem::Default(), db_chroot);
+    backup_chroot_fs_ =
+        NewChrootFileSystem(FileSystem::Default(), backup_chroot);
+    test_db_fs_ = std::make_shared<TestFs>(db_chroot_fs_);
+    test_backup_fs_ = std::make_shared<TestFs>(backup_chroot_fs_);
+    SetEnvsFromFileSystems();
+
+    // set up db options
+    options_.create_if_missing = true;
+    options_.paranoid_checks = true;
+    options_.write_buffer_size = 1 << 17;  // 128KB
+    options_.wal_dir = dbname_;
+    options_.enable_blob_files = true;
+
+    // The sync option is not easily testable in unit tests, but should be
+    // smoke tested across all the other backup tests. However, it is
+    // certainly not worth doubling the runtime of backup tests for it.
+    // Thus, we can enable sync for one of our alternate testing
+    // configurations.
+    constexpr bool kUseSync =
+#ifdef ROCKSDB_MODIFY_NPHASH
+        true;
+#else
+        false;
+#endif  // ROCKSDB_MODIFY_NPHASH
+
+    // set up backup db options
+    engine_options_.reset(new BackupEngineOptions(
+        backupdir_, test_backup_env_.get(), /*share_table_files*/ true,
+        logger_.get(), kUseSync));
+
+    // most tests will use multi-threaded backups
+    engine_options_->max_background_operations = 7;
+
+    // delete old files in db
+    DestroyDBWithoutCheck(dbname_, options_);
+
+    // delete old LATEST_BACKUP file, which some tests create for compatibility
+    // testing.
+    backup_chroot_env_->DeleteFile(latest_backup_).PermitUncheckedError();
+  }
+
+  // (Re)builds every Env, both FileManagers and the logger from the current
+  // file system members, and points options_ (and engine_options_, once it
+  // exists) at the new Envs.
+  void SetEnvsFromFileSystems() {
+    db_chroot_env_.reset(
+        new CompositeEnvWrapper(Env::Default(), db_chroot_fs_));
+    backup_chroot_env_.reset(
+        new CompositeEnvWrapper(Env::Default(), backup_chroot_fs_));
+    test_db_env_.reset(new CompositeEnvWrapper(Env::Default(), test_db_fs_));
+    options_.env = test_db_env_.get();
+    test_backup_env_.reset(
+        new CompositeEnvWrapper(Env::Default(), test_backup_fs_));
+    if (engine_options_) {
+      engine_options_->backup_env = test_backup_env_.get();
+    }
+    file_manager_.reset(new FileManager(backup_chroot_env_.get()));
+    db_file_manager_.reset(new FileManager(db_chroot_env_.get()));
+
+    // Create logger
+    DBOptions logger_options;
+    logger_options.env = db_chroot_env_.get();
+    ASSERT_OK(CreateLoggerFromOptions(dbname_, logger_options, &logger_));
+  }
+
+  // Opens the DB at dbname_ with options_ and returns it; the caller owns the
+  // returned pointer. A failed open is reported through EXPECT_OK.
+  DB* OpenDB() {
+    DB* db = nullptr;
+    EXPECT_OK(DB::Open(options_, dbname_, &db));
+    return db;
+  }
+
+  // Closes db_ and opens the DB again, read-only if requested. The written
+  // file limit of the DB file system is reset first.
+  void CloseAndReopenDB(bool read_only = false) {
+    // Close DB
+    db_.reset();
+
+    // Open DB
+    test_db_fs_->SetLimitWrittenFiles(1000000);
+    DB* db = nullptr;
+    if (read_only) {
+      ASSERT_OK(DB::OpenForReadOnly(options_, dbname_, &db));
+    } else {
+      ASSERT_OK(DB::Open(options_, dbname_, &db));
+    }
+    db_.reset(db);
+  }
+
+  // Resets the DB file system knobs and stores either a real DB or, when
+  // `dummy` is set, a DummyDB in db_. Despite the name, no backup engine is
+  // opened here.
+  void InitializeDBAndBackupEngine(bool dummy = false) {
+    // reset all the db env defaults
+    test_db_fs_->SetLimitWrittenFiles(1000000);
+    test_db_fs_->SetDummySequentialFile(dummy);
+
+    DB* db = nullptr;
+    if (dummy) {
+      dummy_db_ = new DummyDB(options_, dbname_);
+      db = dummy_db_;
+    } else {
+      ASSERT_OK(DB::Open(options_, dbname_, &db));
+    }
+    db_.reset(db);
+  }
+
+  // Opens the DB (or a DummyDB) and then the backup engine, translating
+  // `shared_option` into the two sharing flags of engine_options_.
+  virtual void OpenDBAndBackupEngine(
+      bool destroy_old_data = false, bool dummy = false,
+      ShareOption shared_option = kShareNoChecksum) {
+    InitializeDBAndBackupEngine(dummy);
+    // reset backup env defaults
+    test_backup_fs_->SetLimitWrittenFiles(1000000);
+    engine_options_->destroy_old_data = destroy_old_data;
+    engine_options_->share_table_files = shared_option != kNoShare;
+    engine_options_->share_files_with_checksum =
+        shared_option == kShareWithChecksum;
+    OpenBackupEngine(destroy_old_data);
+  }
+
+  void CloseDBAndBackupEngine() {
+    db_.reset();
+    backup_engine_.reset();
+  }
+
+  // Opens a BackupEngine with the current engine_options_ and stores it in
+  // backup_engine_.
+  void OpenBackupEngine(bool destroy_old_data = false) {
+    engine_options_->destroy_old_data = destroy_old_data;
+    engine_options_->info_log = logger_.get();
+    BackupEngine* backup_engine = nullptr;
+    ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                 &backup_engine));
+    backup_engine_.reset(backup_engine);
+  }
+
+  void CloseBackupEngine() { backup_engine_.reset(nullptr); }
+
+  // cross-cutting test of GetBackupInfo
+  //
+  // Checks that the per-file sizes reported by GetBackupInfo add up to each
+  // backup's size, agree between backups, and match the sizes of the files
+  // found under private/*, shared and shared_checksum in the backup dir.
+  void AssertBackupInfoConsistency() {
+    std::vector<BackupInfo> backup_info;
+    backup_engine_->GetBackupInfo(&backup_info, /*with file details*/ true);
+    std::map<std::string, uint64_t> file_sizes;
+
+    // Find the files that are supposed to be there
+    for (auto& backup : backup_info) {
+      uint64_t sum_for_backup = 0;
+      for (auto& file : backup.file_details) {
+        auto e = file_sizes.find(file.relative_filename);
+        if (e == file_sizes.end()) {
+          // fprintf(stderr, "Adding %s -> %u\n",
+          //         file.relative_filename.c_str(), (unsigned)file.size);
+          file_sizes[file.relative_filename] = file.size;
+        } else {
+          ASSERT_EQ(e->second, file.size);
+        }
+        sum_for_backup += file.size;
+      }
+      ASSERT_EQ(backup.size, sum_for_backup);
+    }
+
+    std::vector<BackupID> corrupt_backup_ids;
+    backup_engine_->GetCorruptedBackups(&corrupt_backup_ids);
+    bool has_corrupt = corrupt_backup_ids.size() > 0;
+
+    // Compare with what's in backup dir
+    std::vector<std::string> child_dirs;
+    ASSERT_OK(
+        test_backup_env_->GetChildren(backupdir_ + "/private", &child_dirs));
+    for (auto& dir : child_dirs) {
+      dir = "private/" + dir;
+    }
+    child_dirs.push_back("shared");           // might not exist
+    child_dirs.push_back("shared_checksum");  // might not exist
+    for (auto& dir : child_dirs) {
+      std::vector<std::string> children;
+      test_backup_env_->GetChildren(backupdir_ + "/" + dir, &children)
+          .PermitUncheckedError();
+      // fprintf(stderr, "ls %s\n", (backupdir_ + "/" + dir).c_str());
+      for (auto& file : children) {
+        uint64_t size;
+        size = UINT64_MAX;  // appease clang-analyze
+        std::string rel_file = dir + "/" + file;
+        // fprintf(stderr, "stat %s\n", (backupdir_ + "/" + rel_file).c_str());
+        ASSERT_OK(
+            test_backup_env_->GetFileSize(backupdir_ + "/" + rel_file, &size));
+        auto e = file_sizes.find(rel_file);
+        if (e == file_sizes.end()) {
+          // The only case in which we should find files not reported
+          ASSERT_TRUE(has_corrupt);
+        } else {
+          ASSERT_EQ(e->second, size);
+          file_sizes.erase(e);
+        }
+      }
+    }
+
+    // Everything should have been matched
+    ASSERT_EQ(file_sizes.size(), 0);
+  }
+
+  // restores backup backup_id and asserts the existence of
+  // [start_exist, end_exist> and not-existence of
+  // [end_exist, end>
+  //
+  // if backup_id == 0, it means restore from latest
+  // if end == 0, don't check AssertEmpty
+  //
+  // If no backup engine is open on entry, one is opened for the duration of
+  // the call and closed again at the end.
+  void AssertBackupConsistency(BackupID backup_id, uint32_t start_exist,
+                               uint32_t end_exist, uint32_t end = 0,
+                               bool keep_log_files = false) {
+    RestoreOptions restore_options(keep_log_files);
+    bool opened_backup_engine = false;
+    if (backup_engine_.get() == nullptr) {
+      opened_backup_engine = true;
+      OpenBackupEngine();
+    }
+    AssertBackupInfoConsistency();
+
+    // Now perform restore
+    if (backup_id > 0) {
+      ASSERT_OK(backup_engine_->RestoreDBFromBackup(backup_id, dbname_, dbname_,
+                                                    restore_options));
+    } else {
+      ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_,
+                                                          restore_options));
+    }
+    // Own the reopened DB so it is released on every exit path, and refuse to
+    // dereference it if the open failed.
+    std::unique_ptr<DB> db(OpenDB());
+    ASSERT_TRUE(db != nullptr);
+    // Check DB contents
+    AssertExists(db.get(), start_exist, end_exist);
+    if (end != 0) {
+      AssertEmpty(db.get(), end_exist, end);
+    }
+    // Close the DB before the engine, as before.
+    db.reset();
+    if (opened_backup_engine) {
+      CloseBackupEngine();
+    }
+  }
+
+  // Deletes every child of dbname_ whose name parses as a WAL file.
+  void DeleteLogFiles() {
+    std::vector<std::string> delete_logs;
+    ASSERT_OK(db_chroot_env_->GetChildren(dbname_, &delete_logs));
+    for (const auto& f : delete_logs) {
+      uint64_t number;
+      FileType type;
+      bool ok = ParseFileName(f, &number, &type);
+      if (ok && type == kWalFile) {
+        ASSERT_OK(db_chroot_env_->DeleteFile(dbname_ + "/" + f));
+      }
+    }
+  }
+
+  // Appends to *files the attributes of every child of dbname_ whose name
+  // parses to `file_type` and that also appears (as "/" + name) in the live
+  // file list of db_. Returns the first error encountered.
+  Status GetDataFilesInDB(const FileType& file_type,
+                          std::vector<FileAttributes>* files) {
+    std::vector<std::string> live;
+    uint64_t ignore_manifest_size;
+    Status s = db_->GetLiveFiles(live, &ignore_manifest_size, /*flush*/ false);
+    if (!s.ok()) {
+      return s;
+    }
+    std::vector<FileAttributes> children;
+    s = test_db_env_->GetChildrenFileAttributes(dbname_, &children);
+    if (!s.ok()) {
+      // Do not walk a listing that could not be produced.
+      return s;
+    }
+    for (const auto& child : children) {
+      FileType type;
+      uint64_t number = 0;
+      if (ParseFileName(child.name, &number, &type) && type == file_type &&
+          std::find(live.begin(), live.end(), "/" + child.name) != live.end()) {
+        files->push_back(child);
+      }
+    }
+    return s;
+  }
+
+  // Chooses one of the files found by GetDataFilesInDB and stores its path in
+  // *fname_out and, if requested, its size in *fsize_out. Returns NotFound
+  // when there is no such file.
+  Status GetRandomDataFileInDB(const FileType& file_type,
+                               std::string* fname_out,
+                               uint64_t* fsize_out = nullptr) {
+    Random rnd(6);  // NB: hardly "random"
+    std::vector<FileAttributes> files;
+    Status s = GetDataFilesInDB(file_type, &files);
+    if (!s.ok()) {
+      return s;
+    }
+    if (files.empty()) {
+      return Status::NotFound("");
+    }
+    size_t i = rnd.Uniform(static_cast<int>(files.size()));
+    *fname_out = dbname_ + "/" + files[i].name;
+    if (fsize_out) {
+      *fsize_out = files[i].size_bytes;
+    }
+    return Status::OK();
+  }
+
+  // Alters the first byte of one live DB file of the given type and writes
+  // the file back. Fails without touching the file if it is empty.
+  Status CorruptRandomDataFileInDB(const FileType& file_type) {
+    std::string fname;
+    uint64_t fsize = 0;
+    Status s = GetRandomDataFileInDB(file_type, &fname, &fsize);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::string file_contents;
+    s = ReadFileToString(test_db_env_.get(), fname, &file_contents);
+    if (!s.ok()) {
+      return s;
+    }
+    if (file_contents.empty()) {
+      // There is no first byte to alter; bail out before deleting the file.
+      return Status::InvalidArgument("Cannot corrupt empty file: " + fname);
+    }
+    s = test_db_env_->DeleteFile(fname);
+    if (!s.ok()) {
+      return s;
+    }
+
+    file_contents[0] = (file_contents[0] + 257) % 256;
+    return WriteStringToFile(test_db_env_.get(), file_contents, fname);
+  }
+
+  // Asserts that every child of `dir` whose name ends with `file_type`
+  // matches `pattern`, and that there are at least `minimum_count` of them.
+  void AssertDirectoryFilesMatchRegex(const std::string& dir,
+                                      const TestRegex& pattern,
+                                      const std::string& file_type,
+                                      int minimum_count) {
+    std::vector<FileAttributes> children;
+    ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
+    int found_count = 0;
+    for (const auto& child : children) {
+      if (EndsWith(child.name, file_type)) {
+        ASSERT_MATCHES_REGEX(child.name, pattern);
+        ++found_count;
+      }
+    }
+    ASSERT_GE(found_count, minimum_count);
+  }
+
+  // Asserts that, for every child of `dir`, the text between the last '_' and
+  // the last '.' of its name equals its size in bytes, and that there are at
+  // least `minimum_count` children.
+  void AssertDirectoryFilesSizeIndicators(const std::string& dir,
+                                          int minimum_count) {
+    std::vector<FileAttributes> children;
+    ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
+    int found_count = 0;
+    for (const auto& child : children) {
+      auto last_underscore = child.name.find_last_of('_');
+      auto last_dot = child.name.find_last_of('.');
+      // substr(0, npos) returns the whole name, so these two assertions fail
+      // exactly when the respective character is missing.
+      ASSERT_NE(child.name, child.name.substr(0, last_underscore));
+      ASSERT_NE(child.name, child.name.substr(0, last_dot));
+      ASSERT_LT(last_underscore, last_dot);
+      std::string s = child.name.substr(last_underscore + 1,
+                                        last_dot - (last_underscore + 1));
+      ASSERT_EQ(s, std::to_string(child.size_bytes));
+      ++found_count;
+    }
+    ASSERT_GE(found_count, minimum_count);
+  }
+
+  // files
+  std::string dbname_;
+  std::string backupdir_;
+  std::string latest_backup_;
+
+  // logger_ must be above backup_engine_ such that the engine's destructor,
+  // which uses a raw pointer to the logger, executes first.
+  std::shared_ptr<Logger> logger_;
+
+  // FileSystems
+  std::shared_ptr<FileSystem> db_chroot_fs_;
+  std::shared_ptr<FileSystem> backup_chroot_fs_;
+  std::shared_ptr<TestFs> test_db_fs_;
+  std::shared_ptr<TestFs> test_backup_fs_;
+
+  // Env wrappers
+  std::unique_ptr<Env> db_chroot_env_;
+  std::unique_ptr<Env> backup_chroot_env_;
+  std::unique_ptr<Env> test_db_env_;
+  std::unique_ptr<Env> test_backup_env_;
+  std::unique_ptr<FileManager> file_manager_;
+  std::unique_ptr<FileManager> db_file_manager_;
+
+  // all the dbs!
+  DummyDB* dummy_db_ = nullptr;  // owned as db_ when present
+  std::unique_ptr<DB> db_;
+  std::unique_ptr<BackupEngine> backup_engine_;
+
+  // options
+  Options options_;
+
+ protected:
+  void DestroyDBWithoutCheck(const std::string& dbname,
+                             const Options& options) {
+    // DestroyDB may fail because the db might not exist for some tests
+    DestroyDB(dbname, options).PermitUncheckedError();
+  }
+
+  std::unique_ptr<BackupEngineOptions> engine_options_;
+};  // BackupEngineTest
+
+// Prefixes every element of `v` with `path`, modifying `v` in place.
+void AppendPath(const std::string& path, std::vector<std::string>& v) {
+  for (size_t idx = 0; idx < v.size(); ++idx) {
+    std::string& entry = v[idx];
+    entry = path + entry;
+  }
+}
+
+// Variant of BackupEngineTest parameterized on a bool: the test parameter is
+// copied into share_files_with_checksum when the fixture is constructed and
+// is left alone whenever the DB and backup engine are (re)opened.
+class BackupEngineTestWithParam : public BackupEngineTest,
+                                  public testing::WithParamInterface<bool> {
+ public:
+  BackupEngineTestWithParam() {
+    const bool with_checksum = GetParam();
+    engine_options_->share_files_with_checksum = with_checksum;
+  }
+
+  // Same steps as the base class version, except that
+  // share_files_with_checksum is not derived from `shared_option`.
+  void OpenDBAndBackupEngine(
+      bool destroy_old_data = false, bool dummy = false,
+      ShareOption shared_option = kShareNoChecksum) override {
+    BackupEngineTest::InitializeDBAndBackupEngine(dummy);
+
+    // reset backup env defaults
+    test_backup_fs_->SetLimitWrittenFiles(1000000);
+
+    const bool share_tables = !(shared_option == kNoShare);
+    engine_options_->destroy_old_data = destroy_old_data;
+    engine_options_->share_table_files = share_tables;
+    // NOTE: keep share_files_with_checksum setting from constructor
+
+    OpenBackupEngine(destroy_old_data);
+  }
+};
+
+TEST_F(BackupEngineTest, FileCollision) {  // backs up a re-created DB next to older backups, once per share option
+ const int keys_iteration = 100;
+ for (const auto& sopt : kAllShareOptions) {
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt);
+ FillDB(db_.get(), 0, keys_iteration);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ FillDB(db_.get(), keys_iteration, keys_iteration * 2);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ CloseDBAndBackupEngine();
+
+ // If the db directory has been cleaned up, it is sensitive to file
+ // collision.
+ DestroyDBWithoutCheck(dbname_, options_);
+
+ // open fresh DB, but old backups present
+ OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
+ sopt);
+ FillDB(db_.get(), 0, keys_iteration);
+ ASSERT_OK(db_->Flush(FlushOptions())); // like backup would do
+ FillDB(db_.get(), keys_iteration, keys_iteration * 2);
+ if (sopt != kShareNoChecksum) {
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ } else {
+ // The new table files created in FillDB() will clash with the old
+ // backup and sharing tables with no checksum will have the file
+ // collision problem.
+ ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+ ASSERT_OK(backup_engine_->PurgeOldBackups(0)); // with the old backups purged, the retry below is expected to succeed
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ }
+ CloseDBAndBackupEngine();
+
+ // delete old data
+ DestroyDBWithoutCheck(dbname_, options_);
+ }
+}
+
+// This test verifies that the VerifyBackup method correctly identifies
+// invalid backups
+TEST_P(BackupEngineTestWithParam, VerifyBackup) {
+ const int keys_iteration = 5000;
+ OpenDBAndBackupEngine(true);
+ // create five backups
+ for (int i = 0; i < 5; ++i) {
+ FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+ }
+ CloseDBAndBackupEngine();
+
+ OpenDBAndBackupEngine();
+ // ---------- case 1. - valid backup -----------
+ ASSERT_TRUE(backup_engine_->VerifyBackup(1).ok());
+
+ // ---------- case 2. - delete a file -----------
+ ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/1"));
+ ASSERT_TRUE(backup_engine_->VerifyBackup(1).IsNotFound());
+
+ // ---------- case 3. - corrupt a file -----------
+ std::string append_data = "Corrupting a random file";
+ ASSERT_OK(file_manager_->AppendToRandomFileInDir(backupdir_ + "/private/2",
+ append_data));
+ ASSERT_TRUE(backup_engine_->VerifyBackup(2).IsCorruption());
+
+ // ---------- case 4. - invalid backup -----------
+ ASSERT_TRUE(backup_engine_->VerifyBackup(6).IsNotFound()); // only backups 1..5 were created above
+ CloseDBAndBackupEngine();
+}
+
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+// open DB, write, close DB, backup, restore, repeat
+TEST_P(BackupEngineTestWithParam, OfflineIntegrationTest) {
+ // has to be a big number, so that it triggers the memtable flush
+ const int keys_iteration = 5000;
+ const int max_key = keys_iteration * 4 + 10;
+ // first iter -- flush before backup
+ // second iter -- don't flush before backup
+ for (int iter = 0; iter < 2; ++iter) {
+ // delete old data
+ DestroyDBWithoutCheck(dbname_, options_);
+ bool destroy_data = true; // only the first open of each outer iteration destroys old backup data
+
+ // every iteration --
+ // 1. insert new data in the DB
+ // 2. backup the DB
+ // 3. destroy the db
+ // 4. restore the db, check everything is still there
+ for (int i = 0; i < 5; ++i) {
+ // in last iteration, put smaller amount of data,
+ int fill_up_to = std::min(keys_iteration * (i + 1), max_key);
+ // ---- insert new data and back up ----
+ OpenDBAndBackupEngine(destroy_data);
+ destroy_data = false;
+ // kAutoFlushOnly to preserve legacy test behavior (consider updating)
+ FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0))
+ << "iter: " << iter << ", idx: " << i;
+ CloseDBAndBackupEngine();
+ DestroyDBWithoutCheck(dbname_, options_);
+
+ // ---- make sure it's empty ----
+ DB* db = OpenDB();
+ AssertEmpty(db, 0, fill_up_to);
+ delete db;
+
+ // ---- restore the DB ----
+ OpenBackupEngine();
+ if (i >= 3) { // test purge old backups
+ // when i == 4, purge to only 1 backup
+ // when i == 3, purge to 2 backups
+ ASSERT_OK(backup_engine_->PurgeOldBackups(5 - i));
+ }
+ // ---- make sure the data is there ---
+ AssertBackupConsistency(0, 0, fill_up_to, max_key); // backup_id 0 restores from the latest backup
+ CloseBackupEngine();
+ }
+ }
+}
+
+// open DB, write, backup, write, backup, close, restore
+TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
+ // has to be a big number, so that it triggers the memtable flush
+ const int keys_iteration = 5000;
+ const int max_key = keys_iteration * 4 + 10;
+ Random rnd(7); // fixed seed, so the flush_before_backup choices below repeat from run to run
+ // delete old data
+ DestroyDBWithoutCheck(dbname_, options_);
+
+ // TODO: Implement & test db_paths support in backup (not supported in
+ // restore)
+ // options_.db_paths.emplace_back(dbname_, 500 * 1024);
+ // options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
+
+ OpenDBAndBackupEngine(true);
+ // write some data, backup, repeat
+ for (int i = 0; i < 5; ++i) {
+ if (i == 4) {
+ // delete backup number 2, online delete!
+ ASSERT_OK(backup_engine_->DeleteBackup(2));
+ }
+ // in last iteration, put smaller amount of data,
+ // so that backups can share sst files
+ int fill_up_to = std::min(keys_iteration * (i + 1), max_key);
+ // kAutoFlushOnly to preserve legacy test behavior (consider updating)
+ FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly);
+ // we should get consistent results with flush_before_backup
+ // set to both true and false
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)));
+ }
+ // close and destroy
+ CloseDBAndBackupEngine();
+ DestroyDBWithoutCheck(dbname_, options_);
+
+ // ---- make sure it's empty ----
+ DB* db = OpenDB();
+ AssertEmpty(db, 0, max_key);
+ delete db;
+
+ // ---- restore every backup and verify all the data is there ----
+ OpenBackupEngine();
+ for (int i = 1; i <= 5; ++i) {
+ if (i == 2) {
+ // we deleted backup 2
+ Status s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_);
+ ASSERT_TRUE(!s.ok());
+ } else {
+ int fill_up_to = std::min(keys_iteration * i, max_key);
+ AssertBackupConsistency(i, 0, fill_up_to, max_key);
+ }
+ }
+
+ // delete some backups -- this should leave only backups 3 and 5 alive
+ ASSERT_OK(backup_engine_->DeleteBackup(4));
+ ASSERT_OK(backup_engine_->PurgeOldBackups(2));
+
+ std::vector<BackupInfo> backup_info;
+ backup_engine_->GetBackupInfo(&backup_info);
+ ASSERT_EQ(2UL, backup_info.size());
+
+ // check backup 3
+ AssertBackupConsistency(3, 0, 3 * keys_iteration, max_key);
+ // check backup 5
+ AssertBackupConsistency(5, 0, max_key); // end defaults to 0, so no AssertEmpty range is checked here
+
+ CloseBackupEngine();
+}
+#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+
+INSTANTIATE_TEST_CASE_P(BackupEngineTestWithParam, BackupEngineTestWithParam,
+ ::testing::Bool()); // instantiates the BackupEngineTestWithParam TEST_P cases for both bool parameter values
+
+// this will make sure that backup does not copy the same file twice
+TEST_F(BackupEngineTest, NoDoubleCopy_And_AutoGC) {
+ OpenDBAndBackupEngine(true, true); // destroy_old_data = true, dummy = true (db_ is a DummyDB)
+
+ // should write 5 DB files + one meta file
+ test_backup_fs_->SetLimitWrittenFiles(7); // NOTE(review): limit is 7 while the comment above counts 6 files -- confirm the spare slot is intended
+ test_backup_fs_->ClearWrittenFiles();
+ test_db_fs_->SetLimitWrittenFiles(0);
+ dummy_db_->live_files_ = {"00010.sst", "00011.sst", "CURRENT", "MANIFEST-01",
+ "00011.log"};
+ test_db_fs_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+ std::vector<std::string> should_have_written = {
+ "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", "/private/1/CURRENT",
+ "/private/1/MANIFEST-01", "/private/1/00011.log", "/meta/.1.tmp"};
+ AppendPath(backupdir_, should_have_written);
+ test_backup_fs_->AssertWrittenFiles(should_have_written);
+
+ char db_number = '1'; // backup id as a single character; incremented once per backup created in the loop below
+
+ for (std::string other_sst : {"00015.sst", "00017.sst", "00019.sst"}) {
+ // should write 4 new DB files + one meta file
+ // should not write/copy 00010.sst, since it's already there!
+ test_backup_fs_->SetLimitWrittenFiles(6);
+ test_backup_fs_->ClearWrittenFiles();
+
+ dummy_db_->live_files_ = {"00010.sst", other_sst, "CURRENT", "MANIFEST-01",
+ "00011.log"};
+ test_db_fs_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+ // should not open 00010.sst - it's already there
+
+ ++db_number;
+ std::string private_dir = std::string("/private/") + db_number;
+ should_have_written = {
+ "/shared/." + other_sst + ".tmp", private_dir + "/CURRENT",
+ private_dir + "/MANIFEST-01", private_dir + "/00011.log",
+ std::string("/meta/.") + db_number + ".tmp"};
+ AppendPath(backupdir_, should_have_written);
+ test_backup_fs_->AssertWrittenFiles(should_have_written);
+ }
+
+ ASSERT_OK(backup_engine_->DeleteBackup(1));
+ ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst"));
+
+ // 00011.sst was only in backup 1, should be deleted
+ ASSERT_EQ(Status::NotFound(),
+ test_backup_env_->FileExists(backupdir_ + "/shared/00011.sst"));
+ ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst"));
+
+ // MANIFEST file size should be only 100
+ uint64_t size = 0;
+ ASSERT_OK(test_backup_env_->GetFileSize(backupdir_ + "/private/2/MANIFEST-01",
+ &size));
+ ASSERT_EQ(100UL, size);
+ ASSERT_OK(
+ test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size));
+ ASSERT_EQ(200UL, size);
+
+ CloseBackupEngine();
+
+ //
+ // Now simulate incomplete delete by removing just meta
+ //
+ ASSERT_OK(test_backup_env_->DeleteFile(backupdir_ + "/meta/2"));
+
+ OpenBackupEngine();
+
+ // 1 appears to be removed, so
+ // 2 non-corrupt and 0 corrupt seen
+ std::vector<BackupInfo> backup_info;
+ std::vector<BackupID> corrupt_backup_ids;
+ backup_engine_->GetBackupInfo(&backup_info);
+ backup_engine_->GetCorruptedBackups(&corrupt_backup_ids);
+ ASSERT_EQ(2UL, backup_info.size());
+ ASSERT_EQ(0UL, corrupt_backup_ids.size());
+
+ // Keep the two we see, but this should suffice to purge unreferenced
+ // shared files from incomplete delete.
+ ASSERT_OK(backup_engine_->PurgeOldBackups(2));
+
+ // Make sure dangling sst file has been removed (somewhere along this
+ // process). GarbageCollect should not be needed.
+ ASSERT_EQ(Status::NotFound(),
+ test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst"));
+ ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
+ ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
+
+ // Now actually purge a good one
+ ASSERT_OK(backup_engine_->PurgeOldBackups(1));
+
+ ASSERT_EQ(Status::NotFound(),
+ test_backup_env_->FileExists(backupdir_ + "/shared/00017.sst"));
+ ASSERT_OK(test_backup_env_->FileExists(backupdir_ + "/shared/00019.sst"));
+
+ CloseDBAndBackupEngine();
+}
+
+// test various kinds of corruption that may happen:
+// 1. Not able to write a file for backup - that backup should fail,
+// everything else should work
+// 2. Corrupted backup meta file or missing backed-up file - we should
+// not be able to open that backup, but all other backups should be
+// fine
+// 3. Corrupted checksum value - if the checksum is not a valid uint32_t,
+// db open should fail, otherwise, it aborts during the restore process.
+TEST_F(BackupEngineTest, CorruptionsTest) {
+ const int keys_iteration = 5000;
+ Random rnd(6); // fixed seed, so the flush_before_backup choices below repeat from run to run
+ Status s;
+
+ OpenDBAndBackupEngine(true);
+ // create five backups
+ for (int i = 0; i < 5; ++i) {
+ FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)));
+ }
+
+ // ---------- case 1. - fail a write -----------
+ // try creating backup 6, but fail a write
+ FillDB(db_.get(), keys_iteration * 5, keys_iteration * 6);
+ test_backup_fs_->SetLimitWrittenFiles(2);
+ // should fail
+ s = backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2));
+ ASSERT_NOK(s);
+ test_backup_fs_->SetLimitWrittenFiles(1000000);
+ // latest backup should have all the keys
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(0, 0, keys_iteration * 5, keys_iteration * 6);
+
+ // --------- case 2. corrupted backup meta or missing backed-up file ----
+ ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/5", 3));
+ // since 5 meta is now corrupted, latest backup should be 4
+ AssertBackupConsistency(0, 0, keys_iteration * 4, keys_iteration * 5);
+ OpenBackupEngine();
+ s = backup_engine_->RestoreDBFromBackup(5, dbname_, dbname_);
+ ASSERT_NOK(s);
+ CloseBackupEngine();
+ ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/4"));
+ // 4 is corrupted, 3 is the latest backup now
+ AssertBackupConsistency(0, 0, keys_iteration * 3, keys_iteration * 5);
+ OpenBackupEngine();
+ s = backup_engine_->RestoreDBFromBackup(4, dbname_, dbname_);
+ CloseBackupEngine();
+ ASSERT_NOK(s);
+
+ // --------- case 3. corrupted checksum value ----
+ ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false));
+ // checksum of backup 3 is an invalid value, this can be detected at
+ // db open time, and it reverts to the previous backup automatically
+ AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5);
+ // checksum of the backup 2 appears to be valid, this can cause checksum
+ // mismatch and abort restore process
+ ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2"));
+ OpenBackupEngine();
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2"));
+ s = backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_);
+ ASSERT_NOK(s);
+
+ // make sure that no corrupt backups have actually been deleted!
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/1"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/2"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/3"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/4"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/1"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/2"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/3"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/4"));
+ ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5"));
+
+ // delete the corrupt backups and then make sure they're actually deleted
+ ASSERT_OK(backup_engine_->DeleteBackup(5));
+ ASSERT_OK(backup_engine_->DeleteBackup(4));
+ ASSERT_OK(backup_engine_->DeleteBackup(3));
+ ASSERT_OK(backup_engine_->DeleteBackup(2));
+ // Should not be needed anymore with auto-GC on DeleteBackup
+ //(void)backup_engine_->GarbageCollect();
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/meta/5"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/private/5"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/meta/4"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/private/4"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/meta/3"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/private/3"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/meta/2"));
+ ASSERT_EQ(Status::NotFound(),
+ file_manager_->FileExists(backupdir_ + "/private/2"));
+ CloseBackupEngine();
+ AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5);
+
+ // new backup should be 2!
+ OpenDBAndBackupEngine();
+ FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)));
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5);
+}
+
+ // Corrupt a file but maintain its size: VerifyBackup() must keep passing
+ // while only file sizes are compared, and must fail once file checksums are
+ // verified as well.
+ TEST_F(BackupEngineTest, CorruptFileMaintainSize) {
+   const int keys_iteration = 5000;
+   OpenDBAndBackupEngine(true);
+   // create a backup
+   FillDB(db_.get(), 0, keys_iteration);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   // verify with file size
+   ASSERT_OK(backup_engine_->VerifyBackup(1, false));
+   // verify with file checksum
+   ASSERT_OK(backup_engine_->VerifyBackup(1, true));
+
+   std::string file_to_corrupt;
+   uint64_t file_size = 0;
+   // under normal circumstances, there should be at least one nonempty file
+   while (file_size == 0) {
+     // get a random file in /private/1
+     // (ASSERT_OK rather than assert(): the call has side effects and must
+     // not be compiled out when NDEBUG is defined)
+     ASSERT_OK(file_manager_->GetRandomFileInDir(
+         backupdir_ + "/private/1", &file_to_corrupt, &file_size));
+     // corrupt the file by replacing its content by file_size random bytes
+     ASSERT_OK(file_manager_->CorruptFile(file_to_corrupt, file_size));
+   }
+   // file sizes match
+   ASSERT_OK(backup_engine_->VerifyBackup(1, false));
+   // file checksums mismatch
+   ASSERT_NOK(backup_engine_->VerifyBackup(1, true));
+   // sanity check, use default second argument
+   ASSERT_OK(backup_engine_->VerifyBackup(1));
+   CloseDBAndBackupEngine();
+
+   // an extra challenge
+   // set share_files_with_checksum to true and do two more backups
+   // corrupt all the table files in shared_checksum but maintain their sizes
+   OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                         kShareWithChecksum);
+   // create two backups
+   for (int i = 1; i < 3; ++i) {
+     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   }
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   std::vector<FileAttributes> children;
+   const std::string dir = backupdir_ + "/shared_checksum";
+   ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
+   for (const auto& child : children) {
+     if (child.size_bytes == 0) {
+       continue;
+     }
+     // corrupt the file by replacing its content by size_bytes random bytes
+     ASSERT_OK(
+         file_manager_->CorruptFile(dir + "/" + child.name, child.size_bytes));
+   }
+   // file sizes match
+   ASSERT_OK(backup_engine_->VerifyBackup(1, false));
+   ASSERT_OK(backup_engine_->VerifyBackup(2, false));
+   // file checksums mismatch
+   ASSERT_NOK(backup_engine_->VerifyBackup(1, true));
+   ASSERT_NOK(backup_engine_->VerifyBackup(2, true));
+   CloseDBAndBackupEngine();
+ }
+
+ // Corrupt a blob file but maintain its size: the size-only VerifyBackup()
+ // still passes, while the checksum-verifying VerifyBackup() must fail.
+ TEST_P(BackupEngineTestWithParam, CorruptBlobFileMaintainSize) {
+   const int keys_iteration = 5000;
+   OpenDBAndBackupEngine(true);
+   // create a backup
+   FillDB(db_.get(), 0, keys_iteration);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   // verify with file size
+   ASSERT_OK(backup_engine_->VerifyBackup(1, false));
+   // verify with file checksum
+   ASSERT_OK(backup_engine_->VerifyBackup(1, true));
+
+   // the shared directory that is inspected depends on the engine option
+   const std::string dir =
+       backupdir_ + (engine_options_->share_files_with_checksum
+                         ? "/shared_checksum"
+                         : "/shared");
+
+   std::vector<FileAttributes> children;
+   ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
+
+   for (const auto& child : children) {
+     if (EndsWith(child.name, ".blob") && child.size_bytes != 0) {
+       // corrupt the blob file by replacing its content by size_bytes random
+       // bytes
+       ASSERT_OK(
+           file_manager_->CorruptFile(dir + "/" + child.name, child.size_bytes));
+     }
+   }
+
+   // file sizes match
+   ASSERT_OK(backup_engine_->VerifyBackup(1, false));
+   // file checksums mismatch
+   ASSERT_NOK(backup_engine_->VerifyBackup(1, true));
+   // sanity check, use default second argument
+   ASSERT_OK(backup_engine_->VerifyBackup(1));
+   CloseDBAndBackupEngine();
+ }
+
+ // Checks that CreateNewBackup() rejects a DB whose table file was corrupted
+ // beforehand, provided the table file checksum is recorded in the DB
+ // manifest. Without such a checksum the corruption goes unnoticed.
+ TEST_F(BackupEngineTest, TableFileCorruptedBeforeBackup) {
+   const int num_keys = 50000;
+
+   // Round 1: file_checksum_gen_factory is null
+   OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                         kNoShare);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked table file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kTableFile));
+   // there is no table checksum to verify against when creating the backup,
+   // so no corruption is detected
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+
+   // remove the old DB files
+   DestroyDBWithoutCheck(dbname_, options_);
+
+   // Round 2: record table file checksums in the DB manifest
+   options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+   OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                         kNoShare);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked table file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kTableFile));
+   // with table file checksums enabled, the corruption must be detected
+   ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+ }
+
+ // Checks that CreateNewBackup() rejects a DB whose blob file was corrupted
+ // beforehand, provided the blob file checksum is recorded in the DB
+ // manifest. Without such a checksum the corruption goes unnoticed.
+ TEST_F(BackupEngineTest, BlobFileCorruptedBeforeBackup) {
+   const int num_keys = 50000;
+
+   // Round 1: file_checksum_gen_factory is null
+   OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                         kNoShare);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked blob file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile));
+   // there is no blob checksum to verify against when creating the backup,
+   // so no corruption is detected
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+
+   // remove the old DB files
+   DestroyDBWithoutCheck(dbname_, options_);
+
+   // Round 2: record file checksums in the DB manifest
+   options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+   OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                         kNoShare);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked blob file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile));
+
+   // with file checksums enabled, the corruption must be detected
+   ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+ }
+
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+ // Parameterized variant for the case where backup table files are stored in
+ // a shared directory: CreateNewBackup() must fail on a corrupted table file
+ // once the table file checksum is stored in the DB manifest.
+ TEST_P(BackupEngineTestWithParam, TableFileCorruptedBeforeBackup) {
+   const int num_keys = 50000;
+
+   // Round 1: the DB manifest holds no table checksums
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked table file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kTableFile));
+   // the corruption cannot be detected, so the backup succeeds
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+
+   // remove the old DB files
+   DestroyDBWithoutCheck(dbname_, options_);
+
+   // Round 2: record table checksums in the DB manifest
+   options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked table file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kTableFile));
+   // this time the corruption is detected
+   ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+ }
+
+ // Parameterized variant for the case where backup blob files are stored in
+ // a shared directory: CreateNewBackup() must fail on a corrupted blob file
+ // once the blob file checksum is stored in the DB manifest.
+ TEST_P(BackupEngineTestWithParam, BlobFileCorruptedBeforeBackup) {
+   const int num_keys = 50000;
+
+   // Round 1: the DB manifest holds no blob file checksums
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked blob file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile));
+   // the corruption cannot be detected, so the backup succeeds
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+
+   // remove the old DB files
+   DestroyDBWithoutCheck(dbname_, options_);
+
+   // Round 2: record blob file checksums in the DB manifest
+   options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   FillDB(db_.get(), 0, num_keys);
+   CloseAndReopenDB(/*read_only*/ true);
+   // damage one randomly picked blob file inside the DB directory
+   ASSERT_OK(CorruptRandomDataFileInDB(kBlobFile));
+   // this time the corruption is detected
+   ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+ }
+#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+
+ // Simulates a corruption that happens while a file is being copied into the
+ // backup directory (no file checksum factory is configured by this test) and
+ // expects CreateNewBackup() to fail whenever the injected corruption actually
+ // took place.
+ TEST_F(BackupEngineTest, TableFileWithoutDbChecksumCorruptedDuringBackup) {
+ const int keys_iteration = 50000;
+ engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize;
+ // When share_files_with_checksum is on, we calculate checksums of table
+ // files before and after copying. So we can test whether a corruption has
+ // happened while the file is being copied to the backup directory.
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum);
+
+ FillDB(db_.get(), 0, keys_iteration);
+ // set by the callback below once it has actually truncated some data
+ std::atomic<bool> corrupted{false};
+ // corrupt files when copying to the backup directory
+ SyncPoint::GetInstance()->SetCallBack(
+ "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
+ [&](void* data) {
+ if (data != nullptr) {
+ // the sync point argument is treated as a Slice; dropping its last
+ // byte is the injected corruption
+ Slice* d = reinterpret_cast<Slice*>(data);
+ if (!d->empty()) {
+ d->remove_suffix(1);
+ corrupted = true;
+ }
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ Status s = backup_engine_->CreateNewBackup(db_.get());
+ if (corrupted) {
+ ASSERT_NOK(s);
+ } else {
+ // should not be in this path in normal cases
+ ASSERT_OK(s);
+ }
+
+ // undo the sync point setup of this test
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ CloseDBAndBackupEngine();
+ // delete old files in db
+ DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ // With a CRC32C file checksum factory configured, simulates a corruption that
+ // happens while a file is being copied into the backup directory and expects
+ // CreateNewBackup() to fail for every entry of kAllShareOptions.
+ TEST_F(BackupEngineTest, TableFileWithDbChecksumCorruptedDuringBackup) {
+ const int keys_iteration = 50000;
+ options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+ for (auto& sopt : kAllShareOptions) {
+ // Since the default DB table file checksum is on, we obtain checksums of
+ // table files from the DB manifest before copying and verify it with the
+ // one calculated during copying.
+ // Therefore, we can test whether a corruption has happened while the file
+ // is being copied to the backup directory.
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt);
+
+ FillDB(db_.get(), 0, keys_iteration);
+
+ // corrupt files when copying to the backup directory
+ SyncPoint::GetInstance()->SetCallBack(
+ "BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
+ [&](void* data) {
+ if (data != nullptr) {
+ // the sync point argument is treated as a Slice; dropping its last
+ // byte is the injected corruption
+ Slice* d = reinterpret_cast<Slice*>(data);
+ if (!d->empty()) {
+ d->remove_suffix(1);
+ }
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ // The only case that we can't detect a corruption is when the file
+ // being backed up is empty. But as keys_iteration is large, such
+ // a case shouldn't have happened and we should be able to detect
+ // the corruption.
+ ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
+
+ // undo the sync point setup before the next share option is tried
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ CloseDBAndBackupEngine();
+ // delete old files in db
+ DestroyDBWithoutCheck(dbname_, options_);
+ }
+ }
+
+ // Interrupts backup creation by failing new writes and by failing the cleanup
+ // of the partial state, then checks that a subsequent backup still succeeds.
+ TEST_F(BackupEngineTest, InterruptCreationTest) {
+   const int num_keys = 5000;
+   Random rnd(6);
+
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   FillDB(db_.get(), 0, num_keys);
+   test_backup_fs_->SetLimitWrittenFiles(2);
+   test_backup_fs_->SetDeleteFileFailure(true);
+   // the backup attempt is expected to fail
+   const bool flush_for_failed_backup = !!(rnd.Next() % 2);
+   ASSERT_NOK(
+       backup_engine_->CreateNewBackup(db_.get(), flush_for_failed_backup));
+   CloseDBAndBackupEngine();
+   // cleanup is expected to fail as well, leaving the tmp directory behind
+   ASSERT_OK(backup_chroot_env_->FileExists(backupdir_ + "/private/1/"));
+
+   OpenDBAndBackupEngine(false /* destroy_old_data */);
+   test_backup_fs_->SetLimitWrittenFiles(1000000);
+   test_backup_fs_->SetDeleteFileFailure(false);
+   const bool flush_for_good_backup = !!(rnd.Next() % 2);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), flush_for_good_backup));
+   CloseDBAndBackupEngine();
+   // the latest backup should contain all the keys
+   AssertBackupConsistency(0, 0, num_keys);
+ }
+
+ // For every entry of kAllShareOptions: while CreateNewBackup() is between the
+ // two "AfterGetLive" sync points, a second thread writes more keys, flushes
+ // and compacts the DB. The backup must still succeed and must be consistent
+ // for the first keys_iteration keys.
+ TEST_F(BackupEngineTest, FlushCompactDuringBackupCheckpoint) {
+ const int keys_iteration = 5000;
+ options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+ for (const auto& sopt : kAllShareOptions) {
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, sopt);
+ FillDB(db_.get(), 0, keys_iteration);
+ // That FillDB leaves a mix of flushed and unflushed data
+ // Ordering: AfterGetLive1 -> test thread "Before" ... test thread "After"
+ // -> AfterGetLive2
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
+ "BackupEngineTest::FlushCompactDuringBackupCheckpoint:Before"},
+ {"BackupEngineTest::FlushCompactDuringBackupCheckpoint:After",
+ "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+ // worker that modifies the DB in the middle of the backup
+ ROCKSDB_NAMESPACE::port::Thread flush_thread{[this]() {
+ TEST_SYNC_POINT(
+ "BackupEngineTest::FlushCompactDuringBackupCheckpoint:Before");
+ FillDB(db_.get(), keys_iteration, 2 * keys_iteration);
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ DBImpl* dbi = static_cast<DBImpl*>(db_.get());
+ ASSERT_OK(dbi->TEST_WaitForFlushMemTable());
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbi->TEST_WaitForCompact());
+ TEST_SYNC_POINT(
+ "BackupEngineTest::FlushCompactDuringBackupCheckpoint:After");
+ }};
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ flush_thread.join();
+ CloseDBAndBackupEngine();
+ // undo the sync point setup before the next share option is tried
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ /* FIXME(peterd): reinstate with option for checksum in file names
+ if (sopt == kShareWithChecksum) {
+ // Ensure we actually got DB manifest checksums by inspecting
+ // shared_checksum file names for hex checksum component
+ TestRegex expected("[^_]+_[0-9A-F]{8}_[^_]+.sst");
+ std::vector<FileAttributes> children;
+ const std::string dir = backupdir_ + "/shared_checksum";
+ ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
+ for (const auto& child : children) {
+ if (child.size_bytes == 0) {
+ continue;
+ }
+ EXPECT_MATCHES_REGEX(child.name, expected);
+ }
+ }
+ */
+ AssertBackupConsistency(0, 0, keys_iteration);
+ }
+ }
+
+ // Returns "<ret>/private/<backupID>/", i.e. the private directory of the
+ // given backup below the directory passed in `ret`.
+ inline std::string OptionsPath(std::string ret, int backupID) {
+   return ret + "/private/" + std::to_string(backupID) + "/";
+ }
+
+ // Backup the LATEST options file to
+ // "<backup_dir>/private/<backup_id>/OPTIONS<number>"
+ //
+ // For each of four consecutive backups, checks that the DB's latest OPTIONS
+ // file exists in the backup's private directory and that every file there
+ // whose name starts with "OPTIONS" is that very file.
+ TEST_F(BackupEngineTest, BackupOptions) {
+   OpenDBAndBackupEngine(true);
+   for (int i = 1; i < 5; i++) {
+     std::string name;
+     std::vector<std::string> filenames;
+     // Must reset() before reset(OpenDB()) again.
+     // Calling OpenDB() while *db_ is existing will cause LOCK issue
+     db_.reset();
+     db_.reset(OpenDB());
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+     ASSERT_OK(ROCKSDB_NAMESPACE::GetLatestOptionsFileName(db_->GetName(),
+                                                           options_.env, &name));
+     ASSERT_OK(file_manager_->FileExists(OptionsPath(backupdir_, i) + name));
+     ASSERT_OK(backup_chroot_env_->GetChildren(OptionsPath(backupdir_, i),
+                                               &filenames));
+     // iterate by const reference; the file names are only inspected
+     for (const auto& fn : filenames) {
+       if (fn.compare(0, 7, "OPTIONS") == 0) {
+         ASSERT_EQ(name, fn);
+       }
+     }
+   }
+
+   CloseDBAndBackupEngine();
+ }
+
+ // Runs SetOptions() on a second thread while CreateNewBackup() is between the
+ // two "SavedLiveFiles" sync points, and expects the backup to succeed.
+ TEST_F(BackupEngineTest, SetOptionsBackupRaceCondition) {
+   OpenDBAndBackupEngine(true);
+   // Ordering: SavedLiveFiles1 -> "BeforeSetOptions" ... "AfterSetOptions"
+   // -> SavedLiveFiles2
+   SyncPoint::GetInstance()->LoadDependency(
+       {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
+         "BackupEngineTest::SetOptionsBackupRaceCondition:BeforeSetOptions"},
+        {"BackupEngineTest::SetOptionsBackupRaceCondition:AfterSetOptions",
+         "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
+   SyncPoint::GetInstance()->EnableProcessing();
+   ROCKSDB_NAMESPACE::port::Thread setoptions_thread{[this]() {
+     TEST_SYNC_POINT(
+         "BackupEngineTest::SetOptionsBackupRaceCondition:BeforeSetOptions");
+     DBImpl* dbi = static_cast<DBImpl*>(db_.get());
+     // Change arbitrary option to trigger OPTIONS file deletion
+     ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(),
+                               {{"paranoid_file_checks", "false"}}));
+     ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(),
+                               {{"paranoid_file_checks", "true"}}));
+     ASSERT_OK(dbi->SetOptions(dbi->DefaultColumnFamily(),
+                               {{"paranoid_file_checks", "false"}}));
+     TEST_SYNC_POINT(
+         "BackupEngineTest::SetOptionsBackupRaceCondition:AfterSetOptions");
+   }};
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   setoptions_thread.join();
+   CloseDBAndBackupEngine();
+   // Leave the global SyncPoint instance disabled, as the other sync point
+   // based tests in this file do, so that later tests are not affected.
+   SyncPoint::GetInstance()->DisableProcessing();
+   SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+
+ // This test verifies we don't delete the latest backup when read-only option is
+ // set: after LATEST_BACKUP is overwritten with "4", opening a
+ // BackupEngineReadOnly must leave backup 5 on disk and still report all five
+ // backups.
+ TEST_F(BackupEngineTest, NoDeleteWithReadOnly) {
+   const int keys_iteration = 5000;
+   Random rnd(6);
+
+   OpenDBAndBackupEngine(true);
+   // create five backups
+   for (int i = 0; i < 5; ++i) {
+     FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), !!(rnd.Next() % 2)));
+   }
+   CloseDBAndBackupEngine();
+   ASSERT_OK(file_manager_->WriteToFile(latest_backup_, "4"));
+
+   engine_options_->destroy_old_data = false;
+   BackupEngineReadOnly* raw_engine = nullptr;
+   ASSERT_OK(BackupEngineReadOnly::Open(backup_chroot_env_.get(),
+                                        *engine_options_, &raw_engine));
+   // Take ownership right away so the engine is released even when one of the
+   // assertions below fails and returns early.
+   std::unique_ptr<BackupEngineReadOnly> read_only_backup_engine(raw_engine);
+
+   // assert that data from backup 5 is still here (even though LATEST_BACKUP
+   // says 4 is latest)
+   ASSERT_OK(file_manager_->FileExists(backupdir_ + "/meta/5"));
+   ASSERT_OK(file_manager_->FileExists(backupdir_ + "/private/5"));
+
+   // Behavior change: We now ignore LATEST_BACKUP contents. This means that
+   // we should have 5 backups, even if LATEST_BACKUP says 4.
+   std::vector<BackupInfo> backup_info;
+   read_only_backup_engine->GetBackupInfo(&backup_info);
+   ASSERT_EQ(5UL, backup_info.size());
+ }
+
+ // After restoring an older backup and writing new data, a new backup must
+ // fail with Corruption as long as newer backups with clashing table files
+ // exist, and must succeed once those backups are deleted.
+ TEST_F(BackupEngineTest, FailOverwritingBackups) {
+   options_.write_buffer_size = 1024 * 1024 * 1024;  // 1GB
+   options_.disable_auto_compactions = true;
+
+   // produce backups 1 through 5
+   OpenDBAndBackupEngine(true);
+   for (int round = 0; round < 5; ++round) {
+     const int first_key = 100 * round;
+     const int end_key = 100 * (round + 1);
+     CloseDBAndBackupEngine();
+     DeleteLogFiles();
+     OpenDBAndBackupEngine(false);
+     FillDB(db_.get(), first_key, end_key, kFlushAll);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   }
+   CloseDBAndBackupEngine();
+
+   // go back to backup 3
+   OpenBackupEngine();
+   ASSERT_OK(backup_engine_->RestoreDBFromBackup(3, dbname_, dbname_));
+   CloseBackupEngine();
+
+   OpenDBAndBackupEngine(false);
+   // More data, bigger SST
+   FillDB(db_.get(), 1000, 1300, kFlushAll);
+   // The new backup fails because its new table files clash with old table
+   // files from backups 4 and 5. (write_buffer_size is huge, so each backup
+   // generates only one sst file; a file generated here has the same name as
+   // an sst file generated by backup 4, and is bigger.)
+   ASSERT_TRUE(backup_engine_->CreateNewBackup(db_.get()).IsCorruption());
+   ASSERT_OK(backup_engine_->DeleteBackup(4));
+   ASSERT_OK(backup_engine_->DeleteBackup(5));
+   // with the clashing backups gone, the backup can succeed
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+   CloseDBAndBackupEngine();
+ }
+
+ // Creates five backups with kNoShare and checks that each of them is
+ // consistent.
+ TEST_F(BackupEngineTest, NoShareTableFiles) {
+   const int num_keys = 5000;
+   const int num_backups = 5;
+   OpenDBAndBackupEngine(true, false, kNoShare);
+   for (int id = 1; id <= num_backups; ++id) {
+     FillDB(db_.get(), num_keys * (id - 1), num_keys * id);
+     // every second backup (ids 2 and 4) is created with the flag set
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), id % 2 == 0));
+   }
+   CloseDBAndBackupEngine();
+
+   for (int id = 1; id <= num_backups; ++id) {
+     AssertBackupConsistency(id, 0, num_keys * id, num_keys * 6);
+   }
+ }
+
+ // Backup and restore must work with share_files_with_checksum turned on:
+ // creates five backups with kShareWithChecksum and checks each of them.
+ TEST_F(BackupEngineTest, ShareTableFilesWithChecksums) {
+   const int num_keys = 5000;
+   const int num_backups = 5;
+   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+   for (int id = 1; id <= num_backups; ++id) {
+     FillDB(db_.get(), num_keys * (id - 1), num_keys * id);
+     // every second backup (ids 2 and 4) is created with the flag set
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), id % 2 == 0));
+   }
+   CloseDBAndBackupEngine();
+
+   for (int id = 1; id <= num_backups; ++id) {
+     AssertBackupConsistency(id, 0, num_keys * id, num_keys * 6);
+   }
+ }
+
+ // Backup and restore must keep working when share_files_with_checksum is
+ // first false and is later switched to true.
+ TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsTransition) {
+   const int num_keys = 5000;
+
+   // backups 1..5: share_files_with_checksum is false
+   OpenDBAndBackupEngine(true, false, kShareNoChecksum);
+   for (int id = 1; id <= 5; ++id) {
+     FillDB(db_.get(), num_keys * (id - 1), num_keys * id);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   }
+   CloseDBAndBackupEngine();
+
+   for (int id = 1; id <= 5; ++id) {
+     AssertBackupConsistency(id, 0, num_keys * id, num_keys * 6);
+   }
+
+   // backups 6..10: share_files_with_checksum is true
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                         kShareWithChecksum);
+   for (int id = 6; id <= 10; ++id) {
+     FillDB(db_.get(), num_keys * (id - 1), num_keys * id);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   }
+   CloseDBAndBackupEngine();
+
+   // Check backup 1 first, since it is about to be deleted
+   AssertBackupConsistency(1, 0, num_keys, num_keys * 11);
+
+   // For an extra challenge, make sure that GarbageCollect / DeleteBackup
+   // is OK even if we open without share_table_files
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+   ASSERT_OK(backup_engine_->DeleteBackup(1));
+   ASSERT_OK(backup_engine_->GarbageCollect());
+   CloseDBAndBackupEngine();
+
+   // Check the remaining backups 2..10, which were not deleted
+   for (int id = 2; id <= 10; ++id) {
+     AssertBackupConsistency(id, 0, num_keys * id, num_keys * 11);
+   }
+ }
+
+ // For each supported naming option, creates a backup, checks that it is
+ // consistent and checks that the files in shared_checksum follow the name
+ // pattern expected for that option.
+ TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNaming) {
+   ASSERT_TRUE(engine_options_->share_files_with_checksum_naming ==
+               kNamingDefault);
+
+   const int num_keys = 5000;
+
+   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+   FillDB(db_.get(), 0, num_keys);
+   CloseDBAndBackupEngine();
+
+   // naming option -> pattern expected for the .sst file names
+   static const std::map<ShareFilesNaming, TestRegex> option_to_expected = {
+       {kLegacyCrc32cAndFileSize, "[0-9]+_[0-9]+_[0-9]+[.]sst"},
+       // kFlagIncludeFileSize redundant here
+       {kLegacyCrc32cAndFileSize | kFlagIncludeFileSize,
+        "[0-9]+_[0-9]+_[0-9]+[.]sst"},
+       {kUseDbSessionId, "[0-9]+_s[0-9A-Z]{20}[.]sst"},
+       {kUseDbSessionId | kFlagIncludeFileSize,
+        "[0-9]+_s[0-9A-Z]{20}_[0-9]+[.]sst"},
+   };
+
+   const TestRegex blobfile_pattern = "[0-9]+_[0-9]+_[0-9]+[.]blob";
+
+   for (const auto& naming_and_regex : option_to_expected) {
+     const TestRegex& sst_pattern = naming_and_regex.second;
+
+     CloseAndReopenDB();
+     engine_options_->share_files_with_checksum_naming = naming_and_regex.first;
+     OpenBackupEngine(true /*destroy_old_data*/);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+     CloseDBAndBackupEngine();
+     AssertBackupConsistency(1, 0, num_keys, num_keys * 2);
+     AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum", sst_pattern,
+                                    ".sst", 1 /* minimum_count */);
+
+     // only patterns ending in a numeric "_<n>.sst" part get the size check
+     const bool pattern_has_size_part =
+         sst_pattern.GetPattern().find("_[0-9]+[.]sst") != std::string::npos;
+     if (pattern_has_size_part) {
+       AssertDirectoryFilesSizeIndicators(backupdir_ + "/shared_checksum",
+                                          1 /* minimum_count */);
+     }
+
+     AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum",
+                                    blobfile_pattern, ".blob",
+                                    1 /* minimum_count */);
+   }
+ }
+
+ // Imitates SST files written by pre-6.12 releases and checks that the old
+ // file names are used for them no matter which naming option is selected.
+ TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsOldFileNaming) {
+   const int num_keys = 5000;
+   auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
+
+   // Pre-6.12 releases did not include the db id and db session id
+   // properties, so blank them out while table properties are being added.
+   sync_point->SetCallBack(
+       "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
+         auto table_props = static_cast<TableProperties*>(props_vs);
+         table_props->db_id = "";
+         table_props->db_session_id = "";
+       });
+   sync_point->EnableProcessing();
+
+   // Corrupting the table properties corrupts the unique id.
+   // Ignore the unique id recorded in the manifest.
+   options_.verify_sst_unique_id_in_manifest = false;
+
+   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+   FillDB(db_.get(), 0, num_keys);
+   CloseDBAndBackupEngine();
+
+   // Old files must always get the old names
+   const TestRegex sstfile_pattern("[0-9]+_[0-9]+_[0-9]+[.]sst");
+
+   const TestRegex blobfile_pattern = "[0-9]+_[0-9]+_[0-9]+[.]blob";
+
+   for (ShareFilesNaming naming : {kNamingDefault, kUseDbSessionId}) {
+     CloseAndReopenDB();
+     engine_options_->share_files_with_checksum_naming = naming;
+     OpenBackupEngine(true /*destroy_old_data*/);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+     CloseDBAndBackupEngine();
+     AssertBackupConsistency(1, 0, num_keys, num_keys * 2);
+     AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum",
+                                    sstfile_pattern, ".sst",
+                                    1 /* minimum_count */);
+     AssertDirectoryFilesMatchRegex(backupdir_ + "/shared_checksum",
+                                    blobfile_pattern, ".blob",
+                                    1 /* minimum_count */);
+   }
+
+   sync_point->DisableProcessing();
+   sync_point->ClearAllCallBacks();
+ }
+
+ // Test how naming options interact with detecting DB corruption
+ // between incremental backups
+ //
+ // For each combination of (corrupt_before_first_backup, naming option), the
+ // test takes a first backup, flips the corruption state of one table file via
+ // CorruptFileStart(), takes a second backup, restores it and then checks the
+ // result of VerifyChecksum() on the restored DB.
+ TEST_F(BackupEngineTest, TableFileCorruptionBeforeIncremental) {
+ // 0 stands for "share table files without checksum" in the loop below
+ const auto share_no_checksum = static_cast<ShareFilesNaming>(0);
+
+ for (bool corrupt_before_first_backup : {false, true}) {
+ for (ShareFilesNaming option :
+ {share_no_checksum, kLegacyCrc32cAndFileSize, kNamingDefault}) {
+ auto share =
+ option == share_no_checksum ? kShareNoChecksum : kShareWithChecksum;
+ if (option != share_no_checksum) {
+ engine_options_->share_files_with_checksum_naming = option;
+ }
+ OpenDBAndBackupEngine(true, false, share);
+ DBImpl* dbi = static_cast<DBImpl*>(db_.get());
+ // A small SST file
+ ASSERT_OK(dbi->Put(WriteOptions(), "x", "y"));
+ ASSERT_OK(dbi->Flush(FlushOptions()));
+ // And a bigger one
+ ASSERT_OK(dbi->Put(WriteOptions(), "y", Random(42).RandomString(500)));
+ ASSERT_OK(dbi->Flush(FlushOptions()));
+ ASSERT_OK(dbi->TEST_WaitForFlushMemTable());
+ CloseAndReopenDB(/*read_only*/ true);
+
+ // exactly the two table files created above are expected
+ std::vector<FileAttributes> table_files;
+ ASSERT_OK(GetDataFilesInDB(kTableFile, &table_files));
+ ASSERT_EQ(table_files.size(), 2);
+ std::string tf0 = dbname_ + "/" + table_files[0].name;
+ std::string tf1 = dbname_ + "/" + table_files[1].name;
+
+ CloseDBAndBackupEngine();
+
+ if (corrupt_before_first_backup) {
+ // This corrupts a data block, which does not cause DB open
+ // failure, only failure on accessing the block.
+ ASSERT_OK(db_file_manager_->CorruptFileStart(tf0));
+ }
+
+ // first backup
+ OpenDBAndBackupEngine(false, false, share);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+ CloseDBAndBackupEngine();
+
+ // if corrupt_before_first_backup, this undoes the initial corruption
+ ASSERT_OK(db_file_manager_->CorruptFileStart(tf0));
+
+ // second (incremental) backup
+ OpenDBAndBackupEngine(false, false, share);
+ Status s = backup_engine_->CreateNewBackup(db_.get());
+
+ // Even though none of the naming options catch the inconsistency
+ // between the first and second time backing up fname, in the case
+ // of kUseDbSessionId (kNamingDefault), this is an intentional
+ // trade-off to avoid full scan of files from the DB that are
+ // already backed up. If we did the scan, kUseDbSessionId could catch
+ // the corruption. kLegacyCrc32cAndFileSize does the scan (to
+ // compute checksum for name) without catching the corruption,
+ // because the corruption means the names don't merge.
+ EXPECT_OK(s);
+
+ // VerifyBackup doesn't check DB integrity or table file internal
+ // checksums
+ EXPECT_OK(backup_engine_->VerifyBackup(1, true));
+ EXPECT_OK(backup_engine_->VerifyBackup(2, true));
+
+ // restore the second backup and verify the checksums of the restored DB
+ db_.reset();
+ ASSERT_OK(backup_engine_->RestoreDBFromBackup(2, dbname_, dbname_));
+ {
+ DB* db = OpenDB();
+ s = db->VerifyChecksum();
+ delete db;
+ }
+ if (option != kLegacyCrc32cAndFileSize && !corrupt_before_first_backup) {
+ // Second backup is OK because it used (uncorrupt) file from first
+ // backup instead of (corrupt) file from DB.
+ // This is arguably a good trade-off vs. treating the file as distinct
+ // from the old version, because a file should be more likely to be
+ // corrupt as it ages. Although the backed-up file might also corrupt
+ // with age, the alternative approach (checksum in file name computed
+ // from current DB file contents) wouldn't detect that case at backup
+ // time either. Although you would have both copies of the file with
+ // the alternative approach, that would only last until the older
+ // backup is deleted.
+ ASSERT_OK(s);
+ } else if (option == kLegacyCrc32cAndFileSize &&
+ corrupt_before_first_backup) {
+ // Second backup is OK because it saved the updated (uncorrupt)
+ // file from DB, instead of the sharing with first backup.
+ // Recall: if corrupt_before_first_backup, [second CorruptFileStart]
+ // undoes the initial corruption.
+ // This is arguably a bad trade-off vs. sharing the old version of the
+ // file because a file should be more likely to corrupt as it ages.
+ // (Not likely that the previously backed-up version was already
+ // corrupt and the new version is non-corrupt. This approach doesn't
+ // help if backed-up version is corrupted after taking the backup.)
+ ASSERT_OK(s);
+ } else {
+ // Something is legitimately corrupted, but we can't be sure what
+ // with information available (TODO? unless one passes block checksum
+ // test and other doesn't. Probably better to use end-to-end full file
+ // checksum anyway.)
+ ASSERT_TRUE(s.IsCorruption());
+ }
+
+ CloseDBAndBackupEngine();
+ DestroyDBWithoutCheck(dbname_, options_);
+ }
+ }
+ }
+
+// Test how naming options interact with detecting file size corruption
+// between incremental backups
+//
+// For each naming option (plus plain sharing without checksums) two
+// scenarios are exercised:
+// 1. a file in the shared backup directory is overwritten with different
+// contents before the next incremental backup is attempted;
+// 2. two different SST files are produced while the DB session id is forged
+// to a fixed value, and a third backup is attempted.
+TEST_F(BackupEngineTest, FileSizeForIncremental) {
+ // Placeholder naming value (0); in the loop below it only selects
+ // kShareNoChecksum and is never stored into engine_options_.
+ const auto share_no_checksum = static_cast<ShareFilesNaming>(0);
+ // TODO: enable blob files once Integrated BlobDB supports DB session id.
+ options_.enable_blob_files = false;
+
+ for (ShareFilesNaming option : {share_no_checksum, kLegacyCrc32cAndFileSize,
+ kNamingDefault, kUseDbSessionId}) {
+ auto share =
+ option == share_no_checksum ? kShareNoChecksum : kShareWithChecksum;
+ if (option != share_no_checksum) {
+ engine_options_->share_files_with_checksum_naming = option;
+ }
+ OpenDBAndBackupEngine(true, false, share);
+
+ std::vector<FileAttributes> children;
+ // Directory that is inspected (and corrupted) for the current option.
+ const std::string shared_dir =
+ backupdir_ +
+ (option == share_no_checksum ? "/shared" : "/shared_checksum");
+
+ // A single small SST file
+ ASSERT_OK(db_->Put(WriteOptions(), "x", "y"));
+
+ // First, test that we always detect file size corruption on the shared
+ // backup side on incremental. (Since sizes aren't really part of backup
+ // meta file, this works by querying the filesystem for the sizes.)
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true /*flush*/));
+ CloseDBAndBackupEngine();
+
+ // Corrupt backup SST file
+ ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children));
+ ASSERT_EQ(children.size(), 1U); // one sst
+ // Only the first non-empty entry whose name is longer than 4 characters
+ // is rewritten (with the 4-byte string "asdf").
+ for (const auto& child : children) {
+ if (child.name.size() > 4 && child.size_bytes > 0) {
+ ASSERT_OK(
+ file_manager_->WriteToFile(shared_dir + "/" + child.name, "asdf"));
+ break;
+ }
+ }
+
+ OpenDBAndBackupEngine(false, false, share);
+ Status s = backup_engine_->CreateNewBackup(db_.get());
+ EXPECT_TRUE(s.IsCorruption());
+
+ // NOTE(review): presumably PurgeOldBackups(0) removes every existing
+ // backup so that the backup created below gets id 1 -- confirm.
+ ASSERT_OK(backup_engine_->PurgeOldBackups(0));
+ CloseDBAndBackupEngine();
+
+ // Second, test that a hypothetical db session id collision would likely
+ // not suffice to corrupt a backup, because there's a good chance of
+ // file size difference (in this test, guaranteed) so either no name
+ // collision or detected collision.
+
+ // Create backup 1
+ OpenDBAndBackupEngine(false, false, share);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ // Even though we have "the same" DB state as backup 1, we need
+ // to restore to recreate the same conditions as later restore.
+ db_.reset();
+ DestroyDBWithoutCheck(dbname_, options_);
+ ASSERT_OK(backup_engine_->RestoreDBFromBackup(1, dbname_, dbname_));
+ CloseDBAndBackupEngine();
+
+ // Forge session id
+ // The callback overwrites the string handed to the
+ // "DBImpl::SetDbSessionId" sync point with a fixed 20-character value.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::SetDbSessionId", [](void* sid_void_star) {
+ std::string* sid = static_cast<std::string*>(sid_void_star);
+ *sid = "01234567890123456789";
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Create another SST file
+ OpenDBAndBackupEngine(false, false, share);
+ ASSERT_OK(db_->Put(WriteOptions(), "y", "x"));
+
+ // Create backup 2
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true /*flush*/));
+
+ // Restore backup 1 (again)
+ db_.reset();
+ DestroyDBWithoutCheck(dbname_, options_);
+ ASSERT_OK(backup_engine_->RestoreDBFromBackup(1, dbname_, dbname_));
+ CloseDBAndBackupEngine();
+
+ // Create another SST file with same number and db session id, only bigger
+ OpenDBAndBackupEngine(false, false, share);
+ ASSERT_OK(db_->Put(WriteOptions(), "y", Random(42).RandomString(500)));
+
+ // Count backup SSTs files.
+ children.clear();
+ ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children));
+ ASSERT_EQ(children.size(), 2U); // two sst files
+
+ // Try create backup 3
+ s = backup_engine_->CreateNewBackup(db_.get(), true /*flush*/);
+
+ // Re-count backup SSTs
+ children.clear();
+ ASSERT_OK(file_manager_->GetChildrenFileAttributes(shared_dir, &children));
+
+ if (option == kUseDbSessionId) {
+ // Acceptable to call it corruption if size is not in name and
+ // db session id collision is practically impossible.
+ EXPECT_TRUE(s.IsCorruption());
+ EXPECT_EQ(children.size(), 2U); // no SST file added
+ } else if (option == share_no_checksum) {
+ // Good to call it corruption if both backups cannot be
+ // accommodated.
+ EXPECT_TRUE(s.IsCorruption());
+ EXPECT_EQ(children.size(), 2U); // no SST file added
+ } else {
+ // Since opening a DB seems sufficient for detecting size corruption
+ // on the DB side, this should be a good thing, ...
+ EXPECT_OK(s);
+ // ... as long as we did actually treat it as a distinct SST file.
+ EXPECT_EQ(children.size(), 3U); // Another SST added
+ }
+ CloseDBAndBackupEngine();
+ DestroyDBWithoutCheck(dbname_, options_);
+ // Undo the session id forging before the next loop iteration.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+}
+
+// Start with share_files_with_checksum off and take a few backups, then
+// switch it on (leaving share_files_with_checksum_naming at its default) and
+// take more backups, the last one with a file checksum generator configured.
+// Finally delete the two oldest backups through engines opened without
+// share_table_files and check that the remaining backups still verify.
+TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNamingTransition) {
+  const int kKeysPerBatch = 5000;
+  // The naming option is left alone here: whatever its value, it should have
+  // no effect while share_files_with_checksum is false.
+  ASSERT_TRUE(engine_options_->share_files_with_checksum_naming ==
+              kNamingDefault);
+
+  // Phase 1: share_files_with_checksum == false, one backup per key batch.
+  OpenDBAndBackupEngine(true, false, kShareNoChecksum);
+  int batch = 3;
+  for (int b = 0; b < batch; ++b) {
+    const int first_key = kKeysPerBatch * b;
+    FillDB(db_.get(), first_key, first_key + kKeysPerBatch);
+    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  }
+  CloseDBAndBackupEngine();
+
+  for (int id = 1; id <= batch; ++id) {
+    AssertBackupConsistency(id, 0, kKeysPerBatch * id,
+                            kKeysPerBatch * (batch + 1));
+  }
+
+  // Phase 2: share_files_with_checksum == true, naming still at the default.
+  ASSERT_TRUE(engine_options_->share_files_with_checksum_naming ==
+              kNamingDefault);
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                        kShareWithChecksum);
+  FillDB(db_.get(), kKeysPerBatch * batch, kKeysPerBatch * (batch + 1));
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  CloseDBAndBackupEngine();
+
+  // Phase 3: as phase 2, but the DB also gets a crc32c file checksum
+  // generator so that a checksum is available for the backup file names.
+  ++batch;
+  options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                        kShareWithChecksum);
+  FillDB(db_.get(), kKeysPerBatch * batch, kKeysPerBatch * (batch + 1));
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  CloseDBAndBackupEngine();
+
+  // `batch` no longer changes, so the upper key bound can be fixed now.
+  const int keys_upper_bound = kKeysPerBatch * (batch + 1);
+
+  // Backup 1 must be intact right before it is deleted.
+  AssertBackupConsistency(1, 0, kKeysPerBatch, keys_upper_bound);
+
+  // Extra challenge: DeleteBackup / GarbageCollect must work on an engine
+  // opened without share_table_files while naming is still the default.
+  ASSERT_TRUE(engine_options_->share_files_with_checksum_naming ==
+              kNamingDefault);
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+  ASSERT_OK(backup_engine_->DeleteBackup(1));
+  ASSERT_OK(backup_engine_->GarbageCollect());
+  CloseDBAndBackupEngine();
+
+  // Backup 2 must be intact right before it is deleted.
+  AssertBackupConsistency(2, 0, kKeysPerBatch * 2, keys_upper_bound);
+
+  // Same again, this time with the legacy (checksum and file size) naming
+  // selected and still without share_table_files.
+  engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize;
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+  ASSERT_OK(backup_engine_->DeleteBackup(2));
+  ASSERT_OK(backup_engine_->GarbageCollect());
+  CloseDBAndBackupEngine();
+
+  // The backups that were not deleted (ids 3..batch) must still verify.
+  for (int id = 3; id <= batch; ++id) {
+    AssertBackupConsistency(id, 0, kKeysPerBatch * id, keys_upper_bound);
+  }
+}
+
+// Keep share_files_with_checksum on throughout, but move the naming scheme
+// from kLegacyCrc32cAndFileSize to kUseDbSessionId between backups, then
+// delete the two oldest backups through engines opened without
+// share_table_files and check that the remaining backups still verify.
+TEST_F(BackupEngineTest, ShareTableFilesWithChecksumsNewNamingUpgrade) {
+  const int kKeysPerBatch = 5000;
+  engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize;
+
+  // Phase 1: legacy naming with share_files_with_checksum == true.
+  OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+  int batch = 3;
+  for (int b = 0; b < batch; ++b) {
+    const int first_key = kKeysPerBatch * b;
+    FillDB(db_.get(), first_key, first_key + kKeysPerBatch);
+    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  }
+  CloseDBAndBackupEngine();
+
+  for (int id = 1; id <= batch; ++id) {
+    AssertBackupConsistency(id, 0, kKeysPerBatch * id,
+                            kKeysPerBatch * (batch + 1));
+  }
+
+  // Phase 2: switch the naming scheme to kUseDbSessionId.
+  engine_options_->share_files_with_checksum_naming = kUseDbSessionId;
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                        kShareWithChecksum);
+  FillDB(db_.get(), kKeysPerBatch * batch, kKeysPerBatch * (batch + 1));
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  CloseDBAndBackupEngine();
+
+  // Phase 3: as phase 2, with a crc32c file checksum generator on the DB.
+  ++batch;
+  options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                        kShareWithChecksum);
+  FillDB(db_.get(), kKeysPerBatch * batch, kKeysPerBatch * (batch + 1));
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+  CloseDBAndBackupEngine();
+
+  // `batch` no longer changes, so the upper key bound can be fixed now.
+  const int keys_upper_bound = kKeysPerBatch * (batch + 1);
+
+  // Backup 1 must be intact right before it is deleted.
+  AssertBackupConsistency(1, 0, kKeysPerBatch, keys_upper_bound);
+
+  // Extra challenge: DeleteBackup / GarbageCollect must work on an engine
+  // opened without share_table_files.
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+  ASSERT_OK(backup_engine_->DeleteBackup(1));
+  ASSERT_OK(backup_engine_->GarbageCollect());
+  CloseDBAndBackupEngine();
+
+  // Backup 2 must be intact right before it is deleted.
+  AssertBackupConsistency(2, 0, kKeysPerBatch * 2, keys_upper_bound);
+
+  // Same again after switching back to the legacy (checksum and file size)
+  // naming, still without share_table_files.
+  engine_options_->share_files_with_checksum_naming = kLegacyCrc32cAndFileSize;
+  OpenDBAndBackupEngine(false /* destroy_old_data */, false, kNoShare);
+  ASSERT_OK(backup_engine_->DeleteBackup(2));
+  ASSERT_OK(backup_engine_->GarbageCollect());
+  CloseDBAndBackupEngine();
+
+  // The backups that were not deleted (ids 3..batch) must still verify.
+  for (int id = 3; id <= batch; ++id) {
+    AssertBackupConsistency(id, 0, kKeysPerBatch * id, keys_upper_bound);
+  }
+}
+
+// This test simulates cleaning up after aborted or incomplete creation
+// of a new backup.
+//
+// For every share option and every cleanup entry point (cleanup_fn:
+// 1 = GarbageCollect, 2 = DeleteBackup, 3 = PurgeOldBackups,
+// 4 = CreateNewBackup) it plants stray files in "shared", "shared_checksum"
+// and the next private directory, runs the entry point, and requires that
+// the planted files are gone afterwards.
+TEST_F(BackupEngineTest, DeleteTmpFiles) {
+  for (int cleanup_fn : {1, 2, 3, 4}) {
+    for (ShareOption shared_option : kAllShareOptions) {
+      OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
+                            shared_option);
+      ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+      // Derive the id the next backup will use and the oldest existing id
+      // from the backups currently known to the engine.
+      BackupID next_id = 1;
+      BackupID oldest_id = std::numeric_limits<BackupID>::max();
+      {
+        std::vector<BackupInfo> backup_info;
+        backup_engine_->GetBackupInfo(&backup_info);
+        for (const auto& bi : backup_info) {
+          next_id = std::max(next_id, bi.backup_id + 1);
+          oldest_id = std::min(oldest_id, bi.backup_id);
+        }
+      }
+      CloseDBAndBackupEngine();
+
+      // An aborted or incomplete new backup will always be in the next
+      // id (maybe more)
+      std::string next_private = "private/" + std::to_string(next_id);
+
+      // NOTE: both shared and shared_checksum should be cleaned up
+      // regardless of how the backup engine is opened.
+      std::vector<std::string> tmp_files_and_dirs;
+      for (const auto& dir_and_file : {
+               std::make_pair(std::string("shared"),
+                              std::string(".00006.sst.tmp")),
+               std::make_pair(std::string("shared_checksum"),
+                              std::string(".00007.sst.tmp")),
+               std::make_pair(next_private, std::string("00003.sst")),
+           }) {
+        std::string dir = backupdir_ + "/" + dir_and_file.first;
+        ASSERT_OK(file_manager_->CreateDirIfMissing(dir));
+        ASSERT_OK(file_manager_->FileExists(dir));
+
+        std::string file = dir + "/" + dir_and_file.second;
+        ASSERT_OK(file_manager_->WriteToFile(file, "tmp"));
+        ASSERT_OK(file_manager_->FileExists(file));
+
+        tmp_files_and_dirs.push_back(file);
+      }
+      if (cleanup_fn != /*CreateNewBackup*/ 4) {
+        // This exists after CreateNewBackup because it's deleted then
+        // re-created.
+        tmp_files_and_dirs.push_back(backupdir_ + "/" + next_private);
+      }
+
+      OpenDBAndBackupEngine(false /* destroy_old_data */, false /* dummy */,
+                            shared_option);
+      // Need to call one of these explicitly to delete tmp files
+      switch (cleanup_fn) {
+        case 1:
+          ASSERT_OK(backup_engine_->GarbageCollect());
+          break;
+        case 2:
+          ASSERT_OK(backup_engine_->DeleteBackup(oldest_id));
+          break;
+        case 3:
+          ASSERT_OK(backup_engine_->PurgeOldBackups(1));
+          break;
+        case 4:
+          // Does a garbage collect if it sees that next private dir exists
+          ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+          break;
+        default:
+          // Fail the test (also in NDEBUG builds, where assert() is a no-op)
+          // if the list of cleanup functions above is extended without
+          // adding a matching case here.
+          FAIL() << "unhandled cleanup_fn " << cleanup_fn;
+      }
+      CloseDBAndBackupEngine();
+      // Iterate by reference: the paths only need to be inspected.
+      for (const std::string& file_or_dir : tmp_files_and_dirs) {
+        if (!file_manager_->FileExists(file_or_dir).IsNotFound()) {
+          FAIL() << file_or_dir
+                 << " was expected to be deleted. cleanup_fn=" << cleanup_fn;
+        }
+      }
+    }
+  }
+}
+
+// Takes a backup without log files, keeps writing (and flushing) afterwards,
+// and then verifies through AssertBackupConsistency with keep_log_files set
+// that all keys, including those written after the backup, are present.
+TEST_F(BackupEngineTest, KeepLogFiles) {
+  engine_options_->backup_log_files = false;
+  // One day; basically infinite for the purposes of this test.
+  options_.WAL_ttl_seconds = 24 * 60 * 60;
+  OpenDBAndBackupEngine(true);
+
+  // Two batches of 100 keys before the backup ...
+  for (int first_key : {0, 100}) {
+    FillDB(db_.get(), first_key, first_key + 100, kFlushAll);
+  }
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+  // ... and three more batches after it.
+  for (int first_key : {200, 300, 400}) {
+    FillDB(db_.get(), first_key, first_key + 100, kFlushAll);
+  }
+  CloseDBAndBackupEngine();
+
+  // all data should be there if we call with keep_log_files = true
+  AssertBackupConsistency(0, 0, 500, 600, true);
+}
+
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+// Parameterized fixture for the rate limiting tests. The parameter is a
+// tuple of:
+//   <0> bool: "make throttle" (the tests below read it as "use explicitly
+//       constructed RateLimiter objects")
+//   <1> int: 0 = single threaded, 1 = multi threaded
+//   <2> pair of limits (the tests use .first for backup, .second for restore)
+class BackupEngineRateLimitingTestWithParam
+    : public BackupEngineTest,
+      public testing::WithParamInterface<
+          std::tuple<bool /* make throttle */,
+                     int /* 0 = single threaded, 1 = multi threaded*/,
+                     std::pair<uint64_t, uint64_t> /* limits */>> {
+ public:
+  BackupEngineRateLimitingTestWithParam() = default;
+};
+
+// Number of bytes in one mebibyte; unit for the limits below.
+const uint64_t MB = 1024 * 1024;
+
+// Parameter layout: {make throttle, 0 = single / 1 = multi threaded,
+// {backup limit, restore limit}}. The order of the entries is kept as is
+// because it determines the generated test indices.
+INSTANTIATE_TEST_CASE_P(
+    RateLimiting, BackupEngineRateLimitingTestWithParam,
+    ::testing::Values(
+        // limits given through the plain numeric options
+        std::make_tuple(false, 0, std::make_pair(MB, 5 * MB)),
+        std::make_tuple(false, 0, std::make_pair(2 * MB, 3 * MB)),
+        std::make_tuple(false, 1, std::make_pair(MB, 5 * MB)),
+        std::make_tuple(false, 1, std::make_pair(2 * MB, 3 * MB)),
+        // limits given through explicit rate limiter objects
+        std::make_tuple(true, 0, std::make_pair(MB, 5 * MB)),
+        std::make_tuple(true, 0, std::make_pair(2 * MB, 3 * MB)),
+        std::make_tuple(true, 1, std::make_pair(MB, 5 * MB)),
+        std::make_tuple(true, 1, std::make_pair(2 * MB, 3 * MB))));
+
+// Checks that backup and restore are slowed down to roughly the configured
+// rate: each must take more than 80% of (bytes reported by FillDB / limit),
+// with time measured on special_env's clock.
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimiting) {
+ size_t const kMicrosPerSec = 1000 * 1000LL;
+ const bool custom_rate_limiter = std::get<0>(GetParam());
+ // iter 0 -- single threaded
+ // iter 1 -- multi threaded
+ const int iter = std::get<1>(GetParam());
+ // limit.first is used for backup, limit.second for restore.
+ const std::pair<uint64_t, uint64_t> limit = std::get<2>(GetParam());
+ std::unique_ptr<Env> special_env(
+ new SpecialEnv(db_chroot_env_.get(), /*time_elapse_only_sleep*/ true));
+ // destroy old data
+ Options options;
+ options.env = special_env.get();
+ DestroyDBWithoutCheck(dbname_, options);
+
+ // Either install explicit GenericRateLimiter objects driven by
+ // special_env's clock, or only set the numeric limits.
+ if (custom_rate_limiter) {
+ std::shared_ptr<RateLimiter> backup_rate_limiter =
+ std::make_shared<GenericRateLimiter>(
+ limit.first, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kWritesOnly /* mode */,
+ special_env->GetSystemClock(), false /* auto_tuned */);
+ std::shared_ptr<RateLimiter> restore_rate_limiter =
+ std::make_shared<GenericRateLimiter>(
+ limit.second, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kWritesOnly /* mode */,
+ special_env->GetSystemClock(), false /* auto_tuned */);
+ engine_options_->backup_rate_limiter = backup_rate_limiter;
+ engine_options_->restore_rate_limiter = restore_rate_limiter;
+ } else {
+ engine_options_->backup_rate_limit = limit.first;
+ engine_options_->restore_rate_limit = limit.second;
+ }
+
+ engine_options_->max_background_operations = (iter == 0) ? 1 : 10;
+ // NOTE(review): presumably compression is disabled so that the byte count
+ // returned by FillDB approximates the amount of data copied -- confirm.
+ options_.compression = kNoCompression;
+
+ // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+ // `Env` to advance its time according to the fake wait duration. The
+ // workaround is to install a callback that advance the `Env`'s mock time.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+ int64_t time_waited_us = *static_cast<int64_t*>(arg);
+ special_env->SleepForMicroseconds(static_cast<int>(time_waited_us));
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ OpenDBAndBackupEngine(true);
+ TEST_SetDefaultRateLimitersClock(backup_engine_.get(),
+ special_env->GetSystemClock());
+
+ size_t bytes_written = FillDB(db_.get(), 0, 10000);
+
+ // Time the backup on the mock clock and compare against the lower bound
+ // implied by the backup limit.
+ auto start_backup = special_env->NowMicros();
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+ auto backup_time = special_env->NowMicros() - start_backup;
+ CloseDBAndBackupEngine();
+ auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / limit.first;
+ ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time);
+
+ OpenBackupEngine();
+ TEST_SetDefaultRateLimitersClock(
+ backup_engine_.get(),
+ special_env->GetSystemClock() /* backup_rate_limiter_clock */,
+ special_env->GetSystemClock() /* restore_rate_limiter_clock */);
+
+ // Same measurement for the restore, against the restore limit.
+ auto start_restore = special_env->NowMicros();
+ ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+ auto restore_time = special_env->NowMicros() - start_restore;
+ CloseBackupEngine();
+ auto rate_limited_restore_time =
+ (bytes_written * kMicrosPerSec) / limit.second;
+ ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time);
+
+ AssertBackupConsistency(0, 0, 10000, 10100);
+
+ // Remove the callback installed above; it captures special_env by
+ // reference.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::Request:PostTimedWait");
+}
+
+// Checks that VerifyBackup (with checksum verification) is throttled by the
+// backup rate limiter: when an explicit kAllIo limiter is installed, the
+// verification must take at least 80% of (total backup file size / limit),
+// measured on special_env's clock.
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) {
+  const std::size_t kMicrosPerSec = 1000 * 1000LL;
+  const bool custom_rate_limiter = std::get<0>(GetParam());
+  const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
+  const bool is_single_threaded = std::get<1>(GetParam()) == 0;
+  std::unique_ptr<Env> special_env(
+      new SpecialEnv(db_chroot_env_.get(), /*time_elapse_only_sleep*/ true));
+
+  if (custom_rate_limiter) {
+    std::shared_ptr<RateLimiter> backup_rate_limiter =
+        std::make_shared<GenericRateLimiter>(
+            backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
+            10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */,
+            special_env->GetSystemClock(), false /* auto_tuned */);
+    engine_options_->backup_rate_limiter = backup_rate_limiter;
+  } else {
+    engine_options_->backup_rate_limit = backup_rate_limiter_limit;
+  }
+
+  engine_options_->max_background_operations = is_single_threaded ? 1 : 10;
+
+  Options options;
+  options.env = special_env.get();
+  DestroyDBWithoutCheck(dbname_, options);
+  // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+  // `Env` to advance its time according to the fake wait duration. The
+  // workaround is to install a callback that advance the `Env`'s mock time.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+        int64_t time_waited_us = *static_cast<int64_t*>(arg);
+        special_env->SleepForMicroseconds(static_cast<int>(time_waited_us));
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  TEST_SetDefaultRateLimitersClock(backup_engine_.get(),
+                                   special_env->GetSystemClock(), nullptr);
+  FillDB(db_.get(), 0, 10000);
+
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+
+  std::vector<BackupInfo> backup_infos;
+  BackupInfo backup_info;
+  backup_engine_->GetBackupInfo(&backup_infos);
+  // Unsigned literal / BackupID keep these comparisons free of
+  // signed-vs-unsigned mismatches.
+  ASSERT_EQ(1U, backup_infos.size());
+  const BackupID backup_id = 1;
+  ASSERT_EQ(backup_id, backup_infos[0].backup_id);
+  ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
+                                          true /* include_file_details */));
+
+  // The sum of all backup file sizes is taken as the number of bytes
+  // VerifyBackup has to read.
+  std::uint64_t bytes_read_during_verify_backup = 0;
+  for (const BackupFileInfo& backup_file_info : backup_info.file_details) {
+    bytes_read_during_verify_backup += backup_file_info.size;
+  }
+  auto start_verify_backup = special_env->NowMicros();
+  ASSERT_OK(
+      backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
+  auto verify_backup_time = special_env->NowMicros() - start_verify_backup;
+  auto rate_limited_verify_backup_time =
+      (bytes_read_during_verify_backup * kMicrosPerSec) /
+      backup_rate_limiter_limit;
+  // The timing expectation is only checked for the explicit limiter, which is
+  // the one constructed with Mode::kAllIo above.
+  if (custom_rate_limiter) {
+    EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time);
+  }
+
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(backup_id, 0, 10000, 10010);
+  DestroyDBWithoutCheck(dbname_, options);
+
+  // Remove the callback installed above; it captures special_env by
+  // reference.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::Request:PostTimedWait");
+}
+
+// Creates a backup twice -- first with a kWritesOnly backup rate limiter,
+// then with a kAllIo one -- and expects more bytes to pass through the
+// limiter in the second run.
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) {
+  const bool single_threaded = (std::get<1>(GetParam()) == 0);
+  engine_options_->max_background_operations = single_threaded ? 1 : 10;
+
+  const std::uint64_t rate_limit = std::get<2>(GetParam()).first;
+
+  // Round 1: limiter in kWritesOnly mode.
+  std::shared_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      rate_limit, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kWritesOnly /* mode */));
+  engine_options_->backup_rate_limiter = limiter;
+
+  DestroyDBWithoutCheck(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                        kShareWithChecksum /* shared_option */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  const std::int64_t bytes_through_writes_only =
+      limiter->GetTotalBytesThrough();
+  // Only the backup engine is closed; the DB stays open for round 2.
+  CloseBackupEngine();
+
+  // Round 2: fresh limiter in kAllIo mode, fresh backup directory.
+  limiter.reset(NewGenericRateLimiter(
+      rate_limit, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kAllIo /* mode */));
+  engine_options_->backup_rate_limiter = limiter;
+
+  OpenBackupEngine(true);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  const std::int64_t bytes_through_all_io = limiter->GetTotalBytesThrough();
+  EXPECT_GT(bytes_through_all_io, bytes_through_writes_only);
+
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+  DestroyDBWithoutCheck(dbname_, Options());
+}
+
+// Restores the same backup twice -- first with a kWritesOnly restore rate
+// limiter, then with a kAllIo one -- and expects exactly twice as many bytes
+// to pass through the limiter in the second run.
+TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) {
+  const bool single_threaded = (std::get<1>(GetParam()) == 0);
+  engine_options_->max_background_operations = single_threaded ? 1 : 10;
+
+  const std::uint64_t rate_limit = std::get<2>(GetParam()).second;
+
+  // Round 1: limiter in kWritesOnly mode.
+  std::shared_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      rate_limit, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kWritesOnly /* mode */));
+  engine_options_->restore_rate_limiter = limiter;
+
+  // Produce the backup that both restore rounds will use.
+  DestroyDBWithoutCheck(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  CloseDBAndBackupEngine();
+  DestroyDBWithoutCheck(dbname_, Options());
+
+  OpenBackupEngine(false /* destroy_old_data */);
+  ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+  const std::int64_t bytes_through_writes_only =
+      limiter->GetTotalBytesThrough();
+  CloseBackupEngine();
+  DestroyDBWithoutCheck(dbname_, Options());
+
+  // Round 2: fresh limiter in kAllIo mode.
+  limiter.reset(NewGenericRateLimiter(
+      rate_limit, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kAllIo /* mode */));
+  engine_options_->restore_rate_limiter = limiter;
+
+  OpenBackupEngine(false /* destroy_old_data */);
+  ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+  const std::int64_t bytes_through_all_io = limiter->GetTotalBytesThrough();
+  EXPECT_EQ(bytes_through_all_io, bytes_through_writes_only * 2);
+
+  CloseBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+  DestroyDBWithoutCheck(dbname_, Options());
+}
+
+// With a kAllIo backup rate limiter installed, re-opening a backup engine on
+// an existing backup directory must push additional bytes through the
+// limiter.
+TEST_P(BackupEngineRateLimitingTestWithParam,
+       RateLimitingChargeReadInInitialize) {
+  const bool single_threaded = (std::get<1>(GetParam()) == 0);
+  engine_options_->max_background_operations = single_threaded ? 1 : 10;
+
+  const std::uint64_t rate_limit = std::get<2>(GetParam()).first;
+  std::shared_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      rate_limit, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
+      RateLimiter::Mode::kAllIo /* mode */));
+  engine_options_->backup_rate_limiter = limiter;
+
+  // Create one backup so that there is something to load on re-open.
+  DestroyDBWithoutCheck(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */);
+  FillDB(db_.get(), 0, 10);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(1, 0, 10, 20);
+
+  const std::int64_t bytes_through_before_open =
+      engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+  OpenDBAndBackupEngine(false /* destroy_old_data */);
+  // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
+  // which is called in BackupEngineImpl::Initialize() during
+  // OpenBackupEngine(false)
+  const std::int64_t bytes_through_after_open =
+      engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+  EXPECT_GT(bytes_through_after_open, bytes_through_before_open);
+
+  CloseDBAndBackupEngine();
+  DestroyDBWithoutCheck(dbname_, Options());
+}
+
+// Parameterized fixture whose only parameter is a pair of limits (the test
+// below uses .first for the backup and .second for the restore limiter).
+class BackupEngineRateLimitingTestWithParam2
+    : public BackupEngineTest,
+      public testing::WithParamInterface<
+          std::tuple<std::pair<uint64_t, uint64_t> /* limits */>> {
+ public:
+  BackupEngineRateLimitingTestWithParam2() = default;
+};
+
+// A single instantiation with both limits set to 1.
+INSTANTIATE_TEST_CASE_P(
+    LowRefillBytesPerPeriod, BackupEngineRateLimitingTestWithParam2,
+    ::testing::Values(std::make_tuple(std::make_pair(1, 1))));
+// To verify we don't request over-sized bytes relative to
+// refill_bytes_per_period_ in each RateLimiter::Request() called in
+// BackupEngine through verifying we don't trigger assertion
+// failure on over-sized request in GenericRateLimiter in debug builds
+//
+// The test walks through backup creation, backup verification, engine
+// re-initialization and restore, and after each step checks that the
+// relevant limiter's total-bytes-through counter has grown.
+TEST_P(BackupEngineRateLimitingTestWithParam2,
+ RateLimitingWithLowRefillBytesPerPeriod) {
+ SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true);
+
+ engine_options_->max_background_operations = 1;
+ // Backup limiter: rate from the test parameter, one-second refill period,
+ // kAllIo mode, driven by special_env's clock.
+ const uint64_t backup_rate_limiter_limit = std::get<0>(GetParam()).first;
+ std::shared_ptr<RateLimiter> backup_rate_limiter(
+ std::make_shared<GenericRateLimiter>(
+ backup_rate_limiter_limit, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */,
+ special_env.GetSystemClock(), false /* auto_tuned */));
+
+ engine_options_->backup_rate_limiter = backup_rate_limiter;
+
+ // Restore limiter: same construction with the second parameter value.
+ const uint64_t restore_rate_limiter_limit = std::get<0>(GetParam()).second;
+ std::shared_ptr<RateLimiter> restore_rate_limiter(
+ std::make_shared<GenericRateLimiter>(
+ restore_rate_limiter_limit, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */,
+ special_env.GetSystemClock(), false /* auto_tuned */));
+
+ engine_options_->restore_rate_limiter = restore_rate_limiter;
+
+ // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+ // `Env` to advance its time according to the fake wait duration. The
+ // workaround is to install a callback that advance the `Env`'s mock time.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+ int64_t time_waited_us = *static_cast<int64_t*>(arg);
+ special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyDBWithoutCheck(dbname_, Options());
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum /* shared_option */);
+
+ // Step 1: creating a backup must go through the backup limiter.
+ FillDB(db_.get(), 0, 100);
+ int64_t total_bytes_through_before_backup =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ EXPECT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ int64_t total_bytes_through_after_backup =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ ASSERT_GT(total_bytes_through_after_backup,
+ total_bytes_through_before_backup);
+
+ // Step 2: verifying the backup must go through the backup limiter.
+ std::vector<BackupInfo> backup_infos;
+ BackupInfo backup_info;
+ backup_engine_->GetBackupInfo(&backup_infos);
+ ASSERT_EQ(1, backup_infos.size());
+ const int backup_id = 1;
+ ASSERT_EQ(backup_id, backup_infos[0].backup_id);
+ ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
+ true /* include_file_details */));
+ int64_t total_bytes_through_before_verify_backup =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ EXPECT_OK(
+ backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
+ int64_t total_bytes_through_after_verify_backup =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ ASSERT_GT(total_bytes_through_after_verify_backup,
+ total_bytes_through_before_verify_backup);
+
+ CloseDBAndBackupEngine();
+ AssertBackupConsistency(backup_id, 0, 100, 101);
+
+ // Step 3: re-opening the engine on the existing backup directory must go
+ // through the backup limiter.
+ int64_t total_bytes_through_before_initialize =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ OpenDBAndBackupEngine(false /* destroy_old_data */);
+ // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
+ // which is called in BackupEngineImpl::Initialize() during
+ // OpenBackupEngine(false)
+ int64_t total_bytes_through_after_initialize =
+ engine_options_->backup_rate_limiter->GetTotalBytesThrough();
+ ASSERT_GT(total_bytes_through_after_initialize,
+ total_bytes_through_before_initialize);
+ CloseDBAndBackupEngine();
+
+ // Step 4: restoring must go through the restore limiter.
+ DestroyDBWithoutCheck(dbname_, Options());
+ OpenBackupEngine(false /* destroy_old_data */);
+ int64_t total_bytes_through_before_restore =
+ engine_options_->restore_rate_limiter->GetTotalBytesThrough();
+ EXPECT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+ int64_t total_bytes_through_after_restore =
+ engine_options_->restore_rate_limiter->GetTotalBytesThrough();
+ ASSERT_GT(total_bytes_through_after_restore,
+ total_bytes_through_before_restore);
+ CloseBackupEngine();
+
+ DestroyDBWithoutCheck(dbname_, Options());
+
+ // Remove the callback installed above; it captures the stack-allocated
+ // special_env by reference.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::Request:PostTimedWait");
+}
+
+#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+
+// Creates two backups with a regular engine (the first one while the DB is
+// opened read-only), then opens the backup directory through
+// BackupEngineReadOnly, restores the latest backup, and checks that the
+// read-only engine reported no written files and that all keys are present.
+TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {
+  DestroyDBWithoutCheck(dbname_, options_);
+  OpenDBAndBackupEngine(true);
+  FillDB(db_.get(), 0, 100);
+  // Also test read-only DB with CreateNewBackup and flush=true (no flush)
+  CloseAndReopenDB(/*read_only*/ true);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ true));
+  CloseAndReopenDB(/*read_only*/ false);
+  FillDB(db_.get(), 100, 200);
+  ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ true));
+  CloseDBAndBackupEngine();
+  DestroyDBWithoutCheck(dbname_, options_);
+
+  engine_options_->destroy_old_data = false;
+  test_backup_fs_->ClearWrittenFiles();
+  test_backup_fs_->SetLimitDeleteFiles(0);
+  // Open() hands back a raw owning pointer; move it into a unique_ptr right
+  // away so that it is released even if a fatal assertion below returns
+  // early from the test body.
+  BackupEngineReadOnly* raw_read_only_engine = nullptr;
+  ASSERT_OK(BackupEngineReadOnly::Open(db_chroot_env_.get(), *engine_options_,
+                                       &raw_read_only_engine));
+  std::unique_ptr<BackupEngineReadOnly> read_only_backup_engine(
+      raw_read_only_engine);
+  std::vector<BackupInfo> backup_info;
+  read_only_backup_engine->GetBackupInfo(&backup_info);
+  ASSERT_EQ(backup_info.size(), 2U);
+
+  RestoreOptions restore_options(false);
+  ASSERT_OK(read_only_backup_engine->RestoreDBFromLatestBackup(
+      dbname_, dbname_, restore_options));
+  // Destroy the engine before inspecting the written-file list, as the
+  // original code did.
+  read_only_backup_engine.reset();
+  // Expect that nothing was written to the backup file system.
+  std::vector<std::string> should_have_written;
+  test_backup_fs_->AssertWrittenFiles(should_have_written);
+
+  std::unique_ptr<DB> db(OpenDB());
+  AssertExists(db.get(), 0, 200);
+}
+
+ // Opens individual backups directly as read-only DBs through the
+ // `name_for_open` / `env_for_open` fields of BackupInfo, and checks that a
+ // read-write open of the same location is rejected with an IOError.
+ TEST_F(BackupEngineTest, OpenBackupAsReadOnlyDB) {
+   DestroyDBWithoutCheck(dbname_, options_);
+   options_.write_dbid_to_manifest = false;
+
+   OpenDBAndBackupEngine(true);
+   FillDB(db_.get(), 0, 100);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ false));
+
+   // Flipping this option exercises some read-only DB code
+   options_.write_dbid_to_manifest = true;
+   CloseAndReopenDB();
+
+   FillDB(db_.get(), 100, 200);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), /*flush*/ false));
+   db_.reset();  // CloseDB
+   DestroyDBWithoutCheck(dbname_, options_);
+
+   BackupInfo info;
+   // Without include_file_details the "for open" fields must stay empty
+   ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 1U, &info,
+                                           /*with file details*/ false));
+   ASSERT_EQ(info.name_for_open, "");
+   ASSERT_FALSE(info.env_for_open);
+
+   // The real test starts here: ask again, this time with file details
+   info = BackupInfo();
+   ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 1U, &info,
+                                           /*with file details*/ true));
+
+   // Caution: DBOptions only holds a raw pointer to Env, so something else
+   // must keep it alive.
+   DB* backup_db = nullptr;
+   Options open_opts = options_;
+   // Ensure some key defaults are set
+   open_opts.wal_dir = "";
+   open_opts.create_if_missing = false;
+   open_opts.info_log.reset();
+
+   // Case 1: Keeping BackupEngine open suffices to keep Env alive, so the
+   // BackupInfo is deliberately cleared before opening.
+   open_opts.env = info.env_for_open.get();
+   std::string open_name = info.name_for_open;
+   info = BackupInfo();
+   ASSERT_OK(DB::OpenForReadOnly(open_opts, open_name, &backup_db));
+
+   AssertExists(backup_db, 0, 100);
+   AssertEmpty(backup_db, 100, 200);
+
+   delete backup_db;
+   backup_db = nullptr;
+
+   // Case 2: Keeping BackupInfo alive rather than BackupEngine also suffices
+   ASSERT_OK(backup_engine_->GetBackupInfo(/*id*/ 2U, &info,
+                                           /*with file details*/ true));
+   CloseBackupEngine();
+   open_opts.create_if_missing = true;  // check also OK (though pointless)
+   open_opts.env = info.env_for_open.get();
+   open_name = info.name_for_open;
+   // Note: `info` stays alive across this open
+   ASSERT_OK(DB::OpenForReadOnly(open_opts, open_name, &backup_db));
+
+   AssertExists(backup_db, 0, 200);
+   delete backup_db;
+   backup_db = nullptr;
+
+   // Now try opening read-write and make sure it fails, for safety.
+   ASSERT_TRUE(DB::Open(open_opts, open_name, &backup_db).IsIOError());
+ }
+
+ // Checks that the progress callback passed to CreateNewBackup is invoked only
+ // when callback_trigger_interval_size is small enough for this DB.
+ TEST_F(BackupEngineTest, ProgressCallbackDuringBackup) {
+   DestroyDBWithoutCheck(dbname_, options_);
+   // Interval far larger than this small DB
+   engine_options_->callback_trigger_interval_size = 100000;
+   OpenDBAndBackupEngine(true);
+   FillDB(db_.get(), 0, 100);
+
+   bool callback_fired = false;
+   const auto on_progress = [&callback_fired]() { callback_fired = true; };
+
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true, on_progress));
+   ASSERT_FALSE(callback_fired);
+   CloseBackupEngine();
+
+   // Interval easily small enough for this small DB
+   engine_options_->callback_trigger_interval_size = 1000;
+   OpenBackupEngine();
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true, on_progress));
+   ASSERT_TRUE(callback_fired);
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ // Plants a bogus file in the shared backup directory under the name of a
+ // future SST file and checks that creating a backup replaces its contents.
+ TEST_F(BackupEngineTest, GarbageCollectionBeforeBackup) {
+   DestroyDBWithoutCheck(dbname_, options_);
+   OpenDBAndBackupEngine(true);
+
+   ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
+   // This depends on the fact that 000009.sst is the first file created by
+   // the DB
+   const std::string stray_sst = backupdir_ + "/shared/000009.sst";
+   const std::string stray_contents = "I'm not really a sst file";
+   ASSERT_OK(file_manager_->WriteToFile(stray_sst, stray_contents));
+
+   FillDB(db_.get(), 0, 100);
+   // The backup overwrites file 000009.sst
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+
+   std::string contents_after_backup;
+   ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), stray_sst,
+                              &contents_after_backup));
+   // The planted contents must be gone
+   ASSERT_TRUE(contents_after_backup != stray_contents);
+
+   CloseDBAndBackupEngine();
+
+   AssertBackupConsistency(0, 0, 100);
+ }
+
+ // Test that we properly propagate Env failures: each scoped block below
+ // injects one kind of failure into the backup file system, expects
+ // BackupEngine::Open to fail, and then clears the injected failure. The last
+ // block checks that Open succeeds once nothing is injected.
+ TEST_F(BackupEngineTest, EnvFailures) {
+   // Initialized so the out-parameter never holds an indeterminate value,
+   // whatever Open does with it on the failure paths.
+   BackupEngine* backup_engine = nullptr;
+
+   // get children failure
+   {
+     test_backup_fs_->SetGetChildrenFailure(true);
+     ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                   &backup_engine));
+     test_backup_fs_->SetGetChildrenFailure(false);
+   }
+
+   // created dir failure
+   {
+     test_backup_fs_->SetCreateDirIfMissingFailure(true);
+     ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                   &backup_engine));
+     test_backup_fs_->SetCreateDirIfMissingFailure(false);
+   }
+
+   // new directory failure
+   {
+     test_backup_fs_->SetNewDirectoryFailure(true);
+     ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                   &backup_engine));
+     test_backup_fs_->SetNewDirectoryFailure(false);
+   }
+
+   // Read from meta-file failure
+   {
+     // A backup must exist first so that Open has a meta file to read.
+     DestroyDBWithoutCheck(dbname_, options_);
+     OpenDBAndBackupEngine(true);
+     FillDB(db_.get(), 0, 100);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+     CloseDBAndBackupEngine();
+     test_backup_fs_->SetDummySequentialFile(true);
+     test_backup_fs_->SetDummySequentialFileFailReads(true);
+     engine_options_->destroy_old_data = false;
+     ASSERT_NOK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                   &backup_engine));
+     test_backup_fs_->SetDummySequentialFile(false);
+     test_backup_fs_->SetDummySequentialFileFailReads(false);
+   }
+
+   // no failure
+   {
+     ASSERT_OK(BackupEngine::Open(test_db_env_.get(), *engine_options_,
+                                  &backup_engine));
+     delete backup_engine;
+   }
+ }
+
+ // Verify manifest can roll while a backup is being created with the old
+ // manifest.
+ //
+ // A flush runs on a separate thread while CreateNewBackup runs on the test
+ // thread; sync-point dependencies interleave the two. Afterwards the test
+ // forces one more manifest roll and checks the previous manifest file is gone.
+ TEST_F(BackupEngineTest, ChangeManifestDuringBackupCreation) {
+ DestroyDBWithoutCheck(dbname_, options_);
+ options_.max_manifest_file_size = 0; // always rollover manifest for file add
+ OpenDBAndBackupEngine(true);
+ FillDB(db_.get(), 0, 100, kAutoFlushOnly);
+
+ // NOTE(review): presumably each {A, B} pair makes sync point B wait until A
+ // has been reached, so the manifest write lands between the two
+ // SavedLiveFiles points of checkpoint creation -- confirm against SyncPoint.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
+ "VersionSet::LogAndApply:WriteManifest"},
+ {"VersionSet::LogAndApply:WriteManifestDone",
+ "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"},
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // The flush runs concurrently with the backup created below.
+ ROCKSDB_NAMESPACE::port::Thread flush_thread{
+ [this]() { ASSERT_OK(db_->Flush(FlushOptions())); }};
+
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
+
+ flush_thread.join();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ // The last manifest roll would've already been cleaned up by the full scan
+ // that happens when CreateNewBackup invokes EnableFileDeletions. We need to
+ // trigger another roll to verify non-full scan purges stale manifests.
+ DBImpl* db_impl = static_cast_with_check<DBImpl>(db_.get());
+ // Remember the manifest that is current before the next roll.
+ std::string prev_manifest_path =
+ DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo());
+ FillDB(db_.get(), 0, 100, kAutoFlushOnly);
+ ASSERT_OK(db_chroot_env_->FileExists(prev_manifest_path));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ // Even though manual flush completed above, the background thread may not
+ // have finished its cleanup work. `TEST_WaitForBackgroundWork()` will wait
+ // until all the background thread's work has completed, including cleanup.
+ ASSERT_OK(db_impl->TEST_WaitForBackgroundWork());
+ ASSERT_TRUE(db_chroot_env_->FileExists(prev_manifest_path).IsNotFound());
+
+ CloseDBAndBackupEngine();
+ DestroyDBWithoutCheck(dbname_, options_);
+ AssertBackupConsistency(0, 0, 100);
+ }
+
+ // see https://github.com/facebook/rocksdb/issues/921
+ //
+ // Opens a BackupEngine on a not-yet-existing sub-directory ("/new_dir") of an
+ // existing backup directory, with share_table_files disabled, and expects the
+ // open to succeed.
+ TEST_F(BackupEngineTest, Issue921Test) {
+   engine_options_->share_table_files = false;
+   ASSERT_OK(
+       backup_chroot_env_->CreateDirIfMissing(engine_options_->backup_dir));
+   engine_options_->backup_dir += "/new_dir";
+
+   // Start from nullptr so the out-parameter is never indeterminate, then
+   // hand the result to a unique_ptr so that it is released on scope exit.
+   BackupEngine* raw_engine = nullptr;
+   ASSERT_OK(BackupEngine::Open(backup_chroot_env_.get(), *engine_options_,
+                                &raw_engine));
+   std::unique_ptr<BackupEngine> backup_engine(raw_engine);
+ }
+
+ // Creates five backups, each tagged with its zero-based index as application
+ // metadata, then reads the metadata back both in bulk and per backup ID.
+ TEST_F(BackupEngineTest, BackupWithMetadata) {
+   const int keys_iteration = 5000;
+   OpenDBAndBackupEngine(true);
+   // create five backups
+   for (int n = 0; n < 5; ++n) {
+     const int first_key = keys_iteration * n;
+     const int end_key = keys_iteration * (n + 1);
+     FillDB(db_.get(), first_key, end_key);
+     // Here also test CreateNewBackupWithMetadata with CreateBackupOptions
+     // and outputting saved BackupID.
+     CreateBackupOptions create_opts;
+     create_opts.flush_before_backup = true;
+     const std::string tag = std::to_string(n);
+     BackupID created_id = 0;
+     ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(
+         create_opts, db_.get(), tag, &created_id));
+     ASSERT_EQ(created_id, static_cast<BackupID>(n + 1));
+   }
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   {  // Verify in bulk BackupInfo
+     std::vector<BackupInfo> all_infos;
+     backup_engine_->GetBackupInfo(&all_infos);
+     ASSERT_EQ(5, all_infos.size());
+     for (int n = 0; n < 5; n++) {
+       ASSERT_EQ(std::to_string(n), all_infos[n].app_metadata);
+     }
+   }
+   // Also verify in individual BackupInfo
+   for (int n = 0; n < 5; n++) {
+     BackupInfo one_info;
+     const BackupID id = static_cast<BackupID>(n + 1);
+     ASSERT_OK(backup_engine_->GetBackupInfo(id, &one_info));
+     ASSERT_EQ(std::to_string(n), one_info.app_metadata);
+   }
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ // Application metadata containing a newline and an embedded NUL byte must be
+ // read back unchanged after the backup engine is reopened.
+ TEST_F(BackupEngineTest, BinaryMetadata) {
+   OpenDBAndBackupEngine(true);
+   // "abc\ndef", then a NUL byte, then "ghi"
+   const std::string binary_metadata = std::string("abc\ndef") + '\0' + "ghi";
+   ASSERT_OK(
+       backup_engine_->CreateNewBackupWithMetadata(db_.get(), binary_metadata));
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   std::vector<BackupInfo> infos;
+   backup_engine_->GetBackupInfo(&infos);
+   ASSERT_EQ(1, infos.size());
+   ASSERT_EQ(binary_metadata, infos[0].app_metadata);
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ // Creating a backup with 1 MiB + 1 byte of application metadata must fail.
+ TEST_F(BackupEngineTest, MetadataTooLarge) {
+   OpenDBAndBackupEngine(true);
+   const size_t oversized_len = 1024 * 1024 + 1;
+   const std::string oversized_metadata(oversized_len, '\0');
+   ASSERT_NOK(backup_engine_->CreateNewBackupWithMetadata(db_.get(),
+                                                          oversized_metadata));
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ // Creates four backups with different combinations of checksums and file
+ // sizes in their metadata, overwrites each backup's CURRENT file, and checks
+ // at which point (open vs. restore) the corruption is or is not reported.
+ TEST_F(BackupEngineTest, MetaSchemaVersion2_SizeCorruption) {
+   engine_options_->schema_version = 1;
+   OpenDBAndBackupEngine(/*destroy_old_data*/ true);
+
+   // Backup 1: no future schema, no sizes, with checksums
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+   CloseDBAndBackupEngine();
+   engine_options_->schema_version = 2;
+   OpenDBAndBackupEngine(/*destroy_old_data*/ false);
+
+   TEST_BackupMetaSchemaOptions schema_opts;
+
+   // Backup 2: no checksums, no sizes
+   schema_opts.crc32c_checksums = false;
+   schema_opts.file_sizes = false;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+   // Backup 3: no checksums, with sizes
+   schema_opts.file_sizes = true;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+   // Backup 4: with checksums and sizes
+   schema_opts.crc32c_checksums = true;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+   CloseDBAndBackupEngine();
+
+   const std::string private_dir = backupdir_ + "/private";
+
+   // Backups 1-3: replace CURRENT with a file of the wrong size
+   for (int id = 1; id <= 3; ++id) {
+     const std::string current_path =
+         private_dir + "/" + std::to_string(id) + "/CURRENT";
+     ASSERT_OK(file_manager_->WriteToFile(current_path, "x"));
+   }
+   // Backup 4: replace CURRENT with garbage of the same size
+   {
+     const std::string current_path = private_dir + "/4/CURRENT";
+     uint64_t current_size = 0;
+     ASSERT_OK(test_backup_env_->GetFileSize(current_path, &current_size));
+     ASSERT_OK(file_manager_->WriteToFile(current_path,
+                                          std::string(current_size, 'x')));
+   }
+
+   OpenBackupEngine();
+
+   // Only the one with sizes in metadata will be immediately detected
+   // as corrupt
+   std::vector<BackupID> corrupt_ids;
+   backup_engine_->GetCorruptedBackups(&corrupt_ids);
+   ASSERT_EQ(corrupt_ids.size(), 1);
+   ASSERT_EQ(corrupt_ids[0], 3);
+
+   // Size corruption detected on Restore with checksum
+   ASSERT_TRUE(backup_engine_->RestoreDBFromBackup(1 /*id*/, dbname_, dbname_)
+                   .IsCorruption());
+
+   // Size corruption not detected without checksums nor sizes
+   ASSERT_OK(backup_engine_->RestoreDBFromBackup(2 /*id*/, dbname_, dbname_));
+
+   // Non-size corruption detected on Restore with checksum
+   ASSERT_TRUE(backup_engine_->RestoreDBFromBackup(4 /*id*/, dbname_, dbname_)
+                   .IsCorruption());
+
+   CloseBackupEngine();
+ }
+
+ // Writes one supported backup followed by five backups whose metadata carries
+ // an unsupported version or a non-ignorable ("ni::") field, then checks that
+ // exactly five backups are reported corrupt after reopening and that restoring
+ // from the latest backup still succeeds.
+ TEST_F(BackupEngineTest, MetaSchemaVersion2_NotSupported) {
+   engine_options_->schema_version = 2;
+   TEST_BackupMetaSchemaOptions schema_opts;
+   const std::string metadata = "abc\ndef";
+
+   OpenDBAndBackupEngine(true);
+   // Start with supported
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+
+   // Because we are injecting badness with a TEST API, the badness is only
+   // detected on attempt to restore.
+
+   // Not supported versions
+   schema_opts.version = "3";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+
+   schema_opts.version = "23.45.67";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+   schema_opts.version = "2";
+
+   // Non-ignorable field in the meta section
+   schema_opts.meta_fields["ni::blah"] = "123";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+   schema_opts.meta_fields.clear();
+
+   // Non-ignorable per-file field
+   schema_opts.file_fields["ni::123"] = "xyz";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+   schema_opts.file_fields.clear();
+
+   // Non-ignorable footer field
+   schema_opts.footer_fields["ni::123"] = "xyz";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackupWithMetadata(db_.get(), metadata));
+   schema_opts.footer_fields.clear();
+   CloseDBAndBackupEngine();
+
+   OpenBackupEngine();
+   std::vector<BackupID> corrupt_ids;
+   backup_engine_->GetCorruptedBackups(&corrupt_ids);
+   ASSERT_EQ(corrupt_ids.size(), 5);
+
+   ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+   CloseBackupEngine();
+ }
+
+ // Creates five backups with progressively richer schema-version-2 metadata
+ // (and one with default metadata) and, after each, re-checks consistency of
+ // every backup created so far.
+ TEST_F(BackupEngineTest, MetaSchemaVersion2_Restore) {
+   engine_options_->schema_version = 2;
+   TEST_BackupMetaSchemaOptions schema_opts;
+   const int keys_iteration = 5000;
+   const int end_key = keys_iteration * 2;
+
+   // Backup 1: start with minimum metadata to ensure it works without it
+   // being filled based on shared files also in other backups with the
+   // metadata.
+   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+   FillDB(db_.get(), 0, keys_iteration);
+   schema_opts.crc32c_checksums = false;
+   schema_opts.file_sizes = false;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   AssertBackupConsistency(1 /* id */, 0, keys_iteration, end_key);
+
+   // Backup 2: add file sizes
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                         kShareWithChecksum);
+   schema_opts.file_sizes = true;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   for (int backup_id = 1; backup_id <= 2; ++backup_id) {
+     AssertBackupConsistency(backup_id, 0, keys_iteration, end_key);
+   }
+
+   // Backup 3: add checksums
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                         kShareWithChecksum);
+   schema_opts.crc32c_checksums = true;
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   for (int backup_id = 1; backup_id <= 3; ++backup_id) {
+     AssertBackupConsistency(backup_id, 0, keys_iteration, end_key);
+   }
+
+   // Backup 4: no TEST_EnableWriteFutureSchemaVersion2
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                         kShareWithChecksum);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   for (int backup_id = 1; backup_id <= 4; ++backup_id) {
+     AssertBackupConsistency(backup_id, 0, keys_iteration, end_key);
+   }
+
+   // Backup 5: minor version updates should be forward-compatible
+   OpenDBAndBackupEngine(false /* destroy_old_data */, false,
+                         kShareWithChecksum);
+   schema_opts.version = "2.5.70";
+   schema_opts.meta_fields["asdf.3456"] = "-42";
+   schema_opts.meta_fields["__QRST"] = " 1 $ %%& ";
+   schema_opts.file_fields["z94._"] = "^\\";
+   schema_opts.file_fields["_7yyyyyyyyy"] = "111111111111";
+   schema_opts.footer_fields["Qwzn.tz89"] = "ASDF!!@# ##=\t ";
+   schema_opts.footer_fields["yes"] = "no!";
+   TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), schema_opts);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+   CloseDBAndBackupEngine();
+
+   for (int backup_id = 1; backup_id <= 5; ++backup_id) {
+     AssertBackupConsistency(backup_id, 0, keys_iteration, end_key);
+   }
+ }
+
+ // Non-deterministic mini-stress test: four reader threads (three on the
+ // fixture's BackupEngine, one on a freshly opened engine) list, verify and
+ // restore backups while two more threads create new backups through a third
+ // engine, all sharing one backup_dir and one rate limiter.
+ TEST_F(BackupEngineTest, Concurrency) {
+ // Check that we can simultaneously:
+ // * Run several read operations in different threads on a single
+ // BackupEngine object, and
+ // * With another BackupEngine object on the same
+ // backup_dir, run the same read operations in another thread, and
+ // * With yet another BackupEngine object on the same
+ // backup_dir, create two new backups in parallel threads.
+ //
+ // Because of the challenges of integrating this into db_stress,
+ // this is a non-deterministic mini-stress test here instead.
+
+ // To check for a race condition in handling buffer size based on byte
+ // burst limit, we need a (generous) rate limiter
+ std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)};
+ engine_options_->backup_rate_limiter = limiter;
+ engine_options_->restore_rate_limiter = limiter;
+
+ OpenDBAndBackupEngine(true, false, kShareWithChecksum);
+
+ // Two backups exist before any thread starts.
+ static constexpr int keys_iteration = 5000;
+ FillDB(db_.get(), 0, keys_iteration);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ FillDB(db_.get(), keys_iteration, 2 * keys_iteration);
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ static constexpr int max_factor = 3;
+ FillDB(db_.get(), 2 * keys_iteration, max_factor * keys_iteration);
+ // will create another backup soon...
+
+ Options db_opts = options_;
+ db_opts.wal_dir = "";
+ db_opts.create_if_missing = false;
+ BackupEngineOptions be_opts = *engine_options_;
+ be_opts.destroy_old_data = false;
+
+ // Randomized start delays for the worker threads (below 100ms each).
+ std::mt19937 rng{std::random_device()()};
+
+ std::array<std::thread, 4> read_threads;
+ std::array<std::thread, 4> restore_verify_threads;
+ for (uint32_t i = 0; i < read_threads.size(); ++i) {
+ uint32_t sleep_micros = rng() % 100000;
+ read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts,
+ &restore_verify_threads, &limiter] {
+ // NOTE(review): a fatal ASSERT_* inside this lambda only returns from the
+ // lambda, not from the test body.
+ test_db_env_->SleepForMicroseconds(sleep_micros);
+
+ // Whether to also re-open the BackupEngine, potentially seeing
+ // additional backups
+ bool reopen = i == 3;
+ // Whether we are going to restore "latest"
+ bool latest = i > 1;
+
+ BackupEngine* my_be;
+ if (reopen) {
+ ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be));
+ } else {
+ my_be = backup_engine_.get();
+ }
+
+ // Verify metadata (we don't receive updates from concurrently
+ // creating a new backup)
+ std::vector<BackupInfo> infos;
+ my_be->GetBackupInfo(&infos);
+ const uint32_t count = static_cast<uint32_t>(infos.size());
+ infos.clear();
+ if (reopen) {
+ ASSERT_GE(count, 2U);
+ ASSERT_LE(count, 4U);
+ fprintf(stderr, "Reopen saw %u backups\n", count);
+ } else {
+ ASSERT_EQ(count, 2U);
+ }
+ std::vector<BackupID> ids;
+ my_be->GetCorruptedBackups(&ids);
+ ASSERT_EQ(ids.size(), 0U);
+
+ // (Eventually, see below) Restore one of the backups, or "latest"
+ std::string restore_db_dir = dbname_ + "/restore" + std::to_string(i);
+ DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError();
+ BackupID to_restore;
+ if (latest) {
+ to_restore = count;
+ } else {
+ to_restore = i + 1;
+ }
+
+ // Open restored DB to verify its contents, but test atomic restore
+ // by doing it async and ensuring we either get OK or InvalidArgument
+ // (this thread's slot in restore_verify_threads is filled here and joined
+ // by the test body at the end)
+ restore_verify_threads[i] =
+ std::thread([this, &db_opts, restore_db_dir, to_restore] {
+ DB* restored;
+ Status s;
+ for (;;) {
+ s = DB::Open(db_opts, restore_db_dir, &restored);
+ if (s.IsInvalidArgument()) {
+ // Restore hasn't finished
+ test_db_env_->SleepForMicroseconds(1000);
+ continue;
+ } else {
+ // We should only get InvalidArgument if restore is
+ // incomplete, or OK if complete
+ ASSERT_OK(s);
+ break;
+ }
+ }
+ // Backup N is expected to hold keys [0, N * keys_iteration), capped at
+ // max_factor since no more data than that was written.
+ int factor = std::min(static_cast<int>(to_restore), max_factor);
+ AssertExists(restored, 0, factor * keys_iteration);
+ AssertEmpty(restored, factor * keys_iteration,
+ (factor + 1) * keys_iteration);
+ delete restored;
+ });
+
+ // (Ok now) Restore one of the backups, or "latest"
+ if (latest) {
+ ASSERT_OK(
+ my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir));
+ } else {
+ ASSERT_OK(my_be->VerifyBackup(to_restore, true));
+ ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
+ restore_db_dir));
+ }
+
+ // Test for race condition in reconfiguring limiter
+ // FIXME: this could set to a different value in all threads, except
+ // GenericRateLimiter::SetBytesPerSecond has a write-write race
+ // reported by TSAN
+ if (i == 0) {
+ limiter->SetBytesPerSecond(2000000000);
+ }
+
+ // Re-verify metadata (we don't receive updates from concurrently
+ // creating a new backup)
+ my_be->GetBackupInfo(&infos);
+ ASSERT_EQ(infos.size(), count);
+ my_be->GetCorruptedBackups(&ids);
+ ASSERT_EQ(ids.size(), 0);
+ // fprintf(stderr, "Finished read thread\n");
+
+ // Only the reopened engine is owned by this thread.
+ if (reopen) {
+ delete my_be;
+ }
+ });
+ }
+
+ // A separate engine on the same backup_dir, shared by both append threads.
+ BackupEngine* alt_be;
+ ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &alt_be));
+
+ std::array<std::thread, 2> append_threads;
+ for (unsigned i = 0; i < append_threads.size(); ++i) {
+ uint32_t sleep_micros = rng() % 100000;
+ append_threads[i] = std::thread([this, sleep_micros, alt_be] {
+ test_db_env_->SleepForMicroseconds(sleep_micros);
+ // WART: CreateNewBackup doesn't tell you the BackupID it just created,
+ // which is ugly for multithreaded setting.
+ // TODO: add delete backup also when that is added
+ ASSERT_OK(alt_be->CreateNewBackup(db_.get()));
+ // fprintf(stderr, "Finished append thread\n");
+ });
+ }
+
+ for (auto& t : append_threads) {
+ t.join();
+ }
+ // Verify metadata
+ std::vector<BackupInfo> infos;
+ alt_be->GetBackupInfo(&infos);
+ ASSERT_EQ(infos.size(), 2 + append_threads.size());
+
+ // Join the read threads first: each one assigns its own slot in
+ // restore_verify_threads, which is joined afterwards.
+ for (auto& t : read_threads) {
+ t.join();
+ }
+
+ delete alt_be;
+
+ for (auto& t : restore_verify_threads) {
+ t.join();
+ }
+
+ CloseDBAndBackupEngine();
+ }
+
+ TEST_F(BackupEngineTest, LimitBackupsOpened) {
+   // Verify the specified max backups are opened, including skipping over
+   // corrupted backups.
+   //
+   // Setup:
+   // - backups 1, 2, and 4 are valid
+   // - backup 3 is corrupt
+   // - max_valid_backups_to_open == 2
+   //
+   // Expectation: the engine opens backups 4 and 2 since those are latest two
+   // non-corrupt backups.
+   const int kNumKeys = 5000;
+   OpenDBAndBackupEngine(true);
+   for (int i = 1; i <= 4; ++i) {
+     FillDB(db_.get(), kNumKeys * i, kNumKeys * (i + 1));
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+     if (i == 3) {
+       // Damage the meta file of backup 3 right after it is created
+       ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3));
+     }
+   }
+   CloseDBAndBackupEngine();
+
+   engine_options_->max_valid_backups_to_open = 2;
+   engine_options_->destroy_old_data = false;
+   BackupEngineReadOnly* raw_engine = nullptr;
+   ASSERT_OK(BackupEngineReadOnly::Open(backup_chroot_env_.get(),
+                                        *engine_options_, &raw_engine));
+   // Own the engine: any failing ASSERT_EQ below returns from the test body,
+   // which would otherwise skip a trailing `delete`.
+   std::unique_ptr<BackupEngineReadOnly> read_only_backup_engine(raw_engine);
+
+   std::vector<BackupInfo> backup_infos;
+   read_only_backup_engine->GetBackupInfo(&backup_infos);
+   ASSERT_EQ(2, backup_infos.size());
+   ASSERT_EQ(2, backup_infos[0].backup_id);
+   ASSERT_EQ(4, backup_infos[1].backup_id);
+ }
+
+ TEST_F(BackupEngineTest, IgnoreLimitBackupsOpenedWhenNotReadOnly) {
+   // Verify the specified max_valid_backups_to_open is ignored if the engine
+   // is not read-only.
+   //
+   // Setup:
+   // - backups 1, 2, and 4 are valid
+   // - backup 3 is corrupt
+   // - max_valid_backups_to_open == 2
+   //
+   // Expectation: the engine opens backups 4, 2, and 1 since those are latest
+   // non-corrupt backups, by ignoring max_valid_backups_to_open == 2.
+   const int kNumKeys = 5000;
+   OpenDBAndBackupEngine(true);
+   for (int backup_no = 1; backup_no <= 4; ++backup_no) {
+     const int first_key = kNumKeys * backup_no;
+     const int end_key = kNumKeys * (backup_no + 1);
+     FillDB(db_.get(), first_key, end_key);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+     if (backup_no == 3) {
+       // Damage the meta file of backup 3 right after it is created
+       ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/3", 3));
+     }
+   }
+   CloseDBAndBackupEngine();
+
+   engine_options_->max_valid_backups_to_open = 2;
+   OpenDBAndBackupEngine();
+   std::vector<BackupInfo> opened;
+   backup_engine_->GetBackupInfo(&opened);
+   ASSERT_EQ(3, opened.size());
+   ASSERT_EQ(1, opened[0].backup_id);
+   ASSERT_EQ(2, opened[1].backup_id);
+   ASSERT_EQ(4, opened[2].backup_id);
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+ TEST_F(BackupEngineTest, CreateWhenLatestBackupCorrupted) {
+   // we should pick an ID greater than corrupted backups' IDs so creation can
+   // succeed even when latest backup is corrupted.
+   const int kNumKeys = 5000;
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   BackupInfo backup_info;
+   // No backups yet
+   ASSERT_TRUE(backup_engine_->GetLatestBackupInfo(&backup_info).IsNotFound());
+   FillDB(db_.get(), 0 /* from */, kNumKeys);
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                             true /* flush_before_backup */));
+   ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/1",
+                                        3 /* bytes_to_corrupt */));
+   CloseDBAndBackupEngine();
+
+   OpenDBAndBackupEngine();
+   // The only backup is corrupt, so there is still no "latest" backup
+   ASSERT_TRUE(backup_engine_->GetLatestBackupInfo(&backup_info).IsNotFound());
+
+   ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                             true /* flush_before_backup */));
+
+   // ASSERT_OK (rather than ASSERT_TRUE on .ok()) matches the rest of the file
+   ASSERT_OK(backup_engine_->GetLatestBackupInfo(&backup_info));
+   ASSERT_EQ(2, backup_info.backup_id);
+
+   std::vector<BackupInfo> backup_infos;
+   backup_engine_->GetBackupInfo(&backup_infos);
+   ASSERT_EQ(1, backup_infos.size());
+   ASSERT_EQ(2, backup_infos[0].backup_id);
+
+   // Verify individual GetBackupInfo by ID
+   ASSERT_TRUE(backup_engine_->GetBackupInfo(0U, &backup_info).IsNotFound());
+   ASSERT_TRUE(backup_engine_->GetBackupInfo(1U, &backup_info).IsCorruption());
+   ASSERT_OK(backup_engine_->GetBackupInfo(2U, &backup_info));
+   ASSERT_TRUE(backup_engine_->GetBackupInfo(3U, &backup_info).IsNotFound());
+   ASSERT_TRUE(
+       backup_engine_->GetBackupInfo(999999U, &backup_info).IsNotFound());
+ }
+
+ TEST_F(BackupEngineTest, WriteOnlyEngineNoSharedFileDeletion) {
+   // Verifies a write-only BackupEngine does not delete files belonging to valid
+   // backups when GarbageCollect, PurgeOldBackups, or DeleteBackup are called.
+   //
+   // Each round creates one backup, reopens the engine with
+   // max_valid_backups_to_open == 0, runs one of the three operations, and
+   // then checks the backup created in that round is still consistent.
+   const int kNumKeys = 5000;
+   for (int round = 0; round < 3; ++round) {
+     const int end_key = (round + 1) * kNumKeys;
+
+     OpenDBAndBackupEngine(round == 0 /* destroy_old_data */);
+     FillDB(db_.get(), round * kNumKeys, end_key);
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
+     CloseDBAndBackupEngine();
+
+     engine_options_->max_valid_backups_to_open = 0;
+     OpenDBAndBackupEngine();
+     if (round == 0) {
+       ASSERT_OK(backup_engine_->GarbageCollect());
+     } else if (round == 1) {
+       ASSERT_OK(backup_engine_->PurgeOldBackups(1 /* num_backups_to_keep */));
+     } else if (round == 2) {
+       ASSERT_OK(backup_engine_->DeleteBackup(2 /* backup_id */));
+     } else {
+       assert(false);
+     }
+     CloseDBAndBackupEngine();
+
+     // Restore the default-like "open everything" limit before verifying
+     engine_options_->max_valid_backups_to_open =
+         std::numeric_limits<int32_t>::max();
+     AssertBackupConsistency(round + 1, 0, end_key);
+   }
+ }
+
+ TEST_P(BackupEngineTestWithParam, BackupUsingDirectIO) {
+   // Tests direct I/O on the backup engine's reads and writes on the DB env and
+   // backup env
+   // We use ChrootEnv underneath so the below line checks for direct I/O support
+   // in the chroot directory, not the true filesystem root.
+   const bool direct_io_ok = test::IsDirectIOSupported(test_db_env_.get(), "/");
+   if (!direct_io_ok) {
+     ROCKSDB_GTEST_SKIP("Test requires Direct I/O Support");
+     return;
+   }
+   const int kNumKeysPerBackup = 100;
+   const int kNumBackups = 3;
+   options_.use_direct_reads = true;
+   OpenDBAndBackupEngine(true /* destroy_old_data */);
+   for (int n = 0; n < kNumBackups; ++n) {
+     FillDB(db_.get(), n * kNumKeysPerBackup /* from */,
+            (n + 1) * kNumKeysPerBackup /* to */, kFlushAll);
+
+     // Clear the file open counters and then do a bunch of backup engine ops.
+     // For all ops, files should be opened in direct mode.
+     test_backup_fs_->ClearFileOpenCounters();
+     test_db_fs_->ClearFileOpenCounters();
+     CloseBackupEngine();
+     OpenBackupEngine();
+     ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                               false /* flush_before_backup */));
+     ASSERT_OK(backup_engine_->VerifyBackup(n + 1));
+     CloseBackupEngine();
+     OpenBackupEngine();
+     std::vector<BackupInfo> infos;
+     backup_engine_->GetBackupInfo(&infos);
+     ASSERT_EQ(static_cast<size_t>(n + 1), infos.size());
+
+     // Verify backup engine always opened files with direct I/O
+     ASSERT_EQ(0, test_db_fs_->num_writers());
+     ASSERT_GE(test_db_fs_->num_direct_rand_readers(), 0);
+     ASSERT_GT(test_db_fs_->num_direct_seq_readers(), 0);
+     // Currently the DB doesn't support reading WALs or manifest with direct
+     // I/O, so subtract two.
+     ASSERT_EQ(test_db_fs_->num_seq_readers() - 2,
+               test_db_fs_->num_direct_seq_readers());
+     ASSERT_EQ(test_db_fs_->num_rand_readers(),
+               test_db_fs_->num_direct_rand_readers());
+   }
+   CloseDBAndBackupEngine();
+
+   // Every backup must contain exactly the key range written just before it
+   for (int n = 0; n < kNumBackups; ++n) {
+     AssertBackupConsistency(n + 1 /* backup_id */,
+                             n * kNumKeysPerBackup /* start_exist */,
+                             (n + 1) * kNumKeysPerBackup /* end_exist */,
+                             (n + 2) * kNumKeysPerBackup /* end */);
+   }
+ }
+
+ // Records, via a sync-point callback, the CPU priority passed at
+ // "BackupEngineImpl::Initialize:SetCpuPriority" and checks how it evolves
+ // across a series of CreateNewBackup calls with different priority options.
+ TEST_F(BackupEngineTest, BackgroundThreadCpuPriority) {
+   std::atomic<CpuPriority> observed_priority(CpuPriority::kNormal);
+   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+       "BackupEngineImpl::Initialize:SetCpuPriority",
+       [&observed_priority](void* arg) {
+         observed_priority.store(*reinterpret_cast<CpuPriority*>(arg));
+       });
+   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+   // 1 thread is easier to test, otherwise, we may not be sure which thread
+   // actually does the work during CreateNewBackup.
+   engine_options_->max_background_operations = 1;
+   OpenDBAndBackupEngine(true);
+
+   {
+     // Step 1: by default, cpu priority is not changed.
+     FillDB(db_.get(), 0, 100);
+
+     CreateBackupOptions backup_opts;
+     ASSERT_OK(backup_engine_->CreateNewBackup(backup_opts, db_.get()));
+
+     ASSERT_EQ(observed_priority, CpuPriority::kNormal);
+   }
+
+   {
+     // Step 2: decrease cpu priority from normal to low.
+     FillDB(db_.get(), 101, 200);
+
+     CreateBackupOptions backup_opts;
+     backup_opts.decrease_background_thread_cpu_priority = true;
+     backup_opts.background_thread_cpu_priority = CpuPriority::kLow;
+     ASSERT_OK(backup_engine_->CreateNewBackup(backup_opts, db_.get()));
+
+     ASSERT_EQ(observed_priority, CpuPriority::kLow);
+   }
+
+   {
+     // Step 3: try to upgrade cpu priority back to normal,
+     // the priority should still low.
+     FillDB(db_.get(), 201, 300);
+
+     CreateBackupOptions backup_opts;
+     backup_opts.decrease_background_thread_cpu_priority = true;
+     backup_opts.background_thread_cpu_priority = CpuPriority::kNormal;
+     ASSERT_OK(backup_engine_->CreateNewBackup(backup_opts, db_.get()));
+
+     ASSERT_EQ(observed_priority, CpuPriority::kLow);
+   }
+
+   {
+     // Step 4: decrease cpu priority from low to idle.
+     FillDB(db_.get(), 301, 400);
+
+     CreateBackupOptions backup_opts;
+     backup_opts.decrease_background_thread_cpu_priority = true;
+     backup_opts.background_thread_cpu_priority = CpuPriority::kIdle;
+     ASSERT_OK(backup_engine_->CreateNewBackup(backup_opts, db_.get()));
+
+     ASSERT_EQ(observed_priority, CpuPriority::kIdle);
+   }
+
+   {
+     // Step 5: setting the same cpu priority won't call SetCpuPriority.
+     FillDB(db_.get(), 301, 400);
+
+     // reset priority to later verify that it's not updated by SetCpuPriority.
+     observed_priority.store(CpuPriority::kNormal);
+
+     CreateBackupOptions backup_opts;
+     backup_opts.decrease_background_thread_cpu_priority = true;
+     backup_opts.background_thread_cpu_priority = CpuPriority::kIdle;
+
+     // Also check output backup_id with CreateNewBackup
+     BackupID created_id = 0;
+     ASSERT_OK(
+         backup_engine_->CreateNewBackup(backup_opts, db_.get(), &created_id));
+     ASSERT_EQ(created_id, 5U);
+
+     ASSERT_EQ(observed_priority, CpuPriority::kNormal);
+   }
+
+   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+   CloseDBAndBackupEngine();
+   DestroyDBWithoutCheck(dbname_, options_);
+ }
+
+// Sums the sizes of all non-directory entries found beneath `backup_dir`
+// (descending into subdirectories) and stores the result in `*total_size`.
+// The tree is walked directly through `backup_fs` instead of asking
+// `BackupEngine`, because it's hard to figure out the metadata file size that
+// way. Stops at the first non-OK status and returns it; `*total_size` then
+// holds whatever had been accumulated up to that point.
+Status GetSizeOfBackupFiles(FileSystem* backup_fs,
+                            const std::string& backup_dir, size_t* total_size) {
+  *total_size = 0;
+  Status status;
+  // Directories still waiting to be listed (depth-first, explicit stack).
+  std::vector<std::string> pending_dirs;
+  pending_dirs.push_back(backup_dir);
+  while (status.ok() && !pending_dirs.empty()) {
+    const std::string current_dir = std::move(pending_dirs.back());
+    pending_dirs.pop_back();
+
+    std::vector<std::string> entries;
+    status = backup_fs->GetChildren(current_dir, IOOptions(), &entries,
+                                    nullptr /* dbg */);
+    if (!status.ok()) {
+      break;
+    }
+
+    for (const std::string& entry : entries) {
+      std::string entry_path = current_dir + "/" + entry;
+      bool entry_is_dir = false;
+      status = backup_fs->IsDirectory(entry_path, IOOptions(), &entry_is_dir,
+                                      nullptr /* dbg */);
+      if (!status.ok()) {
+        break;
+      }
+      if (entry_is_dir) {
+        // Visit later; a directory itself contributes nothing to the total.
+        pending_dirs.emplace_back(std::move(entry_path));
+        continue;
+      }
+      uint64_t entry_size = 0;
+      status = backup_fs->GetFileSize(entry_path, IOOptions(), &entry_size,
+                                      nullptr /* dbg */);
+      if (!status.ok()) {
+        break;
+      }
+      *total_size += entry_size;
+    }
+  }
+  return status;
+}
+
+TEST_F(BackupEngineTest, IOStats) {
+ // Tests the `BACKUP_READ_BYTES` and `BACKUP_WRITE_BYTES` ticker stats have
+ // the expected values according to the files in the backups.
+ //
+ // Two backups are taken. After each one, the bytes-written ticker is
+ // compared against the growth of the backup directory (measured with
+ // `GetSizeOfBackupFiles()`), and the bytes-read ticker is bounded relative
+ // to that same number.
+
+ // These ticker stats are expected to be populated regardless of `PerfLevel`
+ // in user thread
+ SetPerfLevel(kDisable);
+
+ options_.statistics = CreateDBStatistics();
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum);
+
+ FillDB(db_.get(), 0 /* from */, 100 /* to */, kFlushMost);
+
+ // Nothing has been backed up yet, so both tickers must still be zero.
+ ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_READ_BYTES));
+ ASSERT_EQ(0, options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+
+ // For the first backup, everything under `backupdir_` is expected to have
+ // been written by that backup.
+ size_t orig_backup_files_size;
+ ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
+ backupdir_, &orig_backup_files_size));
+ size_t expected_bytes_written = orig_backup_files_size;
+ ASSERT_EQ(expected_bytes_written,
+ options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ // Bytes read is more difficult to pin down since there are reads for many
+ // purposes other than creating file, like `GetSortedWalFiles()` to find first
+ // sequence number, or `CreateNewBackup()` thread to find SST file session ID.
+ // So we loosely require there are at least as many reads as needed for
+ // copying, but not as many as twice that.
+ ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
+ expected_bytes_written);
+ // NOTE(review): as written this checks `expected < 2 * bytes_read`, which
+ // already follows from the ASSERT_GE above whenever expected > 0, so it does
+ // not enforce the "not as many as twice that" upper bound described in the
+ // comment. The intended check looks like `bytes_read < 2 * expected` --
+ // confirm that it holds before swapping the operands.
+ ASSERT_LT(expected_bytes_written,
+ 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
+
+ FillDB(db_.get(), 100 /* from */, 200 /* to */, kFlushMost);
+
+ // Reset the statistics so the tickers checked below cover only the second
+ // backup.
+ ASSERT_OK(options_.statistics->Reset());
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
+ false /* flush_before_backup */));
+ // The second backup is expected to have written exactly the growth of the
+ // backup directory since the first measurement.
+ size_t final_backup_files_size;
+ ASSERT_OK(GetSizeOfBackupFiles(test_backup_env_->GetFileSystem().get(),
+ backupdir_, &final_backup_files_size));
+ expected_bytes_written = final_backup_files_size - orig_backup_files_size;
+ ASSERT_EQ(expected_bytes_written,
+ options_.statistics->getTickerCount(BACKUP_WRITE_BYTES));
+ // See above for why these bounds were chosen.
+ ASSERT_GE(options_.statistics->getTickerCount(BACKUP_READ_BYTES),
+ expected_bytes_written);
+ // NOTE(review): same concern as for the first backup -- this assertion is
+ // implied by the ASSERT_GE above and does not bound the reads from above.
+ ASSERT_LT(expected_bytes_written,
+ 2 * options_.statistics->getTickerCount(BACKUP_READ_BYTES));
+}
+
+// Exercises backup and restore of SST file temperatures with schema version 2.
+// A DB with one kWarm and one kUnknown SST file is backed up five times; from
+// the second iteration on, the "current" temperature reported by the
+// instrumented file system is overridden for one or both files and
+// `current_temperatures_override_manifest` is toggled. Each iteration checks
+// the temperatures requested when opening files for backup, the temperatures
+// reported by the backup info API, and the temperatures seen after restore.
+TEST_F(BackupEngineTest, FileTemperatures) {
+ CloseDBAndBackupEngine();
+
+ // Required for recording+restoring temperatures
+ engine_options_->schema_version = 2;
+
+ // More file IO instrumentation
+ auto my_db_fs = std::make_shared<FileTemperatureTestFS>(db_chroot_fs_);
+ test_db_fs_ = std::make_shared<TestFs>(my_db_fs);
+ SetEnvsFromFileSystems();
+
+ // Use temperatures
+ options_.bottommost_temperature = Temperature::kWarm;
+ options_.level0_file_num_compaction_trigger = 2;
+ // set dynamic_level to true so the compaction would compact the data to the
+ // last level directly which will have the last_level_temperature
+ options_.level_compaction_dynamic_level_bytes = true;
+
+ OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+ kShareWithChecksum);
+
+ // generate a bottommost file (combined from 2) and a non-bottommost file
+ DBImpl* dbi = static_cast_with_check<DBImpl>(db_.get());
+ ASSERT_OK(db_->Put(WriteOptions(), "a", "val"));
+ ASSERT_OK(db_->Put(WriteOptions(), "c", "val"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_OK(db_->Put(WriteOptions(), "b", "val"));
+ ASSERT_OK(db_->Put(WriteOptions(), "d", "val"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_OK(dbi->TEST_WaitForCompact());
+ ASSERT_OK(db_->Put(WriteOptions(), "e", "val"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+
+ // Get temperatures from manifest
+ // (`manifest_temps` maps SST file number -> temperature;
+ // `manifest_temp_counts` counts SST files per temperature.)
+ std::map<uint64_t, Temperature> manifest_temps;
+ std::map<Temperature, int> manifest_temp_counts;
+ {
+ std::vector<LiveFileStorageInfo> infos;
+ ASSERT_OK(
+ db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
+ for (auto info : infos) {
+ if (info.file_type == kTableFile) {
+ manifest_temps.emplace(info.file_number, info.temperature);
+ manifest_temp_counts[info.temperature]++;
+ }
+ }
+ }
+
+ // Verify expected manifest temperatures
+ // (exactly one kWarm SST file and one kUnknown SST file)
+ ASSERT_EQ(manifest_temp_counts.size(), 2);
+ ASSERT_EQ(manifest_temp_counts[Temperature::kWarm], 1);
+ ASSERT_EQ(manifest_temp_counts[Temperature::kUnknown], 1);
+
+ // Verify manifest temperatures match FS temperatures
+ std::map<uint64_t, Temperature> current_temps;
+ my_db_fs->CopyCurrentSstFileTemperatures(&current_temps);
+ for (const auto& manifest_temp : manifest_temps) {
+ ASSERT_EQ(current_temps[manifest_temp.first], manifest_temp.second);
+ }
+
+ // Try a few different things
+ for (int i = 1; i <= 5; ++i) {
+ // Expected temperatures after restore are based on manifest temperatures
+ std::map<uint64_t, Temperature> expected_temps = manifest_temps;
+
+ if (i >= 2) {
+ // For iterations 2 & 3, override current temperature of one file
+ // and vary which temperature is authoritative (current or manifest).
+ // For iterations 4 & 5, override current temperature of both files
+ // but make sure a current temperature always takes precedence over
+ // unknown regardless of current_temperatures_override_manifest setting.
+ bool use_current = ((i % 2) == 1);
+ engine_options_->current_temperatures_override_manifest = use_current;
+ // Reopen the backup engine after changing engine options.
+ CloseBackupEngine();
+ OpenBackupEngine();
+ for (const auto& manifest_temp : manifest_temps) {
+ if (i <= 3) {
+ // Only the kWarm file is overridden (to kCold); the expectation
+ // changes only when the current temperature is authoritative.
+ if (manifest_temp.second == Temperature::kWarm) {
+ my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
+ Temperature::kCold);
+ if (use_current) {
+ expected_temps[manifest_temp.first] = Temperature::kCold;
+ }
+ }
+ } else {
+ assert(i <= 5);
+ if (manifest_temp.second == Temperature::kWarm) {
+ // Current temperature becomes kUnknown; the expectation stays at
+ // the manifest's kWarm.
+ my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
+ Temperature::kUnknown);
+ } else {
+ ASSERT_EQ(manifest_temp.second, Temperature::kUnknown);
+ my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
+ Temperature::kHot);
+ // regardless of use_current
+ expected_temps[manifest_temp.first] = Temperature::kHot;
+ }
+ }
+ }
+ }
+
+ // Sample requested temperatures in opening files for backup
+ // (the first call discards anything recorded before this backup)
+ my_db_fs->PopRequestedSstFileTemperatures();
+ ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
+
+ // Verify requested temperatures against manifest temperatures (before
+ // retry with kUnknown if needed, and before backup finds out current
+ // temperatures in FileSystem)
+ std::vector<std::pair<uint64_t, Temperature>> requested_temps;
+ my_db_fs->PopRequestedSstFileTemperatures(&requested_temps);
+ std::set<uint64_t> distinct_requests;
+ for (const auto& requested_temp : requested_temps) {
+ // Matching manifest temperatures, except allow retry request with
+ // kUnknown
+ auto manifest_temp = manifest_temps.at(requested_temp.first);
+ if (manifest_temp == Temperature::kUnknown ||
+ requested_temp.second != Temperature::kUnknown) {
+ ASSERT_EQ(manifest_temp, requested_temp.second);
+ }
+ distinct_requests.insert(requested_temp.first);
+ }
+ // Two distinct requests
+ ASSERT_EQ(distinct_requests.size(), 2);
+
+ // Verify against backup info file details API
+ BackupInfo info;
+ ASSERT_OK(backup_engine_->GetLatestBackupInfo(
+ &info, /*include_file_details*/ true));
+ ASSERT_GT(info.file_details.size(), 2);
+ // NOTE(review): there are more file details than the two SST files, and
+ // operator[] inserts a value-initialized Temperature for any file number
+ // not already in `expected_temps` -- presumably equal to kUnknown; confirm.
+ // Entries inserted here are also visited by the restore check below.
+ for (auto& e : info.file_details) {
+ ASSERT_EQ(expected_temps[e.file_number], e.temperature);
+ }
+
+ // Restore backup to another virtual (tiered) dir
+ const std::string restore_dir = "/restore" + std::to_string(i);
+ ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(
+ RestoreOptions(), restore_dir, restore_dir));
+
+ // Verify restored FS temperatures match expectation
+ // (FileTemperatureTestFS doesn't distinguish directories when reporting
+ // current temperatures, just whatever SST was written or overridden last
+ // with that file number.)
+ my_db_fs->CopyCurrentSstFileTemperatures(&current_temps);
+ for (const auto& expected_temp : expected_temps) {
+ ASSERT_EQ(current_temps[expected_temp.first], expected_temp.second);
+ }
+
+ // Delete backup to force next backup to copy files
+ ASSERT_OK(backup_engine_->PurgeOldBackups(0));
+ }
+}
+
+} // namespace
+
+} // namespace ROCKSDB_NAMESPACE
+
+// Test binary entry point: installs the stack trace handler, lets GoogleTest
+// consume its command-line flags, then runs every registered test and returns
+// the result as the process exit code.
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  const int test_result = RUN_ALL_TESTS();
+  return test_result;
+}
+
+#else
+#include <stdio.h>
+
+// Stub entry point for builds where the tests above are compiled out: reports
+// on stderr that the test was skipped and exits successfully.
+int main(int /*argc*/, char** /*argv*/) {
+  fputs("SKIPPED as BackupEngine is not supported in ROCKSDB_LITE\n", stderr);
+  return 0;
+}
+
+#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)