From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/file/delete_scheduler.cc | 411 +++++++++++++++++++++++++++++++++++ 1 file changed, 411 insertions(+) create mode 100644 src/rocksdb/file/delete_scheduler.cc (limited to 'src/rocksdb/file/delete_scheduler.cc') diff --git a/src/rocksdb/file/delete_scheduler.cc b/src/rocksdb/file/delete_scheduler.cc new file mode 100644 index 000000000..b97a0f224 --- /dev/null +++ b/src/rocksdb/file/delete_scheduler.cc @@ -0,0 +1,411 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "file/delete_scheduler.h" + +#include +#include +#include + +#include "file/sst_file_manager_impl.h" +#include "logging/logging.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/system_clock.h" +#include "test_util/sync_point.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs, + int64_t rate_bytes_per_sec, Logger* info_log, + SstFileManagerImpl* sst_file_manager, + double max_trash_db_ratio, + uint64_t bytes_max_delete_chunk) + : clock_(clock), + fs_(fs), + total_trash_size_(0), + rate_bytes_per_sec_(rate_bytes_per_sec), + pending_files_(0), + bytes_max_delete_chunk_(bytes_max_delete_chunk), + closing_(false), + cv_(&mu_), + bg_thread_(nullptr), + info_log_(info_log), + sst_file_manager_(sst_file_manager), + max_trash_db_ratio_(max_trash_db_ratio) { + assert(sst_file_manager != nullptr); + assert(max_trash_db_ratio >= 0); + MaybeCreateBackgroundThread(); +} + +DeleteScheduler::~DeleteScheduler() { + { + InstrumentedMutexLock l(&mu_); + closing_ = true; + cv_.SignalAll(); + } + if (bg_thread_) { + bg_thread_->join(); + } + for (const auto& it : bg_errors_) { + it.second.PermitUncheckedError(); + } +} + +Status DeleteScheduler::DeleteFile(const std::string& file_path, + const std::string& dir_to_sync, + const bool force_bg) { + if (rate_bytes_per_sec_.load() <= 0 || + (!force_bg && + total_trash_size_.load() > + sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { + // Rate limiting is disabled or trash size makes up more than + // max_trash_db_ratio_ (default 25%) of the total DB size + TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); + Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr); + if (s.ok()) { + s = sst_file_manager_->OnDeleteFile(file_path); + ROCKS_LOG_INFO(info_log_, + "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64 + ", total_trash_size %" PRIu64 " max_trash_db_ratio %lf", + file_path.c_str(), rate_bytes_per_sec_.load(), + total_trash_size_.load(), max_trash_db_ratio_.load()); + InstrumentedMutexLock l(&mu_); + RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY); + } + return s; + } + + // Move file to trash + std::string trash_file; + Status s = MarkAsTrash(file_path, &trash_file); + ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(), + s.ToString().c_str()); + + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s", + file_path.c_str(), s.ToString().c_str()); + s = fs_->DeleteFile(file_path, IOOptions(), nullptr); + if (s.ok()) { + s = sst_file_manager_->OnDeleteFile(file_path); + ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately", + trash_file.c_str()); + InstrumentedMutexLock l(&mu_); + RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY); + } + return s; + } + + // Update the total trash size + uint64_t trash_file_size = 0; + IOStatus io_s = + fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr); + if (io_s.ok()) { + total_trash_size_.fetch_add(trash_file_size); + } + //**TODO: What should we do if we failed to + // get the file size? + + // Add file to delete queue + { + InstrumentedMutexLock l(&mu_); + RecordTick(stats_.get(), FILES_MARKED_TRASH); + queue_.emplace(trash_file, dir_to_sync); + pending_files_++; + if (pending_files_ == 1) { + cv_.SignalAll(); + } + } + return s; +} + +std::map DeleteScheduler::GetBackgroundErrors() { + InstrumentedMutexLock l(&mu_); + return bg_errors_; +} + +const std::string DeleteScheduler::kTrashExtension = ".trash"; +bool DeleteScheduler::IsTrashFile(const std::string& file_path) { + return (file_path.size() >= kTrashExtension.size() && + file_path.rfind(kTrashExtension) == + file_path.size() - kTrashExtension.size()); +} + +Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm, + const std::string& path) { + Status s; + // Check if there are any files marked as trash in this path + std::vector files_in_path; + const auto& fs = env->GetFileSystem(); + IOOptions io_opts; + io_opts.do_not_recurse = true; + s = fs->GetChildren(path, io_opts, &files_in_path, + /*IODebugContext*=*/nullptr); + if (!s.ok()) { + return s; + } + for (const std::string& current_file : files_in_path) { + if (!DeleteScheduler::IsTrashFile(current_file)) { + // not a trash file, skip + continue; + } + + Status file_delete; + std::string trash_file = path + "/" + current_file; + if (sfm) { + // We have an SstFileManager that will schedule the file delete + s = sfm->OnAddFile(trash_file); + file_delete = sfm->ScheduleFileDeletion(trash_file, path); + } else { + // Delete the file immediately + file_delete = env->DeleteFile(trash_file); + } + + if (s.ok() && !file_delete.ok()) { + s = file_delete; + } + } + + return s; +} + +Status DeleteScheduler::MarkAsTrash(const std::string& file_path, + std::string* trash_file) { + // Sanity check of the path + size_t idx = file_path.rfind("/"); + if (idx == std::string::npos || idx == file_path.size() - 1) { + return Status::InvalidArgument("file_path is corrupted"); + } + + if (DeleteScheduler::IsTrashFile(file_path)) { + // This is already a trash file + *trash_file = file_path; + return Status::OK(); + } + + *trash_file = file_path + kTrashExtension; + // TODO(tec) : Implement Env::RenameFileIfNotExist and remove + // file_move_mu mutex. + int cnt = 0; + Status s; + InstrumentedMutexLock l(&file_move_mu_); + while (true) { + s = fs_->FileExists(*trash_file, IOOptions(), nullptr); + if (s.IsNotFound()) { + // We found a path for our file in trash + s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr); + break; + } else if (s.ok()) { + // Name conflict, generate new random suffix + *trash_file = file_path + std::to_string(cnt) + kTrashExtension; + } else { + // Error during FileExists call, we cannot continue + break; + } + cnt++; + } + if (s.ok()) { + s = sst_file_manager_->OnMoveFile(file_path, *trash_file); + } + return s; +} + +void DeleteScheduler::BackgroundEmptyTrash() { + TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); + + while (true) { + InstrumentedMutexLock l(&mu_); + while (queue_.empty() && !closing_) { + cv_.Wait(); + } + + if (closing_) { + return; + } + + // Delete all files in queue_ + uint64_t start_time = clock_->NowMicros(); + uint64_t total_deleted_bytes = 0; + int64_t current_delete_rate = rate_bytes_per_sec_.load(); + while (!queue_.empty() && !closing_) { + if (current_delete_rate != rate_bytes_per_sec_.load()) { + // User changed the delete rate + current_delete_rate = rate_bytes_per_sec_.load(); + start_time = clock_->NowMicros(); + total_deleted_bytes = 0; + ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64, + current_delete_rate); + } + + // Get new file to delete + const FileAndDir& fad = queue_.front(); + std::string path_in_trash = fad.fname; + + // We don't need to hold the lock while deleting the file + mu_.Unlock(); + uint64_t deleted_bytes = 0; + bool is_complete = true; + // Delete file from trash and update total_penlty value + Status s = + DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete); + total_deleted_bytes += deleted_bytes; + mu_.Lock(); + if (is_complete) { + queue_.pop(); + } + + if (!s.ok()) { + bg_errors_[path_in_trash] = s; + } + + // Apply penalty if necessary + uint64_t total_penalty; + if (current_delete_rate > 0) { + // rate limiting is enabled + total_penalty = + ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate); + ROCKS_LOG_INFO(info_log_, + "Rate limiting is enabled with penalty %" PRIu64 + " after deleting file %s", + total_penalty, path_in_trash.c_str()); + while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) { + } + } else { + // rate limiting is disabled + total_penalty = 0; + ROCKS_LOG_INFO(info_log_, + "Rate limiting is disabled after deleting file %s", + path_in_trash.c_str()); + } + TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", + &total_penalty); + + if (is_complete) { + pending_files_--; + } + if (pending_files_ == 0) { + // Unblock WaitForEmptyTrash since there are no more files waiting + // to be deleted + cv_.SignalAll(); + } + } + } +} + +Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, + const std::string& dir_to_sync, + uint64_t* deleted_bytes, + bool* is_complete) { + uint64_t file_size; + Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr); + *is_complete = true; + TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); + if (s.ok()) { + bool need_full_delete = true; + if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) { + uint64_t num_hard_links = 2; + // We don't have to worry aobut data race between linking a new + // file after the number of file link check and ftruncte because + // the file is now in trash and no hardlink is supposed to create + // to trash files by RocksDB. + Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(), + &num_hard_links, nullptr); + if (my_status.ok()) { + if (num_hard_links == 1) { + std::unique_ptr wf; + my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(), &wf, + nullptr); + if (my_status.ok()) { + my_status = wf->Truncate(file_size - bytes_max_delete_chunk_, + IOOptions(), nullptr); + if (my_status.ok()) { + TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync"); + my_status = wf->Fsync(IOOptions(), nullptr); + } + } + if (my_status.ok()) { + *deleted_bytes = bytes_max_delete_chunk_; + need_full_delete = false; + *is_complete = false; + } else { + ROCKS_LOG_WARN(info_log_, + "Failed to partially delete %s from trash -- %s", + path_in_trash.c_str(), my_status.ToString().c_str()); + } + } else { + ROCKS_LOG_INFO(info_log_, + "Cannot delete %s slowly through ftruncate from trash " + "as it has other links", + path_in_trash.c_str()); + } + } else if (!num_link_error_printed_) { + ROCKS_LOG_INFO( + info_log_, + "Cannot delete files slowly through ftruncate from trash " + "as Env::NumFileLinks() returns error: %s", + my_status.ToString().c_str()); + num_link_error_printed_ = true; + } + } + + if (need_full_delete) { + s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr); + if (!dir_to_sync.empty()) { + std::unique_ptr dir_obj; + if (s.ok()) { + s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr); + } + if (s.ok()) { + s = dir_obj->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted)); + TEST_SYNC_POINT_CALLBACK( + "DeleteScheduler::DeleteTrashFile::AfterSyncDir", + reinterpret_cast(const_cast(&dir_to_sync))); + } + } + if (s.ok()) { + *deleted_bytes = file_size; + s = sst_file_manager_->OnDeleteFile(path_in_trash); + } + } + } + if (!s.ok()) { + // Error while getting file size or while deleting + ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s", + path_in_trash.c_str(), s.ToString().c_str()); + *deleted_bytes = 0; + } else { + total_trash_size_.fetch_sub(*deleted_bytes); + } + + return s; +} + +void DeleteScheduler::WaitForEmptyTrash() { + InstrumentedMutexLock l(&mu_); + while (pending_files_ > 0 && !closing_) { + cv_.Wait(); + } +} + +void DeleteScheduler::MaybeCreateBackgroundThread() { + if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) { + bg_thread_.reset( + new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); + ROCKS_LOG_INFO(info_log_, + "Created background thread for deletion scheduler with " + "rate_bytes_per_sec: %" PRIi64, + rate_bytes_per_sec_.load()); + } +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE -- cgit v1.2.3