diff options
Diffstat (limited to 'src/rocksdb/util/delete_scheduler.cc')
-rw-r--r-- | src/rocksdb/util/delete_scheduler.cc | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/src/rocksdb/util/delete_scheduler.cc b/src/rocksdb/util/delete_scheduler.cc new file mode 100644 index 00000000..f5ee2844 --- /dev/null +++ b/src/rocksdb/util/delete_scheduler.cc @@ -0,0 +1,353 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "util/delete_scheduler.h" + +#include <thread> +#include <vector> + +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/sst_file_manager_impl.h" +#include "util/sync_point.h" + +namespace rocksdb { + +DeleteScheduler::DeleteScheduler(Env* env, int64_t rate_bytes_per_sec, + Logger* info_log, + SstFileManagerImpl* sst_file_manager, + double max_trash_db_ratio, + uint64_t bytes_max_delete_chunk) + : env_(env), + total_trash_size_(0), + rate_bytes_per_sec_(rate_bytes_per_sec), + pending_files_(0), + bytes_max_delete_chunk_(bytes_max_delete_chunk), + closing_(false), + cv_(&mu_), + info_log_(info_log), + sst_file_manager_(sst_file_manager), + max_trash_db_ratio_(max_trash_db_ratio) { + assert(sst_file_manager != nullptr); + assert(max_trash_db_ratio >= 0); + bg_thread_.reset( + new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); +} + +DeleteScheduler::~DeleteScheduler() { + { + InstrumentedMutexLock l(&mu_); + closing_ = true; + cv_.SignalAll(); + } + if (bg_thread_) { + bg_thread_->join(); + } +} + +Status DeleteScheduler::DeleteFile(const std::string& file_path, + const std::string& dir_to_sync, + const bool force_bg) { + Status s; + if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && + total_trash_size_.load() > + sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { + // Rate limiting is disabled or trash size makes up more than + // max_trash_db_ratio_ (default 25%) of the total DB size + TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); + s = env_->DeleteFile(file_path); + if (s.ok()) { + sst_file_manager_->OnDeleteFile(file_path); + } + return s; + } + + // Move file to trash + std::string trash_file; + s = MarkAsTrash(file_path, &trash_file); + + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash", file_path.c_str()); + s = env_->DeleteFile(file_path); + if (s.ok()) { + sst_file_manager_->OnDeleteFile(file_path); + } + return s; + } + + // Update the total trash size + uint64_t trash_file_size = 0; + env_->GetFileSize(trash_file, &trash_file_size); + total_trash_size_.fetch_add(trash_file_size); + + // Add file to delete queue + { + InstrumentedMutexLock l(&mu_); + queue_.emplace(trash_file, dir_to_sync); + pending_files_++; + if (pending_files_ == 1) { + cv_.SignalAll(); + } + } + return s; +} + +std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() { + InstrumentedMutexLock l(&mu_); + return bg_errors_; +} + +const std::string DeleteScheduler::kTrashExtension = ".trash"; +bool DeleteScheduler::IsTrashFile(const std::string& file_path) { + return (file_path.size() >= kTrashExtension.size() && + file_path.rfind(kTrashExtension) == + file_path.size() - kTrashExtension.size()); +} + +Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm, + const std::string& path) { + Status s; + // Check if there are any files marked as trash in this path + std::vector<std::string> files_in_path; + s = env->GetChildren(path, &files_in_path); + if (!s.ok()) { + return s; + } + for (const std::string& current_file : files_in_path) { + if (!DeleteScheduler::IsTrashFile(current_file)) { + // not a trash file, skip + continue; + } + + Status file_delete; + std::string trash_file = path + "/" + current_file; + if (sfm) { + // We have an SstFileManager that will schedule the file delete + sfm->OnAddFile(trash_file); + file_delete = sfm->ScheduleFileDeletion(trash_file, path); + } else { + // Delete the file immediately + file_delete = env->DeleteFile(trash_file); + } + + if (s.ok() && !file_delete.ok()) { + s = file_delete; + } + } + + return s; +} + +Status DeleteScheduler::MarkAsTrash(const std::string& file_path, + std::string* trash_file) { + // Sanity check of the path + size_t idx = file_path.rfind("/"); + if (idx == std::string::npos || idx == file_path.size() - 1) { + return Status::InvalidArgument("file_path is corrupted"); + } + + Status s; + if (DeleteScheduler::IsTrashFile(file_path)) { + // This is already a trash file + *trash_file = file_path; + return s; + } + + *trash_file = file_path + kTrashExtension; + // TODO(tec) : Implement Env::RenameFileIfNotExist and remove + // file_move_mu mutex. + int cnt = 0; + InstrumentedMutexLock l(&file_move_mu_); + while (true) { + s = env_->FileExists(*trash_file); + if (s.IsNotFound()) { + // We found a path for our file in trash + s = env_->RenameFile(file_path, *trash_file); + break; + } else if (s.ok()) { + // Name conflict, generate new random suffix + *trash_file = file_path + std::to_string(cnt) + kTrashExtension; + } else { + // Error during FileExists call, we cannot continue + break; + } + cnt++; + } + if (s.ok()) { + sst_file_manager_->OnMoveFile(file_path, *trash_file); + } + return s; +} + +void DeleteScheduler::BackgroundEmptyTrash() { + TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); + + while (true) { + InstrumentedMutexLock l(&mu_); + while (queue_.empty() && !closing_) { + cv_.Wait(); + } + + if (closing_) { + return; + } + + // Delete all files in queue_ + uint64_t start_time = env_->NowMicros(); + uint64_t total_deleted_bytes = 0; + int64_t current_delete_rate = rate_bytes_per_sec_.load(); + while (!queue_.empty() && !closing_) { + if (current_delete_rate != rate_bytes_per_sec_.load()) { + // User changed the delete rate + current_delete_rate = rate_bytes_per_sec_.load(); + start_time = env_->NowMicros(); + total_deleted_bytes = 0; + } + + // Get new file to delete + const FileAndDir& fad = queue_.front(); + std::string path_in_trash = fad.fname; + + // We dont need to hold the lock while deleting the file + mu_.Unlock(); + uint64_t deleted_bytes = 0; + bool is_complete = true; + // Delete file from trash and update total_penlty value + Status s = + DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete); + total_deleted_bytes += deleted_bytes; + mu_.Lock(); + if (is_complete) { + queue_.pop(); + } + + if (!s.ok()) { + bg_errors_[path_in_trash] = s; + } + + // Apply penlty if necessary + uint64_t total_penlty; + if (current_delete_rate > 0) { + // rate limiting is enabled + total_penlty = + ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate); + while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} + } else { + // rate limiting is disabled + total_penlty = 0; + } + TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", + &total_penlty); + + if (is_complete) { + pending_files_--; + } + if (pending_files_ == 0) { + // Unblock WaitForEmptyTrash since there are no more files waiting + // to be deleted + cv_.SignalAll(); + } + } + } +} + +Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, + const std::string& dir_to_sync, + uint64_t* deleted_bytes, + bool* is_complete) { + uint64_t file_size; + Status s = env_->GetFileSize(path_in_trash, &file_size); + *is_complete = true; + TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); + if (s.ok()) { + bool need_full_delete = true; + if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) { + uint64_t num_hard_links = 2; + // We don't have to worry aobut data race between linking a new + // file after the number of file link check and ftruncte because + // the file is now in trash and no hardlink is supposed to create + // to trash files by RocksDB. + Status my_status = env_->NumFileLinks(path_in_trash, &num_hard_links); + if (my_status.ok()) { + if (num_hard_links == 1) { + std::unique_ptr<WritableFile> wf; + my_status = + env_->ReopenWritableFile(path_in_trash, &wf, EnvOptions()); + if (my_status.ok()) { + my_status = wf->Truncate(file_size - bytes_max_delete_chunk_); + if (my_status.ok()) { + TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync"); + my_status = wf->Fsync(); + } + } + if (my_status.ok()) { + *deleted_bytes = bytes_max_delete_chunk_; + need_full_delete = false; + *is_complete = false; + } else { + ROCKS_LOG_WARN(info_log_, + "Failed to partially delete %s from trash -- %s", + path_in_trash.c_str(), my_status.ToString().c_str()); + } + } else { + ROCKS_LOG_INFO(info_log_, + "Cannot delete %s slowly through ftruncate from trash " + "as it has other links", + path_in_trash.c_str()); + } + } else if (!num_link_error_printed_) { + ROCKS_LOG_INFO( + info_log_, + "Cannot delete files slowly through ftruncate from trash " + "as Env::NumFileLinks() returns error: %s", + my_status.ToString().c_str()); + num_link_error_printed_ = true; + } + } + + if (need_full_delete) { + s = env_->DeleteFile(path_in_trash); + if (!dir_to_sync.empty()) { + std::unique_ptr<Directory> dir_obj; + if (s.ok()) { + s = env_->NewDirectory(dir_to_sync, &dir_obj); + } + if (s.ok()) { + s = dir_obj->Fsync(); + TEST_SYNC_POINT_CALLBACK( + "DeleteScheduler::DeleteTrashFile::AfterSyncDir", + reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync))); + } + } + *deleted_bytes = file_size; + sst_file_manager_->OnDeleteFile(path_in_trash); + } + } + if (!s.ok()) { + // Error while getting file size or while deleting + ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s", + path_in_trash.c_str(), s.ToString().c_str()); + *deleted_bytes = 0; + } else { + total_trash_size_.fetch_sub(*deleted_bytes); + } + + return s; +} + +void DeleteScheduler::WaitForEmptyTrash() { + InstrumentedMutexLock l(&mu_); + while (pending_files_ > 0 && !closing_) { + cv_.Wait(); + } +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE |