summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/file
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/file')
-rw-r--r--src/rocksdb/file/delete_scheduler.cc357
-rw-r--r--src/rocksdb/file/delete_scheduler.h141
-rw-r--r--src/rocksdb/file/delete_scheduler_test.cc693
-rw-r--r--src/rocksdb/file/file_prefetch_buffer.cc136
-rw-r--r--src/rocksdb/file/file_prefetch_buffer.h97
-rw-r--r--src/rocksdb/file/file_util.cc124
-rw-r--r--src/rocksdb/file/file_util.h33
-rw-r--r--src/rocksdb/file/filename.cc456
-rw-r--r--src/rocksdb/file/filename.h185
-rw-r--r--src/rocksdb/file/random_access_file_reader.cc189
-rw-r--r--src/rocksdb/file/random_access_file_reader.h120
-rw-r--r--src/rocksdb/file/read_write_util.cc67
-rw-r--r--src/rocksdb/file/read_write_util.h34
-rw-r--r--src/rocksdb/file/readahead_raf.cc162
-rw-r--r--src/rocksdb/file/readahead_raf.h27
-rw-r--r--src/rocksdb/file/sequence_file_reader.cc237
-rw-r--r--src/rocksdb/file/sequence_file_reader.h67
-rw-r--r--src/rocksdb/file/sst_file_manager_impl.cc558
-rw-r--r--src/rocksdb/file/sst_file_manager_impl.h197
-rw-r--r--src/rocksdb/file/writable_file_writer.cc429
-rw-r--r--src/rocksdb/file/writable_file_writer.h171
21 files changed, 4480 insertions, 0 deletions
diff --git a/src/rocksdb/file/delete_scheduler.cc b/src/rocksdb/file/delete_scheduler.cc
new file mode 100644
index 000000000..bb318e595
--- /dev/null
+++ b/src/rocksdb/file/delete_scheduler.cc
@@ -0,0 +1,357 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "file/delete_scheduler.h"
+
+#include <thread>
+#include <vector>
+
+#include "file/sst_file_manager_impl.h"
+#include "logging/logging.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "test_util/sync_point.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+DeleteScheduler::DeleteScheduler(Env* env, FileSystem* fs,
+ int64_t rate_bytes_per_sec, Logger* info_log,
+ SstFileManagerImpl* sst_file_manager,
+ double max_trash_db_ratio,
+ uint64_t bytes_max_delete_chunk)
+ : env_(env),
+ fs_(fs),
+ total_trash_size_(0),
+ rate_bytes_per_sec_(rate_bytes_per_sec),
+ pending_files_(0),
+ bytes_max_delete_chunk_(bytes_max_delete_chunk),
+ closing_(false),
+ cv_(&mu_),
+ info_log_(info_log),
+ sst_file_manager_(sst_file_manager),
+ max_trash_db_ratio_(max_trash_db_ratio) {
+ assert(sst_file_manager != nullptr);
+ assert(max_trash_db_ratio >= 0);
+ bg_thread_.reset(
+ new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
+}
+
+DeleteScheduler::~DeleteScheduler() {
+ {
+ InstrumentedMutexLock l(&mu_);
+ closing_ = true;
+ cv_.SignalAll();
+ }
+ if (bg_thread_) {
+ bg_thread_->join();
+ }
+}
+
+Status DeleteScheduler::DeleteFile(const std::string& file_path,
+ const std::string& dir_to_sync,
+ const bool force_bg) {
+ Status s;
+ if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&
+ total_trash_size_.load() >
+ sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
+ // Rate limiting is disabled or trash size makes up more than
+ // max_trash_db_ratio_ (default 25%) of the total DB size
+ TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
+ s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
+ if (s.ok()) {
+ sst_file_manager_->OnDeleteFile(file_path);
+ }
+ return s;
+ }
+
+ // Move file to trash
+ std::string trash_file;
+ s = MarkAsTrash(file_path, &trash_file);
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
+ file_path.c_str(), s.ToString().c_str());
+ s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
+ if (s.ok()) {
+ sst_file_manager_->OnDeleteFile(file_path);
+ }
+ return s;
+ }
+
+ // Update the total trash size
+ uint64_t trash_file_size = 0;
+ fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
+ total_trash_size_.fetch_add(trash_file_size);
+
+ // Add file to delete queue
+ {
+ InstrumentedMutexLock l(&mu_);
+ queue_.emplace(trash_file, dir_to_sync);
+ pending_files_++;
+ if (pending_files_ == 1) {
+ cv_.SignalAll();
+ }
+ }
+ return s;
+}
+
+std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
+ InstrumentedMutexLock l(&mu_);
+ return bg_errors_;
+}
+
+const std::string DeleteScheduler::kTrashExtension = ".trash";
+bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
+ return (file_path.size() >= kTrashExtension.size() &&
+ file_path.rfind(kTrashExtension) ==
+ file_path.size() - kTrashExtension.size());
+}
+
+Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
+ const std::string& path) {
+ Status s;
+ // Check if there are any files marked as trash in this path
+ std::vector<std::string> files_in_path;
+ s = env->GetChildren(path, &files_in_path);
+ if (!s.ok()) {
+ return s;
+ }
+ for (const std::string& current_file : files_in_path) {
+ if (!DeleteScheduler::IsTrashFile(current_file)) {
+ // not a trash file, skip
+ continue;
+ }
+
+ Status file_delete;
+ std::string trash_file = path + "/" + current_file;
+ if (sfm) {
+ // We have an SstFileManager that will schedule the file delete
+ sfm->OnAddFile(trash_file);
+ file_delete = sfm->ScheduleFileDeletion(trash_file, path);
+ } else {
+ // Delete the file immediately
+ file_delete = env->DeleteFile(trash_file);
+ }
+
+ if (s.ok() && !file_delete.ok()) {
+ s = file_delete;
+ }
+ }
+
+ return s;
+}
+
+Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
+ std::string* trash_file) {
+ // Sanity check of the path
+ size_t idx = file_path.rfind("/");
+ if (idx == std::string::npos || idx == file_path.size() - 1) {
+ return Status::InvalidArgument("file_path is corrupted");
+ }
+
+ Status s;
+ if (DeleteScheduler::IsTrashFile(file_path)) {
+ // This is already a trash file
+ *trash_file = file_path;
+ return s;
+ }
+
+ *trash_file = file_path + kTrashExtension;
+ // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
+ // file_move_mu mutex.
+ int cnt = 0;
+ InstrumentedMutexLock l(&file_move_mu_);
+ while (true) {
+ s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
+ if (s.IsNotFound()) {
+ // We found a path for our file in trash
+ s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
+ break;
+ } else if (s.ok()) {
+ // Name conflict, generate new random suffix
+ *trash_file = file_path + std::to_string(cnt) + kTrashExtension;
+ } else {
+ // Error during FileExists call, we cannot continue
+ break;
+ }
+ cnt++;
+ }
+ if (s.ok()) {
+ sst_file_manager_->OnMoveFile(file_path, *trash_file);
+ }
+ return s;
+}
+
+void DeleteScheduler::BackgroundEmptyTrash() {
+ TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
+
+ while (true) {
+ InstrumentedMutexLock l(&mu_);
+ while (queue_.empty() && !closing_) {
+ cv_.Wait();
+ }
+
+ if (closing_) {
+ return;
+ }
+
+ // Delete all files in queue_
+ uint64_t start_time = env_->NowMicros();
+ uint64_t total_deleted_bytes = 0;
+ int64_t current_delete_rate = rate_bytes_per_sec_.load();
+ while (!queue_.empty() && !closing_) {
+ if (current_delete_rate != rate_bytes_per_sec_.load()) {
+ // User changed the delete rate
+ current_delete_rate = rate_bytes_per_sec_.load();
+ start_time = env_->NowMicros();
+ total_deleted_bytes = 0;
+ }
+
+ // Get new file to delete
+ const FileAndDir& fad = queue_.front();
+ std::string path_in_trash = fad.fname;
+
+ // We don't need to hold the lock while deleting the file
+ mu_.Unlock();
+ uint64_t deleted_bytes = 0;
+ bool is_complete = true;
+ // Delete file from trash and update total_penlty value
+ Status s =
+ DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
+ total_deleted_bytes += deleted_bytes;
+ mu_.Lock();
+ if (is_complete) {
+ queue_.pop();
+ }
+
+ if (!s.ok()) {
+ bg_errors_[path_in_trash] = s;
+ }
+
+ // Apply penalty if necessary
+ uint64_t total_penlty;
+ if (current_delete_rate > 0) {
+ // rate limiting is enabled
+ total_penlty =
+ ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
+ while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
+ } else {
+ // rate limiting is disabled
+ total_penlty = 0;
+ }
+ TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
+ &total_penlty);
+
+ if (is_complete) {
+ pending_files_--;
+ }
+ if (pending_files_ == 0) {
+ // Unblock WaitForEmptyTrash since there are no more files waiting
+ // to be deleted
+ cv_.SignalAll();
+ }
+ }
+ }
+}
+
+Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
+ const std::string& dir_to_sync,
+ uint64_t* deleted_bytes,
+ bool* is_complete) {
+ uint64_t file_size;
+ Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
+ *is_complete = true;
+ TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
+ if (s.ok()) {
+ bool need_full_delete = true;
+ if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
+ uint64_t num_hard_links = 2;
+ // We don't have to worry about a data race between linking a new
+ // file after the number-of-file-links check and ftruncate, because
+ // the file is now in trash and no hardlink is supposed to be created
+ // for trash files by RocksDB.
+ Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
+ &num_hard_links, nullptr);
+ if (my_status.ok()) {
+ if (num_hard_links == 1) {
+ std::unique_ptr<FSWritableFile> wf;
+ my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
+ &wf, nullptr);
+ if (my_status.ok()) {
+ my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
+ IOOptions(), nullptr);
+ if (my_status.ok()) {
+ TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
+ my_status = wf->Fsync(IOOptions(), nullptr);
+ }
+ }
+ if (my_status.ok()) {
+ *deleted_bytes = bytes_max_delete_chunk_;
+ need_full_delete = false;
+ *is_complete = false;
+ } else {
+ ROCKS_LOG_WARN(info_log_,
+ "Failed to partially delete %s from trash -- %s",
+ path_in_trash.c_str(), my_status.ToString().c_str());
+ }
+ } else {
+ ROCKS_LOG_INFO(info_log_,
+ "Cannot delete %s slowly through ftruncate from trash "
+ "as it has other links",
+ path_in_trash.c_str());
+ }
+ } else if (!num_link_error_printed_) {
+ ROCKS_LOG_INFO(
+ info_log_,
+ "Cannot delete files slowly through ftruncate from trash "
+ "as Env::NumFileLinks() returns error: %s",
+ my_status.ToString().c_str());
+ num_link_error_printed_ = true;
+ }
+ }
+
+ if (need_full_delete) {
+ s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
+ if (!dir_to_sync.empty()) {
+ std::unique_ptr<FSDirectory> dir_obj;
+ if (s.ok()) {
+ s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
+ }
+ if (s.ok()) {
+ s = dir_obj->Fsync(IOOptions(), nullptr);
+ TEST_SYNC_POINT_CALLBACK(
+ "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
+ reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
+ }
+ }
+ *deleted_bytes = file_size;
+ sst_file_manager_->OnDeleteFile(path_in_trash);
+ }
+ }
+ if (!s.ok()) {
+ // Error while getting file size or while deleting
+ ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
+ path_in_trash.c_str(), s.ToString().c_str());
+ *deleted_bytes = 0;
+ } else {
+ total_trash_size_.fetch_sub(*deleted_bytes);
+ }
+
+ return s;
+}
+
+void DeleteScheduler::WaitForEmptyTrash() {
+ InstrumentedMutexLock l(&mu_);
+ while (pending_files_ > 0 && !closing_) {
+ cv_.Wait();
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/file/delete_scheduler.h b/src/rocksdb/file/delete_scheduler.h
new file mode 100644
index 000000000..60948480a
--- /dev/null
+++ b/src/rocksdb/file/delete_scheduler.h
@@ -0,0 +1,141 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <map>
+#include <queue>
+#include <string>
+#include <thread>
+
+#include "monitoring/instrumented_mutex.h"
+#include "port/port.h"
+
+#include "rocksdb/file_system.h"
+#include "rocksdb/status.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Env;
+class Logger;
+class SstFileManagerImpl;
+
+// DeleteScheduler allows the DB to enforce a rate limit on file deletion.
+// Instead of deleting files immediately, files are marked as trash
+// and deleted in a background thread that applies a sleep penalty between
+// deletes if they are happening at a rate faster than rate_bytes_per_sec.
+//
+// Rate limiting can be turned off by setting rate_bytes_per_sec = 0. In this
+// case DeleteScheduler will delete files immediately.
+class DeleteScheduler {
+ public:
+ DeleteScheduler(Env* env, FileSystem* fs, int64_t rate_bytes_per_sec,
+ Logger* info_log, SstFileManagerImpl* sst_file_manager,
+ double max_trash_db_ratio, uint64_t bytes_max_delete_chunk);
+
+ ~DeleteScheduler();
+
+ // Return delete rate limit in bytes per second
+ int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_.load(); }
+
+ // Set delete rate limit in bytes per second
+ void SetRateBytesPerSecond(int64_t bytes_per_sec) {
+ rate_bytes_per_sec_.store(bytes_per_sec);
+ }
+
+ // Mark file as trash and schedule its deletion. If force_bg is
+ // set, it forces the file to always be deleted in the background thread,
+ // except when rate limiting is disabled
+ Status DeleteFile(const std::string& fname, const std::string& dir_to_sync,
+ const bool force_bg = false);
+
+ // Wait for all files being deleted in the background to finish or for
+ // destructor to be called.
+ void WaitForEmptyTrash();
+
+ // Return a map containing errors that happened in BackgroundEmptyTrash
+ // file_path => error status
+ std::map<std::string, Status> GetBackgroundErrors();
+
+ uint64_t GetTotalTrashSize() { return total_trash_size_.load(); }
+
+ // Return trash/DB size ratio where new files will be deleted immediately
+ double GetMaxTrashDBRatio() {
+ return max_trash_db_ratio_.load();
+ }
+
+ // Update trash/DB size ratio where new files will be deleted immediately
+ void SetMaxTrashDBRatio(double r) {
+ assert(r >= 0);
+ max_trash_db_ratio_.store(r);
+ }
+
+ static const std::string kTrashExtension;
+ static bool IsTrashFile(const std::string& file_path);
+
+ // Check if there are any .trash files in path, and schedule their deletion
+ // Or delete immediately if sst_file_manager is nullptr
+ static Status CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
+ const std::string& path);
+
+ private:
+ Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);
+
+ Status DeleteTrashFile(const std::string& path_in_trash,
+ const std::string& dir_to_sync,
+ uint64_t* deleted_bytes, bool* is_complete);
+
+ void BackgroundEmptyTrash();
+
+ Env* env_;
+ FileSystem* fs_;
+
+ // total size of trash files
+ std::atomic<uint64_t> total_trash_size_;
+ // Maximum number of bytes that should be deleted per second
+ std::atomic<int64_t> rate_bytes_per_sec_;
+ // Mutex to protect queue_, pending_files_, bg_errors_, closing_
+ InstrumentedMutex mu_;
+
+ struct FileAndDir {
+ FileAndDir(const std::string& f, const std::string& d) : fname(f), dir(d) {}
+ std::string fname;
+ std::string dir; // empty will be skipped.
+ };
+
+ // Queue of trash files that need to be deleted
+ std::queue<FileAndDir> queue_;
+ // Number of trash files that are waiting to be deleted
+ int32_t pending_files_;
+ uint64_t bytes_max_delete_chunk_;
+ // Errors that happened in BackgroundEmptyTrash (file_path => error)
+ std::map<std::string, Status> bg_errors_;
+
+ bool num_link_error_printed_ = false;
+ // Set to true in ~DeleteScheduler() to force BackgroundEmptyTrash to stop
+ bool closing_;
+ // Condition variable signaled in these conditions
+ // - pending_files_ value change from 0 => 1
+ // - pending_files_ value change from 1 => 0
+ // - closing_ value is set to true
+ InstrumentedCondVar cv_;
+ // Background thread running BackgroundEmptyTrash
+ std::unique_ptr<port::Thread> bg_thread_;
+ // Mutex to protect threads from file name conflicts
+ InstrumentedMutex file_move_mu_;
+ Logger* info_log_;
+ SstFileManagerImpl* sst_file_manager_;
+ // If the trash size constitutes for more than this fraction of the total DB
+ // size we will start deleting new files passed to DeleteScheduler
+ // immediately
+ std::atomic<double> max_trash_db_ratio_;
+ static const uint64_t kMicrosInSecond = 1000 * 1000LL;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/file/delete_scheduler_test.cc b/src/rocksdb/file/delete_scheduler_test.cc
new file mode 100644
index 000000000..cff645de5
--- /dev/null
+++ b/src/rocksdb/file/delete_scheduler_test.cc
@@ -0,0 +1,693 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <atomic>
+#include <cinttypes>
+#include <thread>
+#include <vector>
+
+#include "file/delete_scheduler.h"
+#include "file/sst_file_manager_impl.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "util/string_util.h"
+
+#ifndef ROCKSDB_LITE
+
+namespace ROCKSDB_NAMESPACE {
+
+class DeleteSchedulerTest : public testing::Test {
+ public:
+ DeleteSchedulerTest() : env_(Env::Default()) {
+ const int kNumDataDirs = 3;
+ dummy_files_dirs_.reserve(kNumDataDirs);
+ for (size_t i = 0; i < kNumDataDirs; ++i) {
+ dummy_files_dirs_.emplace_back(
+ test::PerThreadDBPath(env_, "delete_scheduler_dummy_data_dir") +
+ ToString(i));
+ DestroyAndCreateDir(dummy_files_dirs_.back());
+ }
+ }
+
+ ~DeleteSchedulerTest() override {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ for (const auto& dummy_files_dir : dummy_files_dirs_) {
+ test::DestroyDir(env_, dummy_files_dir);
+ }
+ }
+
+ void DestroyAndCreateDir(const std::string& dir) {
+ ASSERT_OK(test::DestroyDir(env_, dir));
+ EXPECT_OK(env_->CreateDir(dir));
+ }
+
+ int CountNormalFiles(size_t dummy_files_dirs_idx = 0) {
+ std::vector<std::string> files_in_dir;
+ EXPECT_OK(env_->GetChildren(dummy_files_dirs_[dummy_files_dirs_idx],
+ &files_in_dir));
+
+ int normal_cnt = 0;
+ for (auto& f : files_in_dir) {
+ if (!DeleteScheduler::IsTrashFile(f) && f != "." && f != "..") {
+ normal_cnt++;
+ }
+ }
+ return normal_cnt;
+ }
+
+ int CountTrashFiles(size_t dummy_files_dirs_idx = 0) {
+ std::vector<std::string> files_in_dir;
+ EXPECT_OK(env_->GetChildren(dummy_files_dirs_[dummy_files_dirs_idx],
+ &files_in_dir));
+
+ int trash_cnt = 0;
+ for (auto& f : files_in_dir) {
+ if (DeleteScheduler::IsTrashFile(f)) {
+ trash_cnt++;
+ }
+ }
+ return trash_cnt;
+ }
+
+ std::string NewDummyFile(const std::string& file_name, uint64_t size = 1024,
+ size_t dummy_files_dirs_idx = 0) {
+ std::string file_path =
+ dummy_files_dirs_[dummy_files_dirs_idx] + "/" + file_name;
+ std::unique_ptr<WritableFile> f;
+ env_->NewWritableFile(file_path, &f, EnvOptions());
+ std::string data(size, 'A');
+ EXPECT_OK(f->Append(data));
+ EXPECT_OK(f->Close());
+ sst_file_mgr_->OnAddFile(file_path, false);
+ return file_path;
+ }
+
+ void NewDeleteScheduler() {
+ // Tests in this file are for DeleteScheduler component and don't create any
+ // DBs, so we need to set max_trash_db_ratio to 100% (instead of default
+ // 25%)
+ std::shared_ptr<FileSystem>
+ fs(std::make_shared<LegacyFileSystemWrapper>(env_));
+ sst_file_mgr_.reset(
+ new SstFileManagerImpl(env_, fs, nullptr, rate_bytes_per_sec_,
+ /* max_trash_db_ratio= */ 1.1, 128 * 1024));
+ delete_scheduler_ = sst_file_mgr_->delete_scheduler();
+ }
+
+ Env* env_;
+ std::vector<std::string> dummy_files_dirs_;
+ int64_t rate_bytes_per_sec_;
+ DeleteScheduler* delete_scheduler_;
+ std::unique_ptr<SstFileManagerImpl> sst_file_mgr_;
+};
+
+// Test the basic functionality of DeleteScheduler (Rate Limiting).
+// 1- Create 100 dummy files
+// 2- Delete the 100 dummy files using DeleteScheduler
+// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
+// 3- Wait for DeleteScheduler to delete all files in trash
+// 4- Verify that BackgroundEmptyTrash used the correct penalties for the files
+// 5- Make sure that all created files were completely deleted
+TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::BasicRateLimiting:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+
+ std::vector<uint64_t> penalties;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::BackgroundEmptyTrash:Wait",
+ [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+ int dir_synced = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteTrashFile::AfterSyncDir", [&](void* arg) {
+ dir_synced++;
+ std::string* dir = reinterpret_cast<std::string*>(arg);
+ EXPECT_EQ(dummy_files_dirs_[0], *dir);
+ });
+
+ int num_files = 100; // 100 files
+ uint64_t file_size = 1024; // every file is 1 kb
+ std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
+
+ for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
+ penalties.clear();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyAndCreateDir(dummy_files_dirs_[0]);
+ rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
+ NewDeleteScheduler();
+
+ dir_synced = 0;
+ // Create 100 dummy files, every file is 1 Kb
+ std::vector<std::string> generated_files;
+ for (int i = 0; i < num_files; i++) {
+ std::string file_name = "file" + ToString(i) + ".data";
+ generated_files.push_back(NewDummyFile(file_name, file_size));
+ }
+
+ // Delete dummy files and measure time spent to empty trash
+ for (int i = 0; i < num_files; i++) {
+ ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i],
+ dummy_files_dirs_[0]));
+ }
+ ASSERT_EQ(CountNormalFiles(), 0);
+
+ uint64_t delete_start_time = env_->NowMicros();
+ TEST_SYNC_POINT("DeleteSchedulerTest::BasicRateLimiting:1");
+ delete_scheduler_->WaitForEmptyTrash();
+ uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
+
+ auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+ ASSERT_EQ(bg_errors.size(), 0);
+
+ uint64_t total_files_size = 0;
+ uint64_t expected_penlty = 0;
+ ASSERT_EQ(penalties.size(), num_files);
+ for (int i = 0; i < num_files; i++) {
+ total_files_size += file_size;
+ expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
+ ASSERT_EQ(expected_penlty, penalties[i]);
+ }
+ ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
+
+ ASSERT_EQ(num_files, dir_synced);
+
+ ASSERT_EQ(CountTrashFiles(), 0);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ }
+}
+
+TEST_F(DeleteSchedulerTest, MultiDirectoryDeletionsScheduled) {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::MultiDbPathDeletionsScheduled:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ rate_bytes_per_sec_ = 1 << 20; // 1MB
+ NewDeleteScheduler();
+
+ // Generate dummy files in multiple directories
+ const size_t kNumFiles = dummy_files_dirs_.size();
+ const size_t kFileSize = 1 << 10; // 1KB
+ std::vector<std::string> generated_files;
+ for (size_t i = 0; i < kNumFiles; i++) {
+ generated_files.push_back(NewDummyFile("file", kFileSize, i));
+ ASSERT_EQ(1, CountNormalFiles(i));
+ }
+
+ // Mark dummy files as trash
+ for (size_t i = 0; i < kNumFiles; i++) {
+ ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
+ ASSERT_EQ(0, CountNormalFiles(i));
+ ASSERT_EQ(1, CountTrashFiles(i));
+ }
+ TEST_SYNC_POINT("DeleteSchedulerTest::MultiDbPathDeletionsScheduled:1");
+ delete_scheduler_->WaitForEmptyTrash();
+
+ // Verify dummy files eventually got deleted
+ for (size_t i = 0; i < kNumFiles; i++) {
+ ASSERT_EQ(0, CountNormalFiles(i));
+ ASSERT_EQ(0, CountTrashFiles(i));
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Same as the BasicRateLimiting test but delete files in multiple threads.
+// 1- Create 100 dummy files
+// 2- Delete the 100 dummy files using DeleteScheduler using 10 threads
+// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
+// 3- Wait for DeleteScheduler to delete all files in queue
+// 4- Verify that BackgroundEmptyTrash used the correct penalties for the files
+// 5- Make sure that all created files were completely deleted
+TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::RateLimitingMultiThreaded:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+
+ std::vector<uint64_t> penalties;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::BackgroundEmptyTrash:Wait",
+ [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+
+ int thread_cnt = 10;
+ int num_files = 10; // 10 files per thread
+ uint64_t file_size = 1024; // every file is 1 kb
+
+ std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
+ for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
+ penalties.clear();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyAndCreateDir(dummy_files_dirs_[0]);
+ rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
+ NewDeleteScheduler();
+
+ // Create 100 dummy files, every file is 1 Kb
+ std::vector<std::string> generated_files;
+ for (int i = 0; i < num_files * thread_cnt; i++) {
+ std::string file_name = "file" + ToString(i) + ".data";
+ generated_files.push_back(NewDummyFile(file_name, file_size));
+ }
+
+ // Delete dummy files using 10 threads and measure time spent to empty trash
+ std::atomic<int> thread_num(0);
+ std::vector<port::Thread> threads;
+ std::function<void()> delete_thread = [&]() {
+ int idx = thread_num.fetch_add(1);
+ int range_start = idx * num_files;
+ int range_end = range_start + num_files;
+ for (int j = range_start; j < range_end; j++) {
+ ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j], ""));
+ }
+ };
+
+ for (int i = 0; i < thread_cnt; i++) {
+ threads.emplace_back(delete_thread);
+ }
+
+ for (size_t i = 0; i < threads.size(); i++) {
+ threads[i].join();
+ }
+
+ uint64_t delete_start_time = env_->NowMicros();
+ TEST_SYNC_POINT("DeleteSchedulerTest::RateLimitingMultiThreaded:1");
+ delete_scheduler_->WaitForEmptyTrash();
+ uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
+
+ auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+ ASSERT_EQ(bg_errors.size(), 0);
+
+ uint64_t total_files_size = 0;
+ uint64_t expected_penlty = 0;
+ ASSERT_EQ(penalties.size(), num_files * thread_cnt);
+ for (int i = 0; i < num_files * thread_cnt; i++) {
+ total_files_size += file_size;
+ expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
+ ASSERT_EQ(expected_penlty, penalties[i]);
+ }
+ ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
+
+ ASSERT_EQ(CountNormalFiles(), 0);
+ ASSERT_EQ(CountTrashFiles(), 0);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ }
+}
+
+// Disable rate limiting by setting rate_bytes_per_sec_ to 0 and make sure
+// that when DeleteScheduler deletes a file it deletes it immediately and
+// doesn't move it to trash
+TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
+ int bg_delete_file = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteTrashFile:DeleteFile",
+ [&](void* /*arg*/) { bg_delete_file++; });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ rate_bytes_per_sec_ = 0;
+ NewDeleteScheduler();
+
+ for (int i = 0; i < 10; i++) {
+ // Every file we delete will be deleted immediately
+ std::string dummy_file = NewDummyFile("dummy.data");
+ ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
+ ASSERT_TRUE(env_->FileExists(dummy_file).IsNotFound());
+ ASSERT_EQ(CountNormalFiles(), 0);
+ ASSERT_EQ(CountTrashFiles(), 0);
+ }
+
+ ASSERT_EQ(bg_delete_file, 0);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Testing that moving files to trash with the same name is not a problem
+// 1- Create 10 files with the same name "conflict.data"
+// 2- Delete the 10 files using DeleteScheduler
+// 3- Make sure that trash directory contain 10 files ("conflict.data" x 10)
+// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
+// 4- Make sure that files are deleted from trash
+TEST_F(DeleteSchedulerTest, ConflictNames) {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::ConflictNames:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
+ NewDeleteScheduler();
+
+ // Create "conflict.data" and move it to trash 10 times
+ for (int i = 0; i < 10; i++) {
+ std::string dummy_file = NewDummyFile("conflict.data");
+ ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
+ }
+ ASSERT_EQ(CountNormalFiles(), 0);
+ // 10 files ("conflict.data" x 10) in trash
+ ASSERT_EQ(CountTrashFiles(), 10);
+
+ // Hold BackgroundEmptyTrash
+ TEST_SYNC_POINT("DeleteSchedulerTest::ConflictNames:1");
+ delete_scheduler_->WaitForEmptyTrash();
+ ASSERT_EQ(CountTrashFiles(), 0);
+
+ auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+ ASSERT_EQ(bg_errors.size(), 0);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// 1- Create 10 dummy files
+// 2- Delete the 10 files using DeleteScheduler (move them to trash)
+// 3- Delete the 10 files directly (using env_->DeleteFile)
+// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
+// 4- Make sure that DeleteScheduler failed to delete the 10 files and
+// reported 10 background errors
+TEST_F(DeleteSchedulerTest, BackgroundError) {
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {"DeleteSchedulerTest::BackgroundError:1",
+ "DeleteScheduler::BackgroundEmptyTrash"},
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
+ NewDeleteScheduler();
+
+ // Generate 10 dummy files and move them to trash
+ for (int i = 0; i < 10; i++) {
+ std::string file_name = "data_" + ToString(i) + ".data";
+ ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
+ }
+ ASSERT_EQ(CountNormalFiles(), 0);
+ ASSERT_EQ(CountTrashFiles(), 10);
+
+ // Delete 10 files from trash, this will cause background errors in
+ // BackgroundEmptyTrash since we already deleted the files it was
+ // going to delete
+ for (int i = 0; i < 10; i++) {
+ std::string file_name = "data_" + ToString(i) + ".data.trash";
+ ASSERT_OK(env_->DeleteFile(dummy_files_dirs_[0] + "/" + file_name));
+ }
+
+ // Hold BackgroundEmptyTrash
+ TEST_SYNC_POINT("DeleteSchedulerTest::BackgroundError:1");
+ delete_scheduler_->WaitForEmptyTrash();
+ auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+ ASSERT_EQ(bg_errors.size(), 10);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// 1- Create 10 dummy files
+// 2- Delete 10 dummy files using DeleteScheduler
+// 3- Wait for DeleteScheduler to delete all files in queue
+// 4- Make sure all files in trash directory were deleted
+// 5- Repeat previous steps 5 times
+TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
+  int bg_delete_file = 0;
+  // Count every file the background thread deletes from trash.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void* /*arg*/) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 1024 * 1024;  // 1 MB / sec
+  NewDeleteScheduler();
+
+  // Move files to trash, wait for empty trash, start again
+  for (int run = 1; run <= 5; run++) {
+    // Generate 10 dummy files and move them to trash
+    for (int i = 0; i < 10; i++) {
+      std::string file_name = "data_" + ToString(i) + ".data";
+      ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
+    }
+    ASSERT_EQ(CountNormalFiles(), 0);
+    delete_scheduler_->WaitForEmptyTrash();
+    ASSERT_EQ(bg_delete_file, 10 * run);
+    ASSERT_EQ(CountTrashFiles(), 0);
+
+    auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+    ASSERT_EQ(bg_errors.size(), 0);
+  }
+
+  ASSERT_EQ(bg_delete_file, 50);
+  // Fix: was EnableProcessing(). The test must stop sync-point processing on
+  // exit; leaving it enabled keeps the callback that captures the local
+  // counter live for subsequent tests.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Delete files that are large relative to the rate limit and verify the
+// expected number of per-batch delete and fsync sync points fire.
+TEST_F(DeleteSchedulerTest, DeletePartialFile) {
+  int bg_delete_file = 0;
+  int bg_fsync = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void*) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:Fsync", [&](void*) { bg_fsync++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 1024 * 1024;  // 1 MB / sec
+  NewDeleteScheduler();
+
+  // Should delete in 4 batch
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_1", 500 * 1024), ""));
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_2", 100 * 1024), ""));
+  // Should delete in 2 batch
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_2", 200 * 1024), ""));
+
+  delete_scheduler_->WaitForEmptyTrash();
+
+  auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+  ASSERT_EQ(bg_errors.size(), 0);
+  ASSERT_EQ(7, bg_delete_file);
+  ASSERT_EQ(4, bg_fsync);
+  // Fix: was EnableProcessing(); sync-point processing must be disabled on
+  // test exit so later tests are not affected.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+#ifdef OS_LINUX
+// A file with more than one hard link must be unlinked in a single step:
+// exactly one delete sync point per file and no fsyncs.
+TEST_F(DeleteSchedulerTest, NoPartialDeleteWithLink) {
+  int bg_delete_file = 0;
+  int bg_fsync = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void*) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:Fsync", [&](void*) { bg_fsync++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 1024 * 1024;  // 1 MB / sec
+  NewDeleteScheduler();
+
+  std::string file1 = NewDummyFile("data_1", 500 * 1024);
+  std::string file2 = NewDummyFile("data_2", 100 * 1024);
+
+  // Give each file a second hard link so batched (partial) deletion is
+  // impossible.
+  ASSERT_OK(env_->LinkFile(file1, dummy_files_dirs_[0] + "/data_1b"));
+  ASSERT_OK(env_->LinkFile(file2, dummy_files_dirs_[0] + "/data_2b"));
+
+  // Should delete in 4 batch if there is no hardlink
+  ASSERT_OK(delete_scheduler_->DeleteFile(file1, ""));
+  ASSERT_OK(delete_scheduler_->DeleteFile(file2, ""));
+
+  delete_scheduler_->WaitForEmptyTrash();
+
+  auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+  ASSERT_EQ(bg_errors.size(), 0);
+  ASSERT_EQ(2, bg_delete_file);
+  ASSERT_EQ(0, bg_fsync);
+  // Fix: was EnableProcessing(); sync-point processing must be disabled on
+  // test exit.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+#endif
+
+// 1- Create a DeleteScheduler with very slow rate limit (1 Byte / sec)
+// 2- Delete 100 files using DeleteScheduler
+// 3- Delete the DeleteScheduler (call the destructor while queue is not empty)
+// 4- Make sure that not all files were deleted from trash and that
+//    DeleteScheduler background thread did not delete all files
+TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {
+  int bg_delete_file = 0;
+  // Count background deletions so we can verify the destructor aborted early.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void* /*arg*/) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 1;  // 1 Byte / sec
+  NewDeleteScheduler();
+
+  for (int i = 0; i < 100; i++) {
+    std::string file_name = "data_" + ToString(i) + ".data";
+    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
+  }
+
+  // Deleting 100 files will need >28 hours to delete
+  // we will delete the DeleteScheduler while delete queue is not empty
+  sst_file_mgr_.reset();
+
+  // The destructor must have stopped the background thread before the queue
+  // drained.
+  ASSERT_LT(bg_delete_file, 100);
+  ASSERT_GT(CountTrashFiles(), 0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Re-tune the rate limit across several values and verify that the per-file
+// wait penalties charged by BackgroundEmptyTrash match the configured rate,
+// and that a non-positive rate routes deletions to the foreground path.
+TEST_F(DeleteSchedulerTest, DISABLED_DynamicRateLimiting1) {
+  std::vector<uint64_t> penalties;
+  int bg_delete_file = 0;
+  int fg_delete_file = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void* /*arg*/) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
+  // NOTE(review): `arg` is read through an int* while penalties are stored as
+  // uint64_t -- if the sync point passes a 64-bit penalty this truncates and
+  // is endian-sensitive; confirm the argument type at the emitting site.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::BackgroundEmptyTrash:Wait",
+      [&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {"DeleteSchedulerTest::DynamicRateLimiting1:1",
+       "DeleteScheduler::BackgroundEmptyTrash"},
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  rate_bytes_per_sec_ = 0;  // Disable rate limiting initially
+  NewDeleteScheduler();
+
+  int num_files = 10;         // 10 files
+  uint64_t file_size = 1024;  // every file is 1 kb
+
+  // Rates to cycle through, in KB/s. Zero and negative entries disable rate
+  // limiting, which is expected to delete in the foreground (see the `else`
+  // branch below).
+  std::vector<int64_t> delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25};
+  for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
+    penalties.clear();
+    bg_delete_file = 0;
+    fg_delete_file = 0;
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+    DestroyAndCreateDir(dummy_files_dirs_[0]);
+    rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
+    delete_scheduler_->SetRateBytesPerSecond(rate_bytes_per_sec_);
+
+    // Create `num_files` dummy files, every file is `file_size` bytes
+    std::vector<std::string> generated_files;
+    for (int i = 0; i < num_files; i++) {
+      std::string file_name = "file" + ToString(i) + ".data";
+      generated_files.push_back(NewDummyFile(file_name, file_size));
+    }
+
+    // Delete dummy files and measure time spent to empty trash
+    for (int i = 0; i < num_files; i++) {
+      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
+    }
+    ASSERT_EQ(CountNormalFiles(), 0);
+
+    if (rate_bytes_per_sec_ > 0) {
+      uint64_t delete_start_time = env_->NowMicros();
+      TEST_SYNC_POINT("DeleteSchedulerTest::DynamicRateLimiting1:1");
+      delete_scheduler_->WaitForEmptyTrash();
+      uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
+
+      auto bg_errors = delete_scheduler_->GetBackgroundErrors();
+      ASSERT_EQ(bg_errors.size(), 0);
+
+      // Each file's penalty is the cumulative bytes deleted so far divided by
+      // the rate, expressed in microseconds.
+      uint64_t total_files_size = 0;
+      uint64_t expected_penlty = 0;
+      ASSERT_EQ(penalties.size(), num_files);
+      for (int i = 0; i < num_files; i++) {
+        total_files_size += file_size;
+        expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
+        ASSERT_EQ(expected_penlty, penalties[i]);
+      }
+      // Allow 10% slack: wall-clock time includes scheduling overhead.
+      ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
+      ASSERT_EQ(bg_delete_file, num_files);
+      ASSERT_EQ(fg_delete_file, 0);
+    } else {
+      // Rate limiting disabled: everything is deleted in the foreground.
+      ASSERT_EQ(penalties.size(), 0);
+      ASSERT_EQ(bg_delete_file, 0);
+      ASSERT_EQ(fg_delete_file, num_files);
+    }
+
+    ASSERT_EQ(CountTrashFiles(), 0);
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+// With the trash-to-DB size ratio capped at 0.25 and an extremely slow trash
+// drain, once the trash fills past the cap the scheduler must delete further
+// files immediately in the foreground.
+TEST_F(DeleteSchedulerTest, ImmediateDeleteOn25PercDBSize) {
+  int bg_delete_file = 0;
+  int fg_delete_file = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile:DeleteFile",
+      [&](void* /*arg*/) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  int num_files = 100;             // 100 files
+  uint64_t file_size = 1024 * 10;  // 10 KB as a file size (comment was wrong)
+  rate_bytes_per_sec_ = 1;         // 1 byte per sec (very slow trash delete)
+
+  NewDeleteScheduler();
+  delete_scheduler_->SetMaxTrashDBRatio(0.25);
+
+  std::vector<std::string> generated_files;
+  for (int i = 0; i < num_files; i++) {
+    std::string file_name = "file" + ToString(i) + ".data";
+    generated_files.push_back(NewDummyFile(file_name, file_size));
+  }
+
+  for (std::string& file_name : generated_files) {
+    // Fix: the Status was silently dropped; a failed schedule would skew the
+    // fg/bg counters checked below without any diagnostic.
+    ASSERT_OK(delete_scheduler_->DeleteFile(file_name, ""));
+  }
+
+  // When we end up with 26 files in trash we will start
+  // deleting new files immediately
+  ASSERT_EQ(fg_delete_file, 74);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// DeleteScheduler::IsTrashFile() should recognize exactly the names ending in
+// the ".trash" suffix, regardless of directory components.
+TEST_F(DeleteSchedulerTest, IsTrashCheck) {
+  // Trash files
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("x.trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile(".trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("abc.sst.trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("/a/b/c/abc..sst.trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("log.trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("^^^^^.log.trash"));
+  ASSERT_TRUE(DeleteScheduler::IsTrashFile("abc.t.trash"));
+
+  // Not trash files
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.sst"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.txt"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("/a/b/c/abc.sst"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("/a/b/c/abc.sstrash"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("^^^^^.trashh"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.ttrash"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile(".ttrash"));
+  ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.trashx"));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+// Test entry point: runs every DeleteScheduler test registered above.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#else
+// DeleteScheduler (via SstFileManager) is compiled out of ROCKSDB_LITE, so
+// the test binary degenerates to a stub.
+int main(int /*argc*/, char** /*argv*/) {
+  printf("DeleteScheduler is not supported in ROCKSDB_LITE\n");
+  return 0;
+}
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/file/file_prefetch_buffer.cc b/src/rocksdb/file/file_prefetch_buffer.cc
new file mode 100644
index 000000000..7b55bd397
--- /dev/null
+++ b/src/rocksdb/file/file_prefetch_buffer.cc
@@ -0,0 +1,136 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/file_prefetch_buffer.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "file/random_access_file_reader.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Synchronously read [offset, offset + n) -- rounded out to the file's
+// required buffer alignment -- into buffer_, reusing any overlapping tail of
+// the current buffer so that only the missing bytes are read from `reader`.
+Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
+                                    uint64_t offset, size_t n,
+                                    bool for_compaction) {
+  if (!enable_ || reader == nullptr) {
+    return Status::OK();
+  }
+  size_t alignment = reader->file()->GetRequiredBufferAlignment();
+  size_t offset_ = static_cast<size_t>(offset);
+  uint64_t rounddown_offset = Rounddown(offset_, alignment);
+  uint64_t roundup_end = Roundup(offset_ + n, alignment);
+  uint64_t roundup_len = roundup_end - rounddown_offset;
+  assert(roundup_len >= alignment);
+  assert(roundup_len % alignment == 0);
+
+  // Check if requested bytes are in the existing buffer_.
+  // If all bytes exist -- return.
+  // If only a few bytes exist -- reuse them & read only what is really needed.
+  //     This is typically the case of incremental reading of data.
+  // If no bytes exist in buffer -- full pread.
+
+  Status s;
+  uint64_t chunk_offset_in_buffer = 0;
+  uint64_t chunk_len = 0;
+  bool copy_data_to_new_buffer = false;
+  if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
+      offset <= buffer_offset_ + buffer_.CurrentSize()) {
+    if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
+      // All requested bytes are already in the buffer. So no need to Read
+      // again.
+      return s;
+    } else {
+      // Only a few requested bytes are in the buffer. memmove those chunk of
+      // bytes to the beginning, and memcpy them back into the new buffer if a
+      // new buffer is created.
+      chunk_offset_in_buffer =
+          Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
+      chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
+      assert(chunk_offset_in_buffer % alignment == 0);
+      assert(chunk_len % alignment == 0);
+      assert(chunk_offset_in_buffer + chunk_len <=
+             buffer_offset_ + buffer_.CurrentSize());
+      if (chunk_len > 0) {
+        copy_data_to_new_buffer = true;
+      } else {
+        // this reset is not necessary, but just to be safe.
+        chunk_offset_in_buffer = 0;
+      }
+    }
+  }
+
+  // Create a new buffer only if current capacity is not sufficient, and memcopy
+  // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
+  if (buffer_.Capacity() < roundup_len) {
+    buffer_.Alignment(alignment);
+    buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
+                              copy_data_to_new_buffer, chunk_offset_in_buffer,
+                              static_cast<size_t>(chunk_len));
+  } else if (chunk_len > 0) {
+    // New buffer not needed. But memmove bytes from tail to the beginning since
+    // chunk_len is greater than 0.
+    buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
+                      static_cast<size_t>(chunk_len));
+  }
+
+  // Read only the bytes past the reused chunk, placed directly after it.
+  Slice result;
+  s = reader->Read(rounddown_offset + chunk_len,
+                   static_cast<size_t>(roundup_len - chunk_len), &result,
+                   buffer_.BufferStart() + chunk_len, for_compaction);
+  if (s.ok()) {
+    // Only commit the new buffer window on a successful read.
+    buffer_offset_ = rounddown_offset;
+    buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+  }
+  return s;
+}
+
+// Serve [offset, offset + n) from buffer_ if possible; on a partial hit with
+// readahead enabled, prefetch the missing bytes (plus readahead) first.
+// Returns false when the request cannot be served from the buffer.
+bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
+                                          Slice* result, bool for_compaction) {
+  if (track_min_offset_ && offset < min_offset_read_) {
+    min_offset_read_ = static_cast<size_t>(offset);
+  }
+  if (!enable_ || offset < buffer_offset_) {
+    return false;
+  }
+
+  // If the buffer contains only a few of the requested bytes:
+  //   If readahead is enabled: prefetch the remaining bytes + readahead bytes
+  //     and satisfy the request.
+  //   If readahead is not enabled: return false.
+  if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
+    if (readahead_size_ > 0) {
+      assert(file_reader_ != nullptr);
+      assert(max_readahead_size_ >= readahead_size_);
+      Status s;
+      if (for_compaction) {
+        s = Prefetch(file_reader_, offset, std::max(n, readahead_size_),
+                     for_compaction);
+      } else {
+        s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction);
+      }
+      if (!s.ok()) {
+        return false;
+      }
+      // Exponential readahead: double on every prefetch, capped at the max.
+      readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+    } else {
+      return false;
+    }
+  }
+
+  uint64_t offset_in_buffer = offset - buffer_offset_;
+  *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
+  return true;
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/file_prefetch_buffer.h b/src/rocksdb/file/file_prefetch_buffer.h
new file mode 100644
index 000000000..d53f627b5
--- /dev/null
+++ b/src/rocksdb/file/file_prefetch_buffer.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <sstream>
+#include <string>
+#include "file/random_access_file_reader.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "util/aligned_buffer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// FilePrefetchBuffer is a smart buffer to store and read data from a file.
+class FilePrefetchBuffer {
+ public:
+  // Constructor.
+  //
+  // All arguments are optional.
+  // file_reader        : the file reader to use. Can be a nullptr.
+  // readadhead_size    : the initial readahead size.
+  // max_readahead_size : the maximum readahead size.
+  //   If max_readahead_size > readadhead_size, the readahead size will be
+  //   doubled on every IO until max_readahead_size is hit.
+  //   Typically this is set as a multiple of the initial readahead size.
+  //   max_readahead_size should be greater than or equal to readadhead_size.
+  // enable : controls whether reading from the buffer is enabled.
+  //   If false, TryReadFromCache() always returns false, and we only take
+  //   stats for the minimum offset if track_min_offset = true.
+  // track_min_offset : Track the minimum offset ever read and collect stats
+  //   on it. Used for adaptable readahead of the file footer/metadata.
+  //
+  // Automatic readahead is enabled for a file if file_reader, readadhead_size,
+  // and max_readahead_size are passed in.
+  // If file_reader is a nullptr, setting readadhead_size and
+  // max_readahead_size does not make any sense. So it does nothing.
+  // A user can construct a FilePrefetchBuffer without any arguments, but use
+  // `Prefetch` to load data into the buffer.
+  FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
+                     size_t readadhead_size = 0, size_t max_readahead_size = 0,
+                     bool enable = true, bool track_min_offset = false)
+      : buffer_offset_(0),
+        file_reader_(file_reader),
+        readahead_size_(readadhead_size),
+        max_readahead_size_(max_readahead_size),
+        min_offset_read_(port::kMaxSizet),
+        enable_(enable),
+        track_min_offset_(track_min_offset) {}
+
+  // Load data into the buffer from a file.
+  // reader         : the file reader.
+  // offset         : the file offset to start reading from.
+  // n              : the number of bytes to read.
+  // for_compaction : if prefetch is done for compaction read.
+  Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n,
+                  bool for_compaction = false);
+
+  // Tries returning the data for a file read from this buffer, if that data is
+  // in the buffer.
+  // It handles tracking the minimum read offset if track_min_offset = true.
+  // It also does the exponential readahead when an initial readahead size is
+  // set as part of the constructor.
+  //
+  // offset         : the file offset.
+  // n              : the number of bytes.
+  // result         : output buffer to put the data into.
+  // for_compaction : if cache read is done for compaction read.
+  bool TryReadFromCache(uint64_t offset, size_t n, Slice* result,
+                        bool for_compaction = false);
+
+  // The minimum `offset` ever passed to TryReadFromCache(). This will only be
+  // tracked if track_min_offset = true.
+  size_t min_offset_read() const { return min_offset_read_; }
+
+ private:
+  AlignedBuffer buffer_;
+  // File offset that buffer_[0] corresponds to.
+  uint64_t buffer_offset_;
+  RandomAccessFileReader* file_reader_;
+  // Current readahead size; doubled on each prefetch up to max_readahead_size_.
+  size_t readahead_size_;
+  size_t max_readahead_size_;
+  // The minimum `offset` ever passed to TryReadFromCache().
+  size_t min_offset_read_;
+  // if false, TryReadFromCache() always returns false, and we only take stats
+  // for track_min_offset_ if track_min_offset_ = true
+  bool enable_;
+  // If true, track minimum `offset` ever passed to TryReadFromCache(), which
+  // can be fetched from min_offset_read().
+  bool track_min_offset_;
+};
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/file_util.cc b/src/rocksdb/file/file_util.cc
new file mode 100644
index 000000000..c231bf7d1
--- /dev/null
+++ b/src/rocksdb/file/file_util.cc
@@ -0,0 +1,124 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "file/file_util.h"
+
+#include <string>
+#include <algorithm>
+
+#include "file/random_access_file_reader.h"
+#include "file/sequence_file_reader.h"
+#include "file/sst_file_manager_impl.h"
+#include "file/writable_file_writer.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Utility function to copy a file up to a specified length.
+// size == 0 (the default) means copy the whole file; the source length is
+// queried from the file system in that case. Returns Corruption if the source
+// ends before `size` bytes were copied. The destination is synced (fsync vs
+// fdatasync per use_fsync) before returning OK.
+Status CopyFile(FileSystem* fs, const std::string& source,
+                const std::string& destination, uint64_t size, bool use_fsync) {
+  const FileOptions soptions;
+  Status s;
+  std::unique_ptr<SequentialFileReader> src_reader;
+  std::unique_ptr<WritableFileWriter> dest_writer;
+
+  {
+    std::unique_ptr<FSSequentialFile> srcfile;
+    s = fs->NewSequentialFile(source, soptions, &srcfile, nullptr);
+    if (!s.ok()) {
+      return s;
+    }
+    std::unique_ptr<FSWritableFile> destfile;
+    s = fs->NewWritableFile(destination, soptions, &destfile, nullptr);
+    if (!s.ok()) {
+      return s;
+    }
+
+    if (size == 0) {
+      // default argument means copy everything
+      s = fs->GetFileSize(source, IOOptions(), &size, nullptr);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    src_reader.reset(new SequentialFileReader(std::move(srcfile), source));
+    dest_writer.reset(
+        new WritableFileWriter(std::move(destfile), destination, soptions));
+  }
+
+  // Copy in 4 KB chunks.
+  char buffer[4096];
+  Slice slice;
+  while (size > 0) {
+    size_t bytes_to_read = std::min(sizeof(buffer), static_cast<size_t>(size));
+    s = src_reader->Read(bytes_to_read, &slice, buffer);
+    if (!s.ok()) {
+      return s;
+    }
+    if (slice.size() == 0) {
+      // Hit EOF before `size` bytes were copied.
+      return Status::Corruption("file too small");
+    }
+    s = dest_writer->Append(slice);
+    if (!s.ok()) {
+      return s;
+    }
+    size -= slice.size();
+  }
+  return dest_writer->Sync(use_fsync);
+}
+
+// Utility function to create a file with the provided contents. The file is
+// synced (fsync vs fdatasync per use_fsync) before returning OK.
+Status CreateFile(FileSystem* fs, const std::string& destination,
+                  const std::string& contents, bool use_fsync) {
+  const EnvOptions soptions;
+  Status s;
+  std::unique_ptr<WritableFileWriter> dest_writer;
+
+  std::unique_ptr<FSWritableFile> destfile;
+  s = fs->NewWritableFile(destination, soptions, &destfile, nullptr);
+  if (!s.ok()) {
+    return s;
+  }
+  dest_writer.reset(
+      new WritableFileWriter(std::move(destfile), destination, soptions));
+  s = dest_writer->Append(Slice(contents));
+  if (!s.ok()) {
+    return s;
+  }
+  return dest_writer->Sync(use_fsync);
+}
+
+// Delete a DB file, routing through the SstFileManager's scheduler (for
+// rate-limited, trash-based deletion) when one is configured.
+// force_bg : always schedule the deletion (passed through to the scheduler).
+// force_fg : bypass the scheduler and delete immediately via the Env.
+Status DeleteDBFile(const ImmutableDBOptions* db_options,
+                    const std::string& fname, const std::string& dir_to_sync,
+                    const bool force_bg, const bool force_fg) {
+#ifndef ROCKSDB_LITE
+  SstFileManagerImpl* sfm =
+      static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
+  if (sfm && !force_fg) {
+    return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
+  } else {
+    return db_options->env->DeleteFile(fname);
+  }
+#else
+  (void)dir_to_sync;
+  (void)force_bg;
+  (void)force_fg;
+  // SstFileManager is not supported in ROCKSDB_LITE
+  // Delete file immediately
+  return db_options->env->DeleteFile(fname);
+#endif
+}
+
+// Returns true iff the WAL directory and the primary DB path refer to the
+// same directory.
+bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
+  bool same = false;
+  assert(!db_options->db_paths.empty());
+  Status s = db_options->env->AreFilesSame(db_options->wal_dir,
+                                           db_options->db_paths[0].path, &same);
+  if (s.IsNotSupported()) {
+    // Fall back to a plain string compare when the Env cannot answer.
+    same = db_options->wal_dir == db_options->db_paths[0].path;
+  }
+  // NOTE(review): other error statuses leave `same` == false and are
+  // swallowed -- presumably intentional best-effort; confirm with callers.
+  return same;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/file_util.h b/src/rocksdb/file/file_util.h
new file mode 100644
index 000000000..333749e4d
--- /dev/null
+++ b/src/rocksdb/file/file_util.h
@@ -0,0 +1,33 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include <string>
+
+#include "file/filename.h"
+#include "options/db_options.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+
+namespace ROCKSDB_NAMESPACE {
+// use_fsync maps to options.use_fsync, which determines the way that
+// the file is synced after copying.
+extern Status CopyFile(FileSystem* fs, const std::string& source,
+ const std::string& destination, uint64_t size,
+ bool use_fsync);
+
+extern Status CreateFile(FileSystem* fs, const std::string& destination,
+ const std::string& contents, bool use_fsync);
+
+extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
+ const std::string& fname,
+ const std::string& path_to_sync, const bool force_bg,
+ const bool force_fg);
+
+extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/filename.cc b/src/rocksdb/file/filename.cc
new file mode 100644
index 000000000..d620b7351
--- /dev/null
+++ b/src/rocksdb/file/filename.cc
@@ -0,0 +1,456 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "file/filename.h"
+#include <cinttypes>
+
+#include <ctype.h>
+#include <stdio.h>
+#include <vector>
+#include "file/writable_file_writer.h"
+#include "logging/logging.h"
+#include "rocksdb/env.h"
+#include "test_util/sync_point.h"
+#include "util/stop_watch.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Table file extension used by RocksDB.
+static const std::string kRocksDbTFileExt = "sst";
+// LevelDB-compatible table file extension (see Rocks2LevelTableFileName).
+static const std::string kLevelDbTFileExt = "ldb";
+// BlobDB blob file extension.
+static const std::string kRocksDBBlobFileExt = "blob";
+
+// Given a path, flatten the path name by replacing all chars not in
+// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
+// Return the number of chars stored in dest not including the trailing '\0'.
+// `dest` must be at least `len` bytes; `len` must exceed sizeof("_LOG").
+static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) {
+  const char suffix[] = "_LOG";
+
+  size_t write_idx = 0;
+  size_t i = 0;
+  size_t src_len = path.size();
+
+  // Leave room for the suffix (sizeof includes its terminating '\0').
+  while (i < src_len && write_idx < len - sizeof(suffix)) {
+    if ((path[i] >= 'a' && path[i] <= 'z') ||
+        (path[i] >= '0' && path[i] <= '9') ||
+        (path[i] >= 'A' && path[i] <= 'Z') ||
+        path[i] == '-' ||
+        path[i] == '.' ||
+        path[i] == '_') {
+      dest[write_idx++] = path[i];
+    } else {
+      // Collapse a disallowed character to '_', but drop a leading one.
+      if (i > 0) {
+        dest[write_idx++] = '_';
+      }
+    }
+    i++;
+  }
+  assert(sizeof(suffix) <= len - write_idx);
+  // Fix: pass an explicit "%s" format. Passing `suffix` itself as the format
+  // string trips -Wformat-security and would misbehave if it contained '%'.
+  // "\0" is automatically added by snprintf.
+  snprintf(dest + write_idx, len - write_idx, "%s", suffix);
+  write_idx += sizeof(suffix) - 1;
+  return write_idx;
+}
+
+// Build "<number>.<suffix>" with the number zero-padded to at least six
+// digits (e.g. 7, "sst" -> "000007.sst"), matching the "%06llu.%s" format.
+static std::string MakeFileName(uint64_t number, const char* suffix) {
+  std::string result = std::to_string(number);
+  if (result.size() < 6) {
+    result.insert(0, 6 - result.size(), '0');
+  }
+  result += '.';
+  result += suffix;
+  return result;
+}
+
+// "<name>/<number>.<suffix>".
+static std::string MakeFileName(const std::string& name, uint64_t number,
+                                const char* suffix) {
+  return name + "/" + MakeFileName(number, suffix);
+}
+
+// WAL file inside directory `name`: "<name>/<number>.log".
+std::string LogFileName(const std::string& name, uint64_t number) {
+  assert(number > 0);
+  return MakeFileName(name, number, "log");
+}
+
+// Bare WAL file name: "<number>.log".
+std::string LogFileName(uint64_t number) {
+  assert(number > 0);
+  return MakeFileName(number, "log");
+}
+
+// "<blobdirname>/<number>.blob".
+std::string BlobFileName(const std::string& blobdirname, uint64_t number) {
+  assert(number > 0);
+  return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str());
+}
+
+// "<dbname>/<blob_dir>/<number>.blob".
+std::string BlobFileName(const std::string& dbname, const std::string& blob_dir,
+                         uint64_t number) {
+  assert(number > 0);
+  return MakeFileName(dbname + "/" + blob_dir, number,
+                      kRocksDBBlobFileExt.c_str());
+}
+
+// Directory that archived (old) WAL files are moved into.
+std::string ArchivalDirectory(const std::string& dir) {
+  return dir + "/" + ARCHIVAL_DIR;
+}
+// "<name>/<ARCHIVAL_DIR>/<number>.log".
+std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
+  assert(number > 0);
+  return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log");
+}
+
+// "<path>/<number>.sst".
+std::string MakeTableFileName(const std::string& path, uint64_t number) {
+  return MakeFileName(path, number, kRocksDbTFileExt.c_str());
+}
+
+// "<number>.sst" (no directory component).
+std::string MakeTableFileName(uint64_t number) {
+  return MakeFileName(number, kRocksDbTFileExt.c_str());
+}
+
+// Convert an "sst" table file name to the LevelDB-compatible "ldb" spelling;
+// only the trailing extension is replaced (e.g. "42.sst" -> "42.ldb").
+std::string Rocks2LevelTableFileName(const std::string& fullname) {
+  assert(fullname.size() > kRocksDbTFileExt.size() + 1);
+  if (fullname.size() <= kRocksDbTFileExt.size() + 1) {
+    return "";
+  }
+  return fullname.substr(0, fullname.size() - kRocksDbTFileExt.size()) +
+         kLevelDbTFileExt;
+}
+
+// Parse the decimal file number immediately preceding the last '.' in `name`;
+// returns 0 when there are no digits there (or no '.' at all).
+uint64_t TableFileNameToNumber(const std::string& name) {
+  uint64_t number = 0;
+  uint64_t base = 1;
+  int pos = static_cast<int>(name.find_last_of('.'));
+  // Walk digits right-to-left, accumulating with increasing powers of ten.
+  while (--pos >= 0 && name[pos] >= '0' && name[pos] <= '9') {
+    number += (name[pos] - '0') * base;
+    base *= 10;
+  }
+  return number;
+}
+
+// Full path of table file `number` in the db_path selected by `path_id`;
+// an out-of-range path_id falls back to the last configured path.
+std::string TableFileName(const std::vector<DbPath>& db_paths, uint64_t number,
+                          uint32_t path_id) {
+  assert(number > 0);
+  std::string path;
+  if (path_id >= db_paths.size()) {
+    path = db_paths.back().path;
+  } else {
+    path = db_paths[path_id].path;
+  }
+  return MakeTableFileName(path, number);
+}
+
+// Render a file number for logging: "<number>" or "<number>(path <id>)".
+void FormatFileNumber(uint64_t number, uint32_t path_id, char* out_buf,
+                      size_t out_buf_size) {
+  if (path_id == 0) {
+    snprintf(out_buf, out_buf_size, "%" PRIu64, number);
+  } else {
+    snprintf(out_buf, out_buf_size, "%" PRIu64
+                                    "(path "
+                                    "%" PRIu32 ")",
+             number, path_id);
+  }
+}
+
+// "<dbname>/MANIFEST-<number>", number zero-padded to six digits.
+std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
+  assert(number > 0);
+  char buf[100];
+  snprintf(buf, sizeof(buf), "/MANIFEST-%06llu",
+           static_cast<unsigned long long>(number));
+  return dbname + buf;
+}
+
+// File whose contents point at the current MANIFEST.
+std::string CurrentFileName(const std::string& dbname) {
+  return dbname + "/CURRENT";
+}
+
+// Lock file guarding exclusive access to the DB directory.
+std::string LockFileName(const std::string& dbname) {
+  return dbname + "/LOCK";
+}
+
+// "<dbname>/<number>.<kTempFileNameSuffix>".
+std::string TempFileName(const std::string& dbname, uint64_t number) {
+  return MakeFileName(dbname, number, kTempFileNameSuffix.c_str());
+}
+
+// Build the info-log file-name prefix: plain "LOG" when the log lives in the
+// DB directory, otherwise the flattened absolute DB path plus "_LOG".
+InfoLogPrefix::InfoLogPrefix(bool has_log_dir,
+                             const std::string& db_absolute_path) {
+  if (!has_log_dir) {
+    const char kInfoLogPrefix[] = "LOG";
+    // Fix: use an explicit "%s" format rather than passing the string as the
+    // format itself (-Wformat-security). "\0" is automatically added.
+    snprintf(buf, sizeof(buf), "%s", kInfoLogPrefix);
+    prefix = Slice(buf, sizeof(kInfoLogPrefix) - 1);
+  } else {
+    size_t len = GetInfoLogPrefix(db_absolute_path, buf, sizeof(buf));
+    prefix = Slice(buf, len);
+  }
+}
+
+// Path of the current info log: "<dbname>/LOG" when no separate log_dir is
+// configured, otherwise "<log_dir>/<flattened db_path>_LOG".
+std::string InfoLogFileName(const std::string& dbname,
+                            const std::string& db_path,
+                            const std::string& log_dir) {
+  if (log_dir.empty()) {
+    return dbname + "/LOG";
+  }
+
+  InfoLogPrefix info_log_prefix(true, db_path);
+  return log_dir + "/" + info_log_prefix.buf;
+}
+
+// Return the name of the old info log file for "dbname"; `ts` is a timestamp
+// appended after ".old." so rotated logs stay distinct.
+std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts,
+                               const std::string& db_path,
+                               const std::string& log_dir) {
+  char buf[50];
+  snprintf(buf, sizeof(buf), "%llu", static_cast<unsigned long long>(ts));
+
+  if (log_dir.empty()) {
+    return dbname + "/LOG.old." + buf;
+  }
+
+  InfoLogPrefix info_log_prefix(true, db_path);
+  return log_dir + "/" + info_log_prefix.buf + ".old." + buf;
+}
+
+// "<dbname>/<kOptionsFileNamePrefix><file_num>", number zero-padded to six
+// digits.
+std::string OptionsFileName(const std::string& dbname, uint64_t file_num) {
+  char buffer[256];
+  snprintf(buffer, sizeof(buffer), "%s%06" PRIu64,
+           kOptionsFileNamePrefix.c_str(), file_num);
+  return dbname + "/" + buffer;
+}
+
+// Temporary options file: same as OptionsFileName with the temp suffix
+// appended.
+std::string TempOptionsFileName(const std::string& dbname, uint64_t file_num) {
+  char buffer[256];
+  snprintf(buffer, sizeof(buffer), "%s%06" PRIu64 ".%s",
+           kOptionsFileNamePrefix.c_str(), file_num,
+           kTempFileNameSuffix.c_str());
+  return dbname + "/" + buffer;
+}
+
+// "<dbname>/METADB-<number>".
+std::string MetaDatabaseName(const std::string& dbname, uint64_t number) {
+  char buf[100];
+  snprintf(buf, sizeof(buf), "/METADB-%llu",
+           static_cast<unsigned long long>(number));
+  return dbname + buf;
+}
+
+// File holding the DB's unique identity string.
+std::string IdentityFileName(const std::string& dbname) {
+  return dbname + "/IDENTITY";
+}
+
// Owned filenames have the form:
//    dbname/IDENTITY
//    dbname/CURRENT
//    dbname/LOCK
//    dbname/<info_log_name_prefix>
//    dbname/<info_log_name_prefix>.old.[0-9]+
//    dbname/MANIFEST-[0-9]+
//    dbname/[0-9]+.(log|sst|blob)
//    dbname/METADB-[0-9]+
//    dbname/OPTIONS-[0-9]+
//    dbname/OPTIONS-[0-9]+.dbtmp
//  Disregards / at the beginning
//
// This overload disables info-log matching by delegating to the full parser
// with an empty info_log_name_prefix (an empty prefix never matches).
bool ParseFileName(const std::string& fname,
                   uint64_t* number,
                   FileType* type,
                   WalFileType* log_type) {
  return ParseFileName(fname, number, "", type, log_type);
}
+
// Parse a db-owned file name into its file number and type (see the filename
// grammar documented above the two-argument overload). Returns true on a
// successful parse; on failure *number/*type are left untouched.
// `info_log_name_prefix` is the prefix info log files carry (empty disables
// info-log matching). `log_type`, when non-null, distinguishes alive vs
// archived WAL files.
bool ParseFileName(const std::string& fname, uint64_t* number,
                   const Slice& info_log_name_prefix, FileType* type,
                   WalFileType* log_type) {
  Slice rest(fname);
  // Tolerate a single leading '/'.
  if (fname.length() > 1 && fname[0] == '/') {
    rest.remove_prefix(1);
  }
  if (rest == "IDENTITY") {
    *number = 0;
    *type = kIdentityFile;
  } else if (rest == "CURRENT") {
    *number = 0;
    *type = kCurrentFile;
  } else if (rest == "LOCK") {
    *number = 0;
    *type = kDBLockFile;
  } else if (info_log_name_prefix.size() > 0 &&
             rest.starts_with(info_log_name_prefix)) {
    rest.remove_prefix(info_log_name_prefix.size());
    if (rest == "" || rest == ".old") {
      *number = 0;
      *type = kInfoLogFile;
    } else if (rest.starts_with(".old.")) {
      uint64_t ts_suffix;
      // sizeof also counts the trailing '\0'.
      rest.remove_prefix(sizeof(".old.") - 1);
      if (!ConsumeDecimalNumber(&rest, &ts_suffix)) {
        return false;
      }
      *number = ts_suffix;
      *type = kInfoLogFile;
    }
    // NOTE(review): if the prefix matches but the suffix is none of the
    // above, this falls through and returns true WITHOUT setting *number or
    // *type — callers would read stale values. Verify whether that case can
    // occur in practice.
  } else if (rest.starts_with("MANIFEST-")) {
    rest.remove_prefix(strlen("MANIFEST-"));
    uint64_t num;
    if (!ConsumeDecimalNumber(&rest, &num)) {
      return false;
    }
    // Reject trailing garbage after the manifest number.
    if (!rest.empty()) {
      return false;
    }
    *type = kDescriptorFile;
    *number = num;
  } else if (rest.starts_with("METADB-")) {
    rest.remove_prefix(strlen("METADB-"));
    uint64_t num;
    if (!ConsumeDecimalNumber(&rest, &num)) {
      return false;
    }
    if (!rest.empty()) {
      return false;
    }
    *type = kMetaDatabase;
    *number = num;
  } else if (rest.starts_with(kOptionsFileNamePrefix)) {
    uint64_t ts_suffix;
    bool is_temp_file = false;
    rest.remove_prefix(kOptionsFileNamePrefix.size());
    // "OPTIONS-<num>.dbtmp" is a temp options file; "OPTIONS-<num>" a real
    // one.
    const std::string kTempFileNameSuffixWithDot =
        std::string(".") + kTempFileNameSuffix;
    if (rest.ends_with(kTempFileNameSuffixWithDot)) {
      rest.remove_suffix(kTempFileNameSuffixWithDot.size());
      is_temp_file = true;
    }
    if (!ConsumeDecimalNumber(&rest, &ts_suffix)) {
      return false;
    }
    *number = ts_suffix;
    *type = is_temp_file ? kTempFile : kOptionsFile;
  } else {
    // Avoid strtoull() to keep filename format independent of the
    // current locale
    bool archive_dir_found = false;
    if (rest.starts_with(ARCHIVAL_DIR)) {
      if (rest.size() <= ARCHIVAL_DIR.size()) {
        return false;
      }
      rest.remove_prefix(ARCHIVAL_DIR.size() + 1); // Add 1 to remove / also
      if (log_type) {
        *log_type = kArchivedLogFile;
      }
      archive_dir_found = true;
    }
    uint64_t num;
    if (!ConsumeDecimalNumber(&rest, &num)) {
      return false;
    }
    // A number must be followed by ".<suffix>".
    if (rest.size() <= 1 || rest[0] != '.') {
      return false;
    }
    rest.remove_prefix(1);

    Slice suffix = rest;
    if (suffix == Slice("log")) {
      *type = kLogFile;
      if (log_type && !archive_dir_found) {
        *log_type = kAliveLogFile;
      }
    } else if (archive_dir_found) {
      return false; // Archive dir can contain only log files
    } else if (suffix == Slice(kRocksDbTFileExt) ||
               suffix == Slice(kLevelDbTFileExt)) {
      *type = kTableFile;
    } else if (suffix == Slice(kRocksDBBlobFileExt)) {
      *type = kBlobFile;
    } else if (suffix == Slice(kTempFileNameSuffix)) {
      *type = kTempFile;
    } else {
      return false;
    }
    *number = num;
  }
  return true;
}
+
// Atomically point the CURRENT file at the manifest with the given
// descriptor number: write the (relative) manifest name plus '\n' to a temp
// file, then rename it over CURRENT. Optionally fsyncs the containing
// directory so the rename itself is durable.
Status SetCurrentFile(Env* env, const std::string& dbname,
                      uint64_t descriptor_number,
                      Directory* directory_to_fsync) {
  // Remove leading "dbname/" and add newline to manifest file name
  std::string manifest = DescriptorFileName(dbname, descriptor_number);
  Slice contents = manifest;
  assert(contents.starts_with(dbname + "/"));
  contents.remove_prefix(dbname.size() + 1);
  std::string tmp = TempFileName(dbname, descriptor_number);
  // `true` requests an fsync of the temp file before it is renamed.
  Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp, true);
  if (s.ok()) {
    // Crash-test injection points straddle the rename on purpose.
    TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
    s = env->RenameFile(tmp, CurrentFileName(dbname));
    TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
  }
  if (s.ok()) {
    if (directory_to_fsync != nullptr) {
      s = directory_to_fsync->Fsync();
    }
  } else {
    // Best-effort cleanup; the DeleteFile status is intentionally ignored.
    env->DeleteFile(tmp);
  }
  return s;
}
+
+Status SetIdentityFile(Env* env, const std::string& dbname,
+ const std::string& db_id) {
+ std::string id;
+ if (db_id.empty()) {
+ id = env->GenerateUniqueId();
+ } else {
+ id = db_id;
+ }
+ assert(!id.empty());
+ // Reserve the filename dbname/000000.dbtmp for the temporary identity file
+ std::string tmp = TempFileName(dbname, 0);
+ Status s = WriteStringToFile(env, id, tmp, true);
+ if (s.ok()) {
+ s = env->RenameFile(tmp, IdentityFileName(dbname));
+ }
+ if (!s.ok()) {
+ env->DeleteFile(tmp);
+ }
+ return s;
+}
+
// Sync the MANIFEST writer `file` (fsync or fdatasync depending on
// db_options->use_fsync), timing the call under the
// MANIFEST_FILE_SYNC_MICROS histogram.
Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
                    WritableFileWriter* file) {
  // Crash-test injection point before the sync.
  TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2);
  StopWatch sw(env, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS);
  return file->Sync(db_options->use_fsync);
}
+
+Status GetInfoLogFiles(Env* env, const std::string& db_log_dir,
+ const std::string& dbname, std::string* parent_dir,
+ std::vector<std::string>* info_log_list) {
+ assert(parent_dir != nullptr);
+ assert(info_log_list != nullptr);
+ uint64_t number = 0;
+ FileType type = kLogFile;
+
+ if (!db_log_dir.empty()) {
+ *parent_dir = db_log_dir;
+ } else {
+ *parent_dir = dbname;
+ }
+
+ InfoLogPrefix info_log_prefix(!db_log_dir.empty(), dbname);
+
+ std::vector<std::string> file_names;
+ Status s = env->GetChildren(*parent_dir, &file_names);
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ for (auto& f : file_names) {
+ if (ParseFileName(f, &number, info_log_prefix.prefix, &type) &&
+ (type == kInfoLogFile)) {
+ info_log_list->push_back(f);
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/filename.h b/src/rocksdb/file/filename.h
new file mode 100644
index 000000000..909287c25
--- /dev/null
+++ b/src/rocksdb/file/filename.h
@@ -0,0 +1,185 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// File names used by DB code
+
+#pragma once
+#include <stdint.h>
+#include <unordered_map>
+#include <string>
+#include <vector>
+
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/transaction_log.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Env;
+class Directory;
+class WritableFileWriter;
+
// The kinds of files a DB owns, as distinguished by ParseFileName().
enum FileType {
  kLogFile,        // <number>.log -- write-ahead log
  kDBLockFile,     // LOCK
  kTableFile,      // <number>.sst (or legacy .ldb)
  kDescriptorFile, // MANIFEST-<number>
  kCurrentFile,    // CURRENT
  kTempFile,       // <number>.dbtmp (also OPTIONS-<number>.dbtmp)
  kInfoLogFile,  // Either the current one, or an old one
  kMetaDatabase,   // METADB-<number>
  kIdentityFile,   // IDENTITY
  kOptionsFile,    // OPTIONS-<number>
  kBlobFile        // <number>.blob
};
+
+// Return the name of the log file with the specified number
+// in the db named by "dbname". The result will be prefixed with
+// "dbname".
+extern std::string LogFileName(const std::string& dbname, uint64_t number);
+
+extern std::string LogFileName(uint64_t number);
+
+extern std::string BlobFileName(const std::string& bdirname, uint64_t number);
+
+extern std::string BlobFileName(const std::string& dbname,
+ const std::string& blob_dir, uint64_t number);
+
+static const std::string ARCHIVAL_DIR = "archive";
+
+extern std::string ArchivalDirectory(const std::string& dbname);
+
+// Return the name of the archived log file with the specified number
+// in the db named by "dbname". The result will be prefixed with "dbname".
+extern std::string ArchivedLogFileName(const std::string& dbname,
+ uint64_t num);
+
+extern std::string MakeTableFileName(const std::string& name, uint64_t number);
+
+extern std::string MakeTableFileName(uint64_t number);
+
+// Return the name of sstable with LevelDB suffix
+// created from RocksDB sstable suffixed name
+extern std::string Rocks2LevelTableFileName(const std::string& fullname);
+
+// the reverse function of MakeTableFileName
+// TODO(yhchiang): could merge this function with ParseFileName()
+extern uint64_t TableFileNameToNumber(const std::string& name);
+
+// Return the name of the sstable with the specified number
+// in the db named by "dbname". The result will be prefixed with
+// "dbname".
+extern std::string TableFileName(const std::vector<DbPath>& db_paths,
+ uint64_t number, uint32_t path_id);
+
+// Sufficient buffer size for FormatFileNumber.
+const size_t kFormatFileNumberBufSize = 38;
+
+extern void FormatFileNumber(uint64_t number, uint32_t path_id, char* out_buf,
+ size_t out_buf_size);
+
+// Return the name of the descriptor file for the db named by
+// "dbname" and the specified incarnation number. The result will be
+// prefixed with "dbname".
+extern std::string DescriptorFileName(const std::string& dbname,
+ uint64_t number);
+
+// Return the name of the current file. This file contains the name
+// of the current manifest file. The result will be prefixed with
+// "dbname".
+extern std::string CurrentFileName(const std::string& dbname);
+
+// Return the name of the lock file for the db named by
+// "dbname". The result will be prefixed with "dbname".
+extern std::string LockFileName(const std::string& dbname);
+
+// Return the name of a temporary file owned by the db named "dbname".
+// The result will be prefixed with "dbname".
+extern std::string TempFileName(const std::string& dbname, uint64_t number);
+
// A helper structure for prefix of info log names.
struct InfoLogPrefix {
  // Backing storage for the prefix text; `prefix` points into it.
  char buf[260];
  Slice prefix;
  // Prefix with DB absolute path encoded (used when the info log lives in a
  // separate log directory). When has_log_dir is false the prefix is "LOG".
  explicit InfoLogPrefix(bool has_log_dir, const std::string& db_absolute_path);
  // Default Prefix
  explicit InfoLogPrefix();
};
+
+// Return the name of the info log file for "dbname".
+extern std::string InfoLogFileName(const std::string& dbname,
+ const std::string& db_path = "",
+ const std::string& log_dir = "");
+
+// Return the name of the old info log file for "dbname".
+extern std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts,
+ const std::string& db_path = "",
+ const std::string& log_dir = "");
+
+static const std::string kOptionsFileNamePrefix = "OPTIONS-";
+static const std::string kTempFileNameSuffix = "dbtmp";
+
+// Return an options file name given the "dbname" and file number.
+// Format: OPTIONS-[number]
+extern std::string OptionsFileName(const std::string& dbname,
+ uint64_t file_num);
+
+// Return a temp options file name given the "dbname" and file number.
+// Format: OPTIONS-[number].dbtmp
+extern std::string TempOptionsFileName(const std::string& dbname,
+ uint64_t file_num);
+
+// Return the name to use for a metadatabase. The result will be prefixed with
+// "dbname".
+extern std::string MetaDatabaseName(const std::string& dbname,
+ uint64_t number);
+
+// Return the name of the Identity file which stores a unique number for the db
+// that will get regenerated if the db loses all its data and is recreated fresh
+// either from a backup-image or empty
+extern std::string IdentityFileName(const std::string& dbname);
+
+// If filename is a rocksdb file, store the type of the file in *type.
+// The number encoded in the filename is stored in *number. If the
+// filename was successfully parsed, returns true. Else return false.
+// info_log_name_prefix is the path of info logs.
+extern bool ParseFileName(const std::string& filename, uint64_t* number,
+ const Slice& info_log_name_prefix, FileType* type,
+ WalFileType* log_type = nullptr);
+// Same as previous function, but skip info log files.
+extern bool ParseFileName(const std::string& filename, uint64_t* number,
+ FileType* type, WalFileType* log_type = nullptr);
+
+// Make the CURRENT file point to the descriptor file with the
+// specified number.
+extern Status SetCurrentFile(Env* env, const std::string& dbname,
+ uint64_t descriptor_number,
+ Directory* directory_to_fsync);
+
+// Make the IDENTITY file for the db
+extern Status SetIdentityFile(Env* env, const std::string& dbname,
+ const std::string& db_id = {});
+
+// Sync manifest file `file`.
+extern Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
+ WritableFileWriter* file);
+
+// Return list of file names of info logs in `file_names`.
+// The list only contains file name. The parent directory name is stored
+// in `parent_dir`.
+// `db_log_dir` should be the one as in options.db_log_dir
+extern Status GetInfoLogFiles(Env* env, const std::string& db_log_dir,
+ const std::string& dbname,
+ std::string* parent_dir,
+ std::vector<std::string>* file_names);
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/random_access_file_reader.cc b/src/rocksdb/file/random_access_file_reader.cc
new file mode 100644
index 000000000..46892360f
--- /dev/null
+++ b/src/rocksdb/file/random_access_file_reader.cc
@@ -0,0 +1,189 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/random_access_file_reader.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
// Read `n` bytes at `offset` into `scratch`, setting *result to the data
// actually read. Handles both direct and buffered I/O, optionally rate
// limits compaction reads, notifies listeners, and records read stats.
// Note: *result may point at an mmap-backed buffer rather than `scratch`
// in the buffered path.
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
                                    char* scratch, bool for_compaction) const {
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      // Direct I/O: round the request out to alignment boundaries, read into
      // an aligned staging buffer, then copy the requested window back out.
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          // Rate-limited: each iteration reads only as many bytes as the
          // limiter grants.
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          // Unlimited: a single full-size read (the loop runs once unless
          // the file returns a short read).
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::TimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
          orig_offset = aligned_offset + buf.CurrentSize();
        }
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
                          IOOptions(), &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        // Stop on error or short read (EOF).
        if (!s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      size_t res_len = 0;
      if (s.ok() && offset_advance < buf.CurrentSize()) {
        // Copy the caller's window out of the aligned staging buffer.
        res_len = buf.Read(scratch, offset_advance,
                           std::min(buf.CurrentSize() - offset_advance, n));
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
      // Buffered path: read in rate-limiter-sized chunks directly into
      // `scratch` (or wherever the file implementation points us).
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          // Exclude time spent waiting on the rate limiter from the
          // StopWatch measurement.
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::TimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
        }
#endif
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
                          scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, s);
        }
#endif

        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}
+
// Issue a batch of reads via the file's MultiRead. Per-request results and
// statuses land in the FSReadRequest entries; the returned Status reflects
// the batch call itself. Not supported for direct I/O (asserted).
Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
                                         size_t num_reqs) const {
  Status s;
  uint64_t elapsed = 0;
  assert(!use_direct_io());
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

#ifndef ROCKSDB_LITE
    // One timestamp for the whole batch; listeners are notified per request
    // below with this shared start time.
    FileOperationInfo::TimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = std::chrono::system_clock::now();
    }
#endif  // ROCKSDB_LITE
    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
      s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
    }
    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/random_access_file_reader.h b/src/rocksdb/file/random_access_file_reader.h
new file mode 100644
index 000000000..35027bf45
--- /dev/null
+++ b/src/rocksdb/file/random_access_file_reader.h
@@ -0,0 +1,120 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <sstream>
+#include <string>
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/rate_limiter.h"
+#include "util/aligned_buffer.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Statistics;
+class HistogramImpl;
+
+// RandomAccessFileReader is a wrapper on top of Env::RandomAccessFile. It is
+// responsible for:
+// - Handling Buffered and Direct reads appropriately.
+// - Rate limiting compaction reads.
+// - Notifying any interested listeners on the completion of a read.
+// - Updating IO stats.
+class RandomAccessFileReader {
+ private:
+#ifndef ROCKSDB_LITE
+ void NotifyOnFileReadFinish(uint64_t offset, size_t length,
+ const FileOperationInfo::TimePoint& start_ts,
+ const FileOperationInfo::TimePoint& finish_ts,
+ const Status& status) const {
+ FileOperationInfo info(file_name_, start_ts, finish_ts);
+ info.offset = offset;
+ info.length = length;
+ info.status = status;
+
+ for (auto& listener : listeners_) {
+ listener->OnFileReadFinish(info);
+ }
+ }
+#endif // ROCKSDB_LITE
+
+ bool ShouldNotifyListeners() const { return !listeners_.empty(); }
+
+ std::unique_ptr<FSRandomAccessFile> file_;
+ std::string file_name_;
+ Env* env_;
+ Statistics* stats_;
+ uint32_t hist_type_;
+ HistogramImpl* file_read_hist_;
+ RateLimiter* rate_limiter_;
+ std::vector<std::shared_ptr<EventListener>> listeners_;
+
+ public:
+ explicit RandomAccessFileReader(
+ std::unique_ptr<FSRandomAccessFile>&& raf, std::string _file_name,
+ Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
+ HistogramImpl* file_read_hist = nullptr,
+ RateLimiter* rate_limiter = nullptr,
+ const std::vector<std::shared_ptr<EventListener>>& listeners = {})
+ : file_(std::move(raf)),
+ file_name_(std::move(_file_name)),
+ env_(env),
+ stats_(stats),
+ hist_type_(hist_type),
+ file_read_hist_(file_read_hist),
+ rate_limiter_(rate_limiter),
+ listeners_() {
+#ifndef ROCKSDB_LITE
+ std::for_each(listeners.begin(), listeners.end(),
+ [this](const std::shared_ptr<EventListener>& e) {
+ if (e->ShouldBeNotifiedOnFileIO()) {
+ listeners_.emplace_back(e);
+ }
+ });
+#else // !ROCKSDB_LITE
+ (void)listeners;
+#endif
+ }
+
+ RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
+ *this = std::move(o);
+ }
+
+ RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
+ ROCKSDB_NOEXCEPT {
+ file_ = std::move(o.file_);
+ env_ = std::move(o.env_);
+ stats_ = std::move(o.stats_);
+ hist_type_ = std::move(o.hist_type_);
+ file_read_hist_ = std::move(o.file_read_hist_);
+ rate_limiter_ = std::move(o.rate_limiter_);
+ return *this;
+ }
+
+ RandomAccessFileReader(const RandomAccessFileReader&) = delete;
+ RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
+
+ Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
+ bool for_compaction = false) const;
+
+ Status MultiRead(FSReadRequest* reqs, size_t num_reqs) const;
+
+ Status Prefetch(uint64_t offset, size_t n) const {
+ return file_->Prefetch(offset, n, IOOptions(), nullptr);
+ }
+
+ FSRandomAccessFile* file() { return file_.get(); }
+
+ std::string file_name() const { return file_name_; }
+
+ bool use_direct_io() const { return file_->use_direct_io(); }
+};
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/read_write_util.cc b/src/rocksdb/file/read_write_util.cc
new file mode 100644
index 000000000..b4854e110
--- /dev/null
+++ b/src/rocksdb/file/read_write_util.cc
@@ -0,0 +1,67 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/read_write_util.h"
+
+#include <sstream>
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+
// Create a writable file `fname` through `fs`, storing it in *result.
// Thin wrapper that adds a crash-test injection point after creation.
IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
                         std::unique_ptr<FSWritableFile>* result,
                         const FileOptions& options) {
  IOStatus s = fs->NewWritableFile(fname, options, result, nullptr);
  TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
  return s;
}
+
// Read one '\n'-terminated line into *output, pulling file data through the
// string stream *iss in kBufferSize chunks as needed.
// *has_data tracks whether the underlying file may still have bytes left;
// *result receives the status of any file read performed.
// Returns true if *output holds a line the caller should process.
bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
                 std::string* output, bool* has_data, Status* result) {
  const int kBufferSize = 8192;
  char buffer[kBufferSize + 1];
  Slice input_slice;

  std::string line;
  bool has_complete_line = false;
  while (!has_complete_line) {
    if (std::getline(*iss, line)) {
      // getline succeeding without hitting eof means a '\n' was consumed,
      // i.e. `line` is complete.
      has_complete_line = !iss->eof();
    } else {
      has_complete_line = false;
    }
    if (!has_complete_line) {
      // if we're not sure whether we have a complete line,
      // further read from the file.
      if (*has_data) {
        *result = seq_file_reader->Read(kBufferSize, &input_slice, buffer);
      }
      if (input_slice.size() == 0) {
        // meaning we have read all the data
        // NOTE(review): a trailing partial line (no final '\n') ends up in
        // `line`/`output` here but the function returns false — confirm
        // callers accept dropping it.
        *has_data = false;
        break;
      } else {
        // Prepend the partial line to the fresh chunk and retry getline.
        iss->str(line + input_slice.ToString());
        // reset the internal state of iss so that we can keep reading it.
        iss->clear();
        // A full buffer suggests the file may have more data.
        *has_data = (input_slice.size() == kBufferSize);
        continue;
      }
    }
  }
  *output = line;
  return *has_data || has_complete_line;
}
+
#ifndef NDEBUG
// True iff `off` is an exact multiple of `sector_size`. Debug-only helper
// used to assert direct-I/O alignment invariants.
bool IsFileSectorAligned(const size_t off, size_t sector_size) {
  return (off % sector_size) == 0;
}
#endif  // NDEBUG
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/read_write_util.h b/src/rocksdb/file/read_write_util.h
new file mode 100644
index 000000000..22f4076b3
--- /dev/null
+++ b/src/rocksdb/file/read_write_util.h
@@ -0,0 +1,34 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include "file/sequence_file_reader.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Returns a WritableFile.
+//
+// fs : the FileSystem.
+// fname : the file name.
+// result : output arg. A WritableFile based on `fname` returned.
+// options : the FileOptions to use when creating the file.
+extern IOStatus NewWritableFile(FileSystem* fs, const std::string& fname,
+ std::unique_ptr<FSWritableFile>* result,
+ const FileOptions& options);
+
+// Read a single line from a file.
+bool ReadOneLine(std::istringstream* iss, SequentialFileReader* seq_file_reader,
+ std::string* output, bool* has_data, Status* result);
+
+#ifndef NDEBUG
+bool IsFileSectorAligned(const size_t off, size_t sector_size);
+#endif // NDEBUG
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/readahead_raf.cc b/src/rocksdb/file/readahead_raf.cc
new file mode 100644
index 000000000..493f9d9e8
--- /dev/null
+++ b/src/rocksdb/file/readahead_raf.cc
@@ -0,0 +1,162 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/readahead_raf.h"
+
+#include <algorithm>
+#include <mutex>
+#include "file/read_write_util.h"
+#include "util/aligned_buffer.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
// A RandomAccessFile wrapper that reads ahead: every cache-missing read
// fills an aligned internal buffer of `readahead_size` bytes so subsequent
// nearby reads are served from memory. All buffer state is guarded by
// `lock_`, so concurrent reads are safe.
class ReadaheadRandomAccessFile : public RandomAccessFile {
 public:
  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
                            size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;

  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
      delete;

  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    // Read-ahead only make sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      return file_->Read(offset, n, result, scratch);
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including end of file case when offset + n is greater than EOF, then
    // return.
    if (TryReadFromCache(offset, n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return Status::OK();
    }
    size_t advanced_offset = static_cast<size_t>(offset + cached_len);
    // In the case of cache hit advanced_offset is already aligned, means that
    // chunk_offset equals to advanced_offset
    size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);

    // Refill the buffer from the file, then serve the remainder of the
    // request out of the freshly cached data.
    Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
                       scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  Status Prefetch(uint64_t offset, size_t n) override {
    if (n < readahead_size_) {
      // Don't allow smaller prefetches than the configured `readahead_size_`.
      // `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
      return Status::OK();
    }

    std::unique_lock<std::mutex> lk(lock_);

    size_t offset_ = static_cast<size_t>(offset);
    size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
    if (prefetch_offset == buffer_offset_) {
      // Requested region already starts at the cached chunk; nothing to do.
      return Status::OK();
    }
    return ReadIntoBuffer(prefetch_offset,
                          Roundup(offset_ + n, alignment_) - prefetch_offset);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return file_->GetUniqueId(id, max_size);
  }

  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }

  Status InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    // Drop our own prefetched copy before invalidating the underlying file.
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read from buffer_ n bytes starting at offset. If anything was read
  // from the cache, it sets cached_len to the number of bytes actually read,
  // copies these number of bytes to scratch and returns true.
  // If nothing was read sets cached_len to 0 and returns false.
  // Caller must hold lock_.
  bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
                        char* scratch) const {
    if (offset < buffer_offset_ ||
        offset >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = offset - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    return true;
  }

  // Reads into buffer_ the next n bytes from file_ starting at offset.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  // Caller must hold lock_.
  Status ReadIntoBuffer(uint64_t offset, size_t n) const {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(offset, alignment_));
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
    if (s.ok()) {
      buffer_offset_ = offset;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<RandomAccessFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  // Guards buffer_ and buffer_offset_.
  mutable std::mutex lock_;
  // The buffer storing the prefetched data
  mutable AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  mutable uint64_t buffer_offset_;
};
+} // namespace
+
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+ std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
+ std::unique_ptr<RandomAccessFile> result(
+ new ReadaheadRandomAccessFile(std::move(file), readahead_size));
+ return result;
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/readahead_raf.h b/src/rocksdb/file/readahead_raf.h
new file mode 100644
index 000000000..cbdcb124f
--- /dev/null
+++ b/src/rocksdb/file/readahead_raf.h
@@ -0,0 +1,27 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+// This file provides the following main abstractions:
+// SequentialFileReader : wrapper over Env::SequentialFile
+// RandomAccessFileReader : wrapper over Env::RandomAccessFile
+// WritableFileWriter : wrapper over Env::WritableFile
+// In addition, it also exposes NewReadaheadRandomAccessFile, NewWritableFile,
+// and ReadOneLine primitives.
+
+// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to
+// always prefetch additional data with every read. This is mainly used in
+// Compaction Table Readers.
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+ std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/sequence_file_reader.cc b/src/rocksdb/file/sequence_file_reader.cc
new file mode 100644
index 000000000..81c5e5d1d
--- /dev/null
+++ b/src/rocksdb/file/sequence_file_reader.cc
@@ -0,0 +1,237 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/sequence_file_reader.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "file/read_write_util.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/aligned_buffer.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
+ Status s;
+ if (use_direct_io()) {
+#ifndef ROCKSDB_LITE
+ // Direct I/O requires sector-aligned offset and length: read the
+ // enclosing aligned span into a temporary AlignedBuffer, then copy the
+ // caller's [offset, offset + n) slice out into scratch.
+ size_t offset = offset_.fetch_add(n);
+ size_t alignment = file_->GetRequiredBufferAlignment();
+ size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
+ size_t offset_advance = offset - aligned_offset;
+ size_t size = Roundup(offset + n, alignment) - aligned_offset;
+ size_t r = 0;
+ AlignedBuffer buf;
+ buf.Alignment(alignment);
+ buf.AllocateNewBuffer(size);
+ Slice tmp;
+ s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
+ buf.BufferStart(), nullptr);
+ if (s.ok() && offset_advance < tmp.size()) {
+ buf.Size(tmp.size());
+ // Copy out at most n bytes; a short read near EOF yields fewer.
+ r = buf.Read(scratch, offset_advance,
+ std::min(tmp.size() - offset_advance, n));
+ }
+ *result = Slice(scratch, r);
+#endif // !ROCKSDB_LITE
+ } else {
+ // Buffered mode: the underlying file tracks its own position.
+ s = file_->Read(n, IOOptions(), result, scratch, nullptr);
+ }
+ IOSTATS_ADD(bytes_read, result->size());
+ return s;
+}
+
+Status SequentialFileReader::Skip(uint64_t n) {
+#ifndef ROCKSDB_LITE
+ if (use_direct_io()) {
+ // Direct I/O reads are positioned, so skipping is just advancing the
+ // locally tracked logical offset.
+ offset_ += static_cast<size_t>(n);
+ return Status::OK();
+ }
+#endif // !ROCKSDB_LITE
+ return file_->Skip(n);
+}
+
+namespace {
+// This class wraps a SequentialFile, exposing same API, with the difference
+// of being able to prefetch up to readahead_size bytes and then serve them
+// from memory, avoiding the entire round-trip if, for example, the data for the
+// file is actually remote.
+class ReadaheadSequentialFile : public FSSequentialFile {
+ public:
+ ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
+ size_t readahead_size)
+ : file_(std::move(file)),
+ alignment_(file_->GetRequiredBufferAlignment()),
+ readahead_size_(Roundup(readahead_size, alignment_)),
+ buffer_(),
+ buffer_offset_(0),
+ read_offset_(0) {
+ buffer_.Alignment(alignment_);
+ buffer_.AllocateNewBuffer(readahead_size_);
+ }
+
+ ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
+
+ ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
+
+ IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
+ IODebugContext* dbg) override {
+ std::unique_lock<std::mutex> lk(lock_);
+
+ size_t cached_len = 0;
+ // Check if there is a cache hit, meaning that [offset, offset + n) is
+ // either completely or partially in the buffer. If it's completely cached,
+ // including end of file case when offset + n is greater than EOF, then
+ // return.
+ if (TryReadFromCache(n, &cached_len, scratch) &&
+ (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
+ // We read exactly what we needed, or we hit end of file - return.
+ *result = Slice(scratch, cached_len);
+ return IOStatus::OK();
+ }
+ n -= cached_len;
+
+ IOStatus s;
+ // Read-ahead only makes sense if we have some slack left after reading
+ if (n + alignment_ >= readahead_size_) {
+ // Remaining request is too large to benefit from buffering: read it
+ // straight into the caller's scratch and drop the now-stale buffer.
+ s = file_->Read(n, opts, result, scratch + cached_len, dbg);
+ if (s.ok()) {
+ read_offset_ += result->size();
+ *result = Slice(scratch, cached_len + result->size());
+ }
+ buffer_.Clear();
+ return s;
+ }
+
+ s = ReadIntoBuffer(readahead_size_, opts, dbg);
+ if (s.ok()) {
+ // The data we need is now in cache, so we can safely read it
+ size_t remaining_len;
+ TryReadFromCache(n, &remaining_len, scratch + cached_len);
+ *result = Slice(scratch, cached_len + remaining_len);
+ }
+ return s;
+ }
+
+ IOStatus Skip(uint64_t n) override {
+ std::unique_lock<std::mutex> lk(lock_);
+ IOStatus s = IOStatus::OK();
+ // First check if we need to skip already cached data
+ if (buffer_.CurrentSize() > 0) {
+ // Do we need to skip beyond cached data?
+ if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
+ // Yes. Skip whatever is in memory and adjust offset accordingly
+ n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
+ read_offset_ = buffer_offset_ + buffer_.CurrentSize();
+ } else {
+ // No. The section to be skipped is entirely in cache.
+ read_offset_ += n;
+ n = 0;
+ }
+ }
+ if (n > 0) {
+ // We still need to skip more, so call the file API for skipping
+ s = file_->Skip(n);
+ if (s.ok()) {
+ read_offset_ += n;
+ }
+ buffer_.Clear();
+ }
+ return s;
+ }
+
+ IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
+ Slice* result, char* scratch,
+ IODebugContext* dbg) override {
+ // Positioned reads bypass the sequential readahead buffer entirely.
+ return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
+ }
+
+ IOStatus InvalidateCache(size_t offset, size_t length) override {
+ std::unique_lock<std::mutex> lk(lock_);
+ // Drop our own prefetched copy before delegating to the wrapped file.
+ buffer_.Clear();
+ return file_->InvalidateCache(offset, length);
+ }
+
+ bool use_direct_io() const override { return file_->use_direct_io(); }
+
+ private:
+ // Tries to read from buffer_ n bytes. If anything was read from the cache, it
+ // sets cached_len to the number of bytes actually read, copies these number
+ // of bytes to scratch and returns true.
+ // If nothing was read sets cached_len to 0 and returns false.
+ bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
+ if (read_offset_ < buffer_offset_ ||
+ read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
+ *cached_len = 0;
+ return false;
+ }
+ uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
+ *cached_len = std::min(
+ buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
+ memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
+ read_offset_ += *cached_len;
+ return true;
+ }
+
+ // Reads into buffer_ the next n bytes from file_.
+ // Can actually read less if EOF was reached.
+ // Returns the status of the read operation on the file.
+ IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
+ IODebugContext* dbg) {
+ if (n > buffer_.Capacity()) {
+ n = buffer_.Capacity();
+ }
+ assert(IsFileSectorAligned(n, alignment_));
+ Slice result;
+ IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
+ if (s.ok()) {
+ buffer_offset_ = read_offset_;
+ buffer_.Size(result.size());
+ assert(result.size() == 0 || buffer_.BufferStart() == result.data());
+ }
+ return s;
+ }
+
+ const std::unique_ptr<FSSequentialFile> file_;
+ const size_t alignment_;
+ const size_t readahead_size_;
+
+ std::mutex lock_;
+ // The buffer storing the prefetched data
+ AlignedBuffer buffer_;
+ // The offset in file_, corresponding to data stored in buffer_
+ uint64_t buffer_offset_;
+ // The offset up to which data was read from file_. In fact, it can be larger
+ // than the actual file size, since the file_->Skip(n) call doesn't return the
+ // actual number of bytes that were skipped, which can be less than n.
+ // This is not a problem since read_offset_ is monotonically increasing and
+ // its only use is to figure out if next piece of data should be read from
+ // buffer_ or file_ directly.
+ uint64_t read_offset_;
+};
+} // namespace
+
+std::unique_ptr<FSSequentialFile>
+SequentialFileReader::NewReadaheadSequentialFile(
+    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
+  // Prefetching only pays off when the readahead window exceeds the file's
+  // required buffer alignment; otherwise hand the file back unchanged.
+  const size_t min_useful_size = file->GetRequiredBufferAlignment();
+  if (min_useful_size >= readahead_size) {
+    return std::move(file);
+  }
+  return std::unique_ptr<FSSequentialFile>(
+      new ReadaheadSequentialFile(std::move(file), readahead_size));
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/sequence_file_reader.h b/src/rocksdb/file/sequence_file_reader.h
new file mode 100644
index 000000000..2f0898b42
--- /dev/null
+++ b/src/rocksdb/file/sequence_file_reader.h
@@ -0,0 +1,67 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <string>
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles
+// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page
+// cache disabled) reads appropriately, and also updates the IO stats.
+class SequentialFileReader {
+ private:
+ std::unique_ptr<FSSequentialFile> file_;
+ std::string file_name_;
+ std::atomic<size_t> offset_{0}; // read offset
+
+ public:
+ explicit SequentialFileReader(std::unique_ptr<FSSequentialFile>&& _file,
+ const std::string& _file_name)
+ : file_(std::move(_file)), file_name_(_file_name) {}
+
+ // Readahead variant: wraps _file in a prefetching adapter when
+ // _readahead_size is large enough to be useful.
+ explicit SequentialFileReader(std::unique_ptr<FSSequentialFile>&& _file,
+ const std::string& _file_name,
+ size_t _readahead_size)
+ : file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)),
+ file_name_(_file_name) {}
+
+ SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
+ *this = std::move(o);
+ }
+
+ // Fix: transfer all state, not just file_. Previously only file_ was
+ // moved, so a moved-to reader reported an empty file_name() and restarted
+ // direct-I/O reads at offset 0.
+ SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
+ file_ = std::move(o.file_);
+ file_name_ = std::move(o.file_name_);
+ offset_.store(o.offset_.load());
+ return *this;
+ }
+
+ SequentialFileReader(const SequentialFileReader&) = delete;
+ SequentialFileReader& operator=(const SequentialFileReader&) = delete;
+
+ Status Read(size_t n, Slice* result, char* scratch);
+
+ Status Skip(uint64_t n);
+
+ FSSequentialFile* file() { return file_.get(); }
+
+ std::string file_name() { return file_name_; }
+
+ bool use_direct_io() const { return file_->use_direct_io(); }
+
+ private:
+ // NewReadaheadSequentialFile provides a wrapper over SequentialFile to
+ // always prefetch additional data with every read.
+ static std::unique_ptr<FSSequentialFile> NewReadaheadSequentialFile(
+ std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size);
+};
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/sst_file_manager_impl.cc b/src/rocksdb/file/sst_file_manager_impl.cc
new file mode 100644
index 000000000..35056429e
--- /dev/null
+++ b/src/rocksdb/file/sst_file_manager_impl.cc
@@ -0,0 +1,558 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "file/sst_file_manager_impl.h"
+
+#include <cinttypes>
+#include <vector>
+
+#include "db/db_impl/db_impl.h"
+#include "env/composite_env_wrapper.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/sst_file_manager.h"
+#include "test_util/sync_point.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifndef ROCKSDB_LITE
+// Constructs the manager; the embedded DeleteScheduler is initialized here,
+// while the error-recovery thread (bg_thread_) is only started on demand.
+SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
+ std::shared_ptr<Logger> logger,
+ int64_t rate_bytes_per_sec,
+ double max_trash_db_ratio,
+ uint64_t bytes_max_delete_chunk)
+ : env_(env),
+ fs_(fs),
+ logger_(logger),
+ total_files_size_(0),
+ in_progress_files_size_(0),
+ compaction_buffer_size_(0),
+ cur_compactions_reserved_size_(0),
+ max_allowed_space_(0),
+ delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
+ max_trash_db_ratio, bytes_max_delete_chunk),
+ cv_(&mu_),
+ closing_(false),
+ bg_thread_(nullptr),
+ reserved_disk_buffer_(0),
+ free_space_trigger_(0),
+ cur_instance_(nullptr) {}
+
+SstFileManagerImpl::~SstFileManagerImpl() {
+ Close();
+}
+
+// Idempotent shutdown: signal the recovery thread (if any) and join it.
+// Only the first call does any work; subsequent calls return immediately.
+void SstFileManagerImpl::Close() {
+ {
+ MutexLock l(&mu_);
+ if (closing_) {
+ return;
+ }
+ closing_ = true;
+ cv_.SignalAll();
+ }
+ // Join outside the mutex so the thread can finish its current iteration.
+ if (bg_thread_) {
+ bg_thread_->join();
+ }
+}
+
+Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
+ bool compaction) {
+ uint64_t file_size;
+ // Query the filesystem for the size; if that fails the file is not tracked.
+ Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
+ if (s.ok()) {
+ MutexLock l(&mu_);
+ OnAddFileImpl(file_path, file_size, compaction);
+ }
+ TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
+ return s;
+}
+
+// Overload for callers that already know the file size, avoiding a
+// filesystem round-trip.
+Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
+ uint64_t file_size, bool compaction) {
+ MutexLock l(&mu_);
+ OnAddFileImpl(file_path, file_size, compaction);
+ TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
+ return Status::OK();
+}
+
+// Removes file_path from size tracking; a no-op if the file was not tracked.
+Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
+ {
+ MutexLock l(&mu_);
+ OnDeleteFileImpl(file_path);
+ }
+ TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
+ return Status::OK();
+}
+
+// Called when a compaction finishes: releases the input-size reservation made
+// in EnoughRoomForCompaction() and stops counting the compaction's outputs as
+// "in progress".
+void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
+ MutexLock l(&mu_);
+ uint64_t size_added_by_compaction = 0;
+ // Sum the input file sizes; this is the amount reserved up front.
+ for (size_t i = 0; i < c->num_input_levels(); i++) {
+ for (size_t j = 0; j < c->num_input_files(i); j++) {
+ FileMetaData* filemeta = c->input(i, j);
+ size_added_by_compaction += filemeta->fd.GetFileSize();
+ }
+ }
+ cur_compactions_reserved_size_ -= size_added_by_compaction;
+
+ // The output files are no longer "in progress" once the edit is recorded.
+ auto new_files = c->edit()->GetNewFiles();
+ for (auto& new_file : new_files) {
+ auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
+ new_file.second.fd.GetNumber(),
+ new_file.second.fd.GetPathId());
+ if (in_progress_files_.find(fn) != in_progress_files_.end()) {
+ auto tracked_file = tracked_files_.find(fn);
+ assert(tracked_file != tracked_files_.end());
+ in_progress_files_size_ -= tracked_file->second;
+ in_progress_files_.erase(fn);
+ }
+ }
+}
+
+// Transfers tracking from old_path to new_path, preserving the recorded size.
+Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
+ const std::string& new_path,
+ uint64_t* file_size) {
+ {
+ MutexLock l(&mu_);
+ // NOTE(review): operator[] default-inserts a zero-sized entry when
+ // old_path was never tracked; presumably callers only move tracked
+ // files — confirm.
+ if (file_size != nullptr) {
+ *file_size = tracked_files_[old_path];
+ }
+ OnAddFileImpl(new_path, tracked_files_[old_path], false);
+ OnDeleteFileImpl(old_path);
+ }
+ TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
+ return Status::OK();
+}
+
+void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
+ MutexLock l(&mu_);
+ max_allowed_space_ = max_allowed_space;
+}
+
+void SstFileManagerImpl::SetCompactionBufferSize(
+ uint64_t compaction_buffer_size) {
+ MutexLock l(&mu_);
+ compaction_buffer_size_ = compaction_buffer_size;
+}
+
+bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
+ MutexLock l(&mu_);
+ // max_allowed_space_ is unsigned, so "<= 0" is effectively "== 0",
+ // i.e. the limit is disabled.
+ if (max_allowed_space_ <= 0) {
+ return false;
+ }
+ return total_files_size_ >= max_allowed_space_;
+}
+
+// Same as above, but also counts space reserved by ongoing compactions.
+bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
+ MutexLock l(&mu_);
+ if (max_allowed_space_ <= 0) {
+ return false;
+ }
+ return total_files_size_ + cur_compactions_reserved_size_ >=
+ max_allowed_space_;
+}
+
+// Returns true if there is enough free/allowed space for the proposed
+// compaction, and reserves its input size in cur_compactions_reserved_size_.
+bool SstFileManagerImpl::EnoughRoomForCompaction(
+ ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
+ Status bg_error) {
+ MutexLock l(&mu_);
+ uint64_t size_added_by_compaction = 0;
+ // First check if we even have the space to do the compaction
+ for (size_t i = 0; i < inputs.size(); i++) {
+ for (size_t j = 0; j < inputs[i].size(); j++) {
+ FileMetaData* filemeta = inputs[i][j];
+ size_added_by_compaction += filemeta->fd.GetFileSize();
+ }
+ }
+
+ // Update cur_compactions_reserved_size_ so concurrent compaction
+ // don't max out space
+ size_t needed_headroom =
+ cur_compactions_reserved_size_ + size_added_by_compaction +
+ compaction_buffer_size_;
+ if (max_allowed_space_ != 0 &&
+ (needed_headroom + total_files_size_ > max_allowed_space_)) {
+ return false;
+ }
+
+ // Implement more aggressive checks only if this DB instance has already
+ // seen a NoSpace() error. This is in order to contain a single potentially
+ // misbehaving DB instance and prevent it from slowing down compactions of
+ // other DB instances
+ if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
+ auto fn =
+ TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
+ inputs[0][0]->fd.GetPathId());
+ uint64_t free_space = 0;
+ fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
+ // needed_headroom is based on current size reserved by compactions,
+ // minus any files created by running compactions as they would count
+ // against the reserved size. If user didn't specify any compaction
+ // buffer, add reserved_disk_buffer_ that's calculated by default so the
+ // compaction doesn't end up leaving nothing for logs and flush SSTs
+ if (compaction_buffer_size_ == 0) {
+ needed_headroom += reserved_disk_buffer_;
+ }
+ needed_headroom -= in_progress_files_size_;
+ if (free_space < needed_headroom + size_added_by_compaction) {
+ // We hit the condition of not enough disk space
+ ROCKS_LOG_ERROR(logger_,
+ "free space [%" PRIu64
+ " bytes] is less than "
+ "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
+ free_space, needed_headroom);
+ return false;
+ }
+ }
+
+ cur_compactions_reserved_size_ += size_added_by_compaction;
+ // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
+ // a NoSpace error.
+ free_space_trigger_ = cur_compactions_reserved_size_;
+ return true;
+}
+
+uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
+ MutexLock l(&mu_);
+ return cur_compactions_reserved_size_;
+}
+
+uint64_t SstFileManagerImpl::GetTotalSize() {
+ MutexLock l(&mu_);
+ return total_files_size_;
+}
+
+// Returns a snapshot copy of the tracked-file map (path -> size).
+std::unordered_map<std::string, uint64_t>
+SstFileManagerImpl::GetTrackedFiles() {
+ MutexLock l(&mu_);
+ return tracked_files_;
+}
+
+// The following accessors delegate to the embedded DeleteScheduler, which
+// does its own synchronization.
+int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
+ return delete_scheduler_.GetRateBytesPerSecond();
+}
+
+void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
+ return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
+}
+
+double SstFileManagerImpl::GetMaxTrashDBRatio() {
+ return delete_scheduler_.GetMaxTrashDBRatio();
+}
+
+void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
+ return delete_scheduler_.SetMaxTrashDBRatio(r);
+}
+
+uint64_t SstFileManagerImpl::GetTotalTrashSize() {
+ return delete_scheduler_.GetTotalTrashSize();
+}
+
+// Accumulates the disk buffer to reserve and remembers the first path given,
+// which is later used by ClearError() for free-space queries.
+void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
+ const std::string& path) {
+ MutexLock l(&mu_);
+
+ reserved_disk_buffer_ += size;
+ if (path_.empty()) {
+ path_ = path;
+ }
+}
+
+// Body of the background error-recovery thread. Loops until either all
+// registered DB instances have recovered (bg_err_ is cleared) or the manager
+// is closing.
+void SstFileManagerImpl::ClearError() {
+ while (true) {
+ MutexLock l(&mu_);
+
+ if (closing_) {
+ return;
+ }
+
+ uint64_t free_space = 0;
+ Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
+ // Cap the reported free space at max_allowed_space_ when a limit is set.
+ free_space = max_allowed_space_ > 0
+ ? std::min(max_allowed_space_, free_space)
+ : free_space;
+ if (s.ok()) {
+ // In case of multi-DB instances, some of them may have experienced a
+ // soft error and some a hard error. In the SstFileManagerImpl, a hard
+ // error will basically override previously reported soft errors. Once
+ // we clear the hard error, we don't keep track of previous errors for
+ // now
+ if (bg_err_.severity() == Status::Severity::kHardError) {
+ if (free_space < reserved_disk_buffer_) {
+ ROCKS_LOG_ERROR(logger_,
+ "free space [%" PRIu64
+ " bytes] is less than "
+ "required disk buffer [%" PRIu64 " bytes]\n",
+ free_space, reserved_disk_buffer_);
+ ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
+ s = Status::NoSpace();
+ }
+ } else if (bg_err_.severity() == Status::Severity::kSoftError) {
+ if (free_space < free_space_trigger_) {
+ ROCKS_LOG_WARN(logger_,
+ "free space [%" PRIu64
+ " bytes] is less than "
+ "free space for compaction trigger [%" PRIu64
+ " bytes]\n",
+ free_space, free_space_trigger_);
+ ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
+ s = Status::NoSpace();
+ }
+ }
+ }
+
+ // Someone could have called CancelErrorRecovery() and the list could have
+ // become empty, so check again here
+ if (s.ok() && !error_handler_list_.empty()) {
+ auto error_handler = error_handler_list_.front();
+ // Since we will release the mutex, set cur_instance_ to signal to the
+ // shutdown thread, if it calls CancelErrorRecovery() in the meantime,
+ // to indicate that this DB instance is busy. The DB instance is
+ // guaranteed to not be deleted before RecoverFromBGError() returns,
+ // since the ErrorHandler::recovery_in_prog_ flag would be true
+ cur_instance_ = error_handler;
+ mu_.Unlock();
+ s = error_handler->RecoverFromBGError();
+ TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
+ mu_.Lock();
+ // The DB instance might have been deleted while we were
+ // waiting for the mutex, so check cur_instance_ to make sure its
+ // still non-null
+ if (cur_instance_) {
+ // Check for error again, since the instance may have recovered but
+ // immediately got another error. If that's the case, and the new
+ // error is also a NoSpace() non-fatal error, leave the instance in
+ // the list
+ Status err = cur_instance_->GetBGError();
+ if (s.ok() && err == Status::NoSpace() &&
+ err.severity() < Status::Severity::kFatalError) {
+ s = err;
+ }
+ cur_instance_ = nullptr;
+ }
+
+ if (s.ok() || s.IsShutdownInProgress() ||
+ (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
+ // If shutdown is in progress, abandon this handler instance
+ // and continue with the others
+ error_handler_list_.pop_front();
+ }
+ }
+
+ if (!error_handler_list_.empty()) {
+ // If there are more instances to be recovered, reschedule after 5
+ // seconds
+ int64_t wait_until = env_->NowMicros() + 5000000;
+ cv_.TimedWait(wait_until);
+ }
+
+ // Check again for error_handler_list_ empty, as a DB instance shutdown
+ // could have removed it from the queue while we were in timed wait
+ if (error_handler_list_.empty()) {
+ ROCKS_LOG_INFO(logger_, "Clearing error\n");
+ bg_err_ = Status::OK();
+ return;
+ }
+ }
+}
+
+// Registers a DB instance's error handler for background recovery, and starts
+// the recovery thread if this is the first pending error.
+void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
+ Status bg_error) {
+ MutexLock l(&mu_);
+ if (bg_error.severity() == Status::Severity::kSoftError) {
+ if (bg_err_.ok()) {
+ // Setting bg_err_ basically means we're in degraded mode
+ // Assume that all pending compactions will fail similarly. The trigger
+ // for clearing this condition is set to current compaction reserved
+ // size, so we stop checking disk space available in
+ // EnoughRoomForCompaction once this much free space is available
+ bg_err_ = bg_error;
+ }
+ } else if (bg_error.severity() == Status::Severity::kHardError) {
+ bg_err_ = bg_error;
+ } else {
+ assert(false);
+ }
+
+ // If this is the first instance of this error, kick off a thread to poll
+ // and recover from this condition
+ if (error_handler_list_.empty()) {
+ error_handler_list_.push_back(handler);
+ // Release lock before calling join. It's ok to do so because
+ // error_handler_list_ is now non-empty, so no other invocation of this
+ // function will execute this piece of code
+ mu_.Unlock();
+ if (bg_thread_) {
+ bg_thread_->join();
+ }
+ // Start a new thread. The previous one would have exited.
+ bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
+ mu_.Lock();
+ } else {
+ // Check if this DB instance is already in the list
+ for (auto iter = error_handler_list_.begin();
+ iter != error_handler_list_.end(); ++iter) {
+ if ((*iter) == handler) {
+ return;
+ }
+ }
+ error_handler_list_.push_back(handler);
+ }
+}
+
+// Deregisters a handler from pending recovery. Returns true if the handler
+// was removed from the queue; false if it was not queued or is currently
+// being recovered (in which case it is only marked so the recovery thread
+// won't touch it again).
+bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
+ MutexLock l(&mu_);
+
+ if (cur_instance_ == handler) {
+ // This instance is currently busy attempting to recover
+ // Nullify it so the recovery thread doesn't attempt to access it again
+ cur_instance_ = nullptr;
+ return false;
+ }
+
+ for (auto iter = error_handler_list_.begin();
+ iter != error_handler_list_.end(); ++iter) {
+ if ((*iter) == handler) {
+ error_handler_list_.erase(iter);
+ return true;
+ }
+ }
+ return false;
+}
+
+// Hands the file to the DeleteScheduler, which may rate-limit the deletion
+// (or delete immediately when force_bg is false and no limit applies).
+Status SstFileManagerImpl::ScheduleFileDeletion(
+ const std::string& file_path, const std::string& path_to_sync,
+ const bool force_bg) {
+ TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
+ const_cast<std::string*>(&file_path));
+ return delete_scheduler_.DeleteFile(file_path, path_to_sync,
+ force_bg);
+}
+
+// Blocks until the DeleteScheduler has drained all queued trash files.
+void SstFileManagerImpl::WaitForEmptyTrash() {
+ delete_scheduler_.WaitForEmptyTrash();
+}
+
+// Lock-free core of OnAddFile(); caller must hold mu_.
+void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
+ uint64_t file_size, bool compaction) {
+ auto tracked_file = tracked_files_.find(file_path);
+ if (tracked_file != tracked_files_.end()) {
+ // File was added before, we will just update the size
+ assert(!compaction);
+ total_files_size_ -= tracked_file->second;
+ total_files_size_ += file_size;
+ // NOTE(review): this unconditionally releases file_size from the
+ // compaction reservation; presumably re-adds only happen for files whose
+ // size was reserved in EnoughRoomForCompaction — confirm.
+ cur_compactions_reserved_size_ -= file_size;
+ } else {
+ total_files_size_ += file_size;
+ if (compaction) {
+ // Keep track of the size of files created by in-progress compactions.
+ // When calculating whether there's enough headroom for new compactions,
+ // this will be subtracted from cur_compactions_reserved_size_.
+ // Otherwise, compactions will be double counted.
+ in_progress_files_size_ += file_size;
+ in_progress_files_.insert(file_path);
+ }
+ }
+ tracked_files_[file_path] = file_size;
+}
+
+// Lock-free core of OnDeleteFile(); caller must hold mu_.
+void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
+ auto tracked_file = tracked_files_.find(file_path);
+ if (tracked_file == tracked_files_.end()) {
+ // File is not tracked
+ assert(in_progress_files_.find(file_path) == in_progress_files_.end());
+ return;
+ }
+
+ total_files_size_ -= tracked_file->second;
+ // Check if it belonged to an in-progress compaction
+ if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
+ in_progress_files_size_ -= tracked_file->second;
+ in_progress_files_.erase(file_path);
+ }
+ tracked_files_.erase(tracked_file);
+}
+
+// Convenience overload: derives a FileSystem from the Env (the default
+// FileSystem for Env::Default(), otherwise a legacy wrapper) and delegates.
+SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
+ std::string trash_dir,
+ int64_t rate_bytes_per_sec,
+ bool delete_existing_trash, Status* status,
+ double max_trash_db_ratio,
+ uint64_t bytes_max_delete_chunk) {
+ std::shared_ptr<FileSystem> fs;
+
+ if (env == Env::Default()) {
+ fs = FileSystem::Default();
+ } else {
+ fs.reset(new LegacyFileSystemWrapper(env));
+ }
+
+ return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
+ delete_existing_trash, status, max_trash_db_ratio,
+ bytes_max_delete_chunk);
+}
+
+// Creates an SstFileManagerImpl and, if requested, schedules deletion of any
+// files left over in the (deprecated) trash_dir. A per-file deletion failure
+// is reported through *status but does not abort the remaining cleanup.
+SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
+ std::shared_ptr<Logger> info_log,
+ const std::string& trash_dir,
+ int64_t rate_bytes_per_sec,
+ bool delete_existing_trash, Status* status,
+ double max_trash_db_ratio,
+ uint64_t bytes_max_delete_chunk) {
+ SstFileManagerImpl* res =
+ new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
+ max_trash_db_ratio, bytes_max_delete_chunk);
+
+ // trash_dir is deprecated and not needed anymore, but if user passed it
+ // we will still remove files in it.
+ Status s;
+ if (delete_existing_trash && trash_dir != "") {
+ std::vector<std::string> files_in_trash;
+ s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
+ if (s.ok()) {
+ for (const std::string& trash_file : files_in_trash) {
+ if (trash_file == "." || trash_file == "..") {
+ continue;
+ }
+
+ std::string path_in_trash = trash_dir + "/" + trash_file;
+ res->OnAddFile(path_in_trash);
+ Status file_delete =
+ res->ScheduleFileDeletion(path_in_trash, trash_dir);
+ // Remember only the first deletion failure.
+ if (s.ok() && !file_delete.ok()) {
+ s = file_delete;
+ }
+ }
+ }
+ }
+
+ if (status) {
+ *status = s;
+ }
+
+ return res;
+}
+
+#else
+
+// ROCKSDB_LITE stub: the feature is unavailable, so report NotSupported and
+// return no manager.
+SstFileManager* NewSstFileManager(Env* /*env*/,
+ std::shared_ptr<Logger> /*info_log*/,
+ std::string /*trash_dir*/,
+ int64_t /*rate_bytes_per_sec*/,
+ bool /*delete_existing_trash*/,
+ Status* status, double /*max_trash_db_ratio*/,
+ uint64_t /*bytes_max_delete_chunk*/) {
+ if (status) {
+ *status =
+ Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
+ }
+ return nullptr;
+}
+
+#endif // ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/sst_file_manager_impl.h b/src/rocksdb/file/sst_file_manager_impl.h
new file mode 100644
index 000000000..323ffc7b2
--- /dev/null
+++ b/src/rocksdb/file/sst_file_manager_impl.h
@@ -0,0 +1,197 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+
+#include "port/port.h"
+
+#include "db/compaction/compaction.h"
+#include "db/error_handler.h"
+#include "file/delete_scheduler.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/sst_file_manager.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Env;
+class Logger;
+
+// SstFileManager is used to track SST files in the DB and control their
+// deletion rate.
+// All SstFileManager public functions are thread-safe.
+class SstFileManagerImpl : public SstFileManager {
+ public:
+  explicit SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
+                              std::shared_ptr<Logger> logger,
+                              int64_t rate_bytes_per_sec,
+                              double max_trash_db_ratio,
+                              uint64_t bytes_max_delete_chunk);
+
+  ~SstFileManagerImpl();
+
+  // DB will call OnAddFile whenever a new sst file is added.
+  Status OnAddFile(const std::string& file_path, bool compaction = false);
+
+  // Overload where size of the file is provided by the caller rather than
+  // queried from the filesystem. This is an optimization.
+  Status OnAddFile(const std::string& file_path, uint64_t file_size,
+                   bool compaction);
+
+  // DB will call OnDeleteFile whenever an sst file is deleted.
+  Status OnDeleteFile(const std::string& file_path);
+
+  // DB will call OnMoveFile whenever an sst file is move to a new path.
+  // If `file_size` is non-null it receives the size of the moved file.
+  Status OnMoveFile(const std::string& old_path, const std::string& new_path,
+                    uint64_t* file_size = nullptr);
+
+  // Update the maximum allowed space that should be used by RocksDB, if
+  // the total size of the SST files exceeds max_allowed_space, writes to
+  // RocksDB will fail.
+  //
+  // Setting max_allowed_space to 0 will disable this feature, maximum allowed
+  // space will be infinite (Default value).
+  //
+  // thread-safe.
+  void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) override;
+
+  void SetCompactionBufferSize(uint64_t compaction_buffer_size) override;
+
+  // Return true if the total size of SST files exceeded the maximum allowed
+  // space usage.
+  //
+  // thread-safe.
+  bool IsMaxAllowedSpaceReached() override;
+
+  bool IsMaxAllowedSpaceReachedIncludingCompactions() override;
+
+  // Returns true is there is enough (approximate) space for the specified
+  // compaction. Space is approximate because this function conservatively
+  // estimates how much space is currently being used by compactions (i.e.
+  // if a compaction has started, this function bumps the used space by
+  // the full compaction size).
+  bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
+                               const std::vector<CompactionInputFiles>& inputs,
+                               Status bg_error);
+
+  // Bookkeeping so total_file_sizes_ goes back to normal after compaction
+  // finishes
+  void OnCompactionCompletion(Compaction* c);
+
+  uint64_t GetCompactionsReservedSize();
+
+  // Return the total size of all tracked files.
+  uint64_t GetTotalSize() override;
+
+  // Return a map containing all tracked files and their corresponding sizes.
+  std::unordered_map<std::string, uint64_t> GetTrackedFiles() override;
+
+  // Return delete rate limit in bytes per second.
+  virtual int64_t GetDeleteRateBytesPerSecond() override;
+
+  // Update the delete rate limit in bytes per second.
+  virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) override;
+
+  // Return trash/DB size ratio where new files will be deleted immediately
+  virtual double GetMaxTrashDBRatio() override;
+
+  // Update trash/DB size ratio where new files will be deleted immediately
+  virtual void SetMaxTrashDBRatio(double ratio) override;
+
+  // Return the total size of trash files
+  uint64_t GetTotalTrashSize() override;
+
+  // Called by each DB instance using this sst file manager to reserve
+  // disk buffer space for recovery from out of space errors
+  void ReserveDiskBuffer(uint64_t buffer, const std::string& path);
+
+  // Set a flag upon encountering disk full. May enqueue the ErrorHandler
+  // instance for background polling and recovery
+  void StartErrorRecovery(ErrorHandler* db, Status bg_error);
+
+  // Remove the given ErrorHandler instance from the recovery queue. It's
+  // not guaranteed
+  bool CancelErrorRecovery(ErrorHandler* db);
+
+  // Mark file as trash and schedule its deletion. If force_bg is set, it
+  // forces the file to be deleted in the background regardless of DB size,
+  // except when rate limited delete is disabled
+  virtual Status ScheduleFileDeletion(const std::string& file_path,
+                                      const std::string& dir_to_sync,
+                                      const bool force_bg = false);
+
+  // Wait for all files being deleted in the background to finish or for
+  // destructor to be called.
+  virtual void WaitForEmptyTrash();
+
+  DeleteScheduler* delete_scheduler() { return &delete_scheduler_; }
+
+  // Stop the error recovery background thread. This should be called only
+  // once in the object's lifetime, and before the destructor
+  void Close();
+
+ private:
+  // REQUIRES: mutex locked
+  void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
+                     bool compaction);
+  // REQUIRES: mutex locked
+  void OnDeleteFileImpl(const std::string& file_path);
+
+  void ClearError();
+  // True while the tracked background error is a soft error, i.e. we are
+  // still polling free disk space before allowing full-throttle writes.
+  bool CheckFreeSpace() {
+    return bg_err_.severity() == Status::Severity::kSoftError;
+  }
+
+  Env* env_;
+  std::shared_ptr<FileSystem> fs_;
+  std::shared_ptr<Logger> logger_;
+  // Mutex to protect tracked_files_, total_files_size_
+  port::Mutex mu_;
+  // The summation of the sizes of all files in tracked_files_ map
+  uint64_t total_files_size_;
+  // The summation of all output files of in-progress compactions
+  uint64_t in_progress_files_size_;
+  // Compactions should only execute if they can leave at least
+  // this amount of buffer space for logs and flushes
+  uint64_t compaction_buffer_size_;
+  // Estimated size of the current ongoing compactions
+  uint64_t cur_compactions_reserved_size_;
+  // A map containing all tracked files and their sizes
+  //  file_path => file_size
+  std::unordered_map<std::string, uint64_t> tracked_files_;
+  // A set of files belonging to in-progress compactions
+  std::unordered_set<std::string> in_progress_files_;
+  // The maximum allowed space (in bytes) for sst files.
+  uint64_t max_allowed_space_;
+  // DeleteScheduler used to throttle file deletion.
+  DeleteScheduler delete_scheduler_;
+  port::CondVar cv_;
+  // Flag to force error recovery thread to exit
+  bool closing_;
+  // Background error recovery thread
+  std::unique_ptr<port::Thread> bg_thread_;
+  // A path in the filesystem corresponding to this SFM. This is used for
+  // calling Env::GetFreeSpace. Posix requires a path in the filesystem
+  std::string path_;
+  // Save the current background error
+  Status bg_err_;
+  // Amount of free disk headroom before allowing recovery from hard errors
+  uint64_t reserved_disk_buffer_;
+  // For soft errors, amount of free disk space before we can allow
+  // compactions to run full throttle. If disk space is below this trigger,
+  // compactions will be gated by free disk space > input size
+  uint64_t free_space_trigger_;
+  // List of database error handler instances tracked by this sst file manager
+  std::list<ErrorHandler*> error_handler_list_;
+  // Pointer to ErrorHandler instance that is currently processing recovery
+  ErrorHandler* cur_instance_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/file/writable_file_writer.cc b/src/rocksdb/file/writable_file_writer.cc
new file mode 100644
index 000000000..d5894c17a
--- /dev/null
+++ b/src/rocksdb/file/writable_file_writer.cc
@@ -0,0 +1,429 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "file/writable_file_writer.h"
+
+#include <algorithm>
+#include <mutex>
+
+#include "db/version_edit.h"
+#include "monitoring/histogram.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Appends `data` to the file. Data is normally staged in the aligned buffer
+// `buf_`, which is grown geometrically (doubling, capped at
+// max_buffer_size_) before resorting to a flush. With direct I/O every byte
+// goes through the buffer; with buffered I/O a write larger than the buffer
+// capacity bypasses it and goes straight to WriteBuffered(). On success,
+// filesize_ and the running file checksum are advanced by data.size().
+Status WritableFileWriter::Append(const Slice& data) {
+  const char* src = data.data();
+  size_t left = data.size();
+  Status s;
+  pending_sync_ = true;  // there is now data that a later Sync() must cover
+
+  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
+                   rocksdb_kill_odds * REDUCE_ODDS2);
+
+  {
+    IOSTATS_TIMER_GUARD(prepare_write_nanos);
+    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
+    // Give the file a chance to preallocate for the upcoming write.
+    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
+                                 IOOptions(), nullptr);
+  }
+
+  // See whether we need to enlarge the buffer to avoid the flush
+  if (buf_.Capacity() - buf_.CurrentSize() < left) {
+    for (size_t cap = buf_.Capacity();
+         cap < max_buffer_size_;  // There is still room to increase
+         cap *= 2) {
+      // See whether the next available size is large enough.
+      // Buffer will never be increased to more than max_buffer_size_.
+      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
+      if (desired_capacity - buf_.CurrentSize() >= left ||
+          (use_direct_io() && desired_capacity == max_buffer_size_)) {
+        // copy_data=true preserves already-buffered bytes.
+        buf_.AllocateNewBuffer(desired_capacity, true);
+        break;
+      }
+    }
+  }
+
+  // Flush only when buffered I/O
+  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
+    if (buf_.CurrentSize() > 0) {
+      s = Flush();
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    assert(buf_.CurrentSize() == 0);
+  }
+
+  // We never write directly to disk with direct I/O on.
+  // or we simply use it for its original purpose to accumulate many small
+  // chunks
+  if (use_direct_io() || (buf_.Capacity() >= left)) {
+    while (left > 0) {
+      size_t appended = buf_.Append(src, left);
+      left -= appended;
+      src += appended;
+
+      if (left > 0) {
+        // Buffer filled before the data was exhausted; drain it and retry.
+        s = Flush();
+        if (!s.ok()) {
+          break;
+        }
+      }
+    }
+  } else {
+    // Writing directly to file bypassing the buffer
+    assert(buf_.CurrentSize() == 0);
+    s = WriteBuffered(src, left);
+  }
+
+  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
+  if (s.ok()) {
+    filesize_ += data.size();
+    CalculateFileChecksum(data);  // keep running checksum in step with data
+  }
+  return s;
+}
+
+// Appends `pad_bytes` zero bytes, always through the buffer (flushing when
+// it fills). The padding counts toward filesize_ and marks a pending sync.
+// REQUIRES: pad_bytes < kDefaultPageSize.
+Status WritableFileWriter::Pad(const size_t pad_bytes) {
+  assert(pad_bytes < kDefaultPageSize);
+  size_t left = pad_bytes;
+  size_t cap = buf_.Capacity() - buf_.CurrentSize();
+
+  // Assume pad_bytes is small compared to buf_ capacity. So we always
+  // use buf_ rather than write directly to file in certain cases like
+  // Append() does.
+  while (left) {
+    size_t append_bytes = std::min(cap, left);
+    buf_.PadWith(append_bytes, 0);
+    left -= append_bytes;
+    if (left > 0) {
+      Status s = Flush();
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    // Re-read available space: a flush above may have emptied the buffer.
+    cap = buf_.Capacity() - buf_.CurrentSize();
+  }
+  pending_sync_ = true;
+  filesize_ += pad_bytes;
+  return Status::OK();
+}
+
+// Flushes, (for direct I/O) truncates to the logical size and fsyncs, then
+// closes the underlying file. Intentionally does NOT early-return on
+// failure: the file must be closed regardless, and the first error seen is
+// the one returned. Safe to call when already closed (returns OK).
+Status WritableFileWriter::Close() {
+  // Do not quit immediately on failure the file MUST be closed
+  Status s;
+
+  // Possible to close it twice now as we MUST close
+  // in __dtor, simply flushing is not enough
+  // Windows when pre-allocating does not fill with zeros
+  // also with unbuffered access we also set the end of data.
+  if (!writable_file_) {
+    return s;
+  }
+
+  s = Flush();  // flush cache to OS
+
+  Status interim;
+  // In direct I/O mode we write whole pages so
+  // we need to let the file know where data ends.
+  if (use_direct_io()) {
+    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
+    if (interim.ok()) {
+      interim = writable_file_->Fsync(IOOptions(), nullptr);
+    }
+    if (!interim.ok() && s.ok()) {
+      s = interim;  // preserve the first failure
+    }
+  }
+
+  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
+  interim = writable_file_->Close(IOOptions(), nullptr);
+  if (!interim.ok() && s.ok()) {
+    s = interim;
+  }
+
+  // Drop the file handle so a second Close() is a no-op.
+  writable_file_.reset();
+  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);
+
+  return s;
+}
+
+// write out the cached data to the OS cache or storage if direct I/O
+// enabled. Also opportunistically issues RangeSync() every bytes_per_sync_
+// bytes for buffered I/O. Returns the first error encountered.
+Status WritableFileWriter::Flush() {
+  Status s;
+  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
+                   rocksdb_kill_odds * REDUCE_ODDS2);
+
+  if (buf_.CurrentSize() > 0) {
+    if (use_direct_io()) {
+#ifndef ROCKSDB_LITE
+      // Only pay the aligned-write cost when there is unsynced data.
+      if (pending_sync_) {
+        s = WriteDirect();
+      }
+#endif  // !ROCKSDB_LITE
+    } else {
+      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
+    }
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  s = writable_file_->Flush(IOOptions(), nullptr);
+
+  if (!s.ok()) {
+    return s;
+  }
+
+  // sync OS cache to disk for every bytes_per_sync_
+  // TODO: give log file and sst file different options (log
+  // files could be potentially cached in OS for their whole
+  // life time, thus we might not want to flush at all).
+
+  // We try to avoid sync to the last 1MB of data. For two reasons:
+  // (1) avoid rewrite the same page that is modified later.
+  // (2) for older version of OS, write can block while writing out
+  //     the page.
+  // Xfs does neighbor page flushing outside of the specified ranges. We
+  // need to make sure sync range is far from the write offset.
+  if (!use_direct_io() && bytes_per_sync_) {
+    const uint64_t kBytesNotSyncRange =
+        1024 * 1024;                                // recent 1MB is not synced.
+    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
+    if (filesize_ > kBytesNotSyncRange) {
+      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
+      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
+      assert(offset_sync_to >= last_sync_size_);
+      if (offset_sync_to > 0 &&
+          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
+        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
+        last_sync_size_ = offset_sync_to;
+      }
+    }
+  }
+
+  return s;
+}
+
+// Returns the name of the configured checksum function, or the sentinel
+// "unknown" name when no checksum function was supplied.
+const char* WritableFileWriter::GetFileChecksumFuncName() const {
+  if (checksum_func_ != nullptr) {
+    return checksum_func_->Name();
+  } else {
+    return kUnknownFileChecksumFuncName.c_str();
+  }
+}
+
+// Flush buffered data, then sync (fsync if `use_fsync`) when there is
+// pending unsynced data. Direct I/O skips the extra sync here because
+// WriteDirect() already writes through. Clears pending_sync_ on success.
+Status WritableFileWriter::Sync(bool use_fsync) {
+  Status s = Flush();
+  if (!s.ok()) {
+    return s;
+  }
+  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
+  if (!use_direct_io() && pending_sync_) {
+    s = SyncInternal(use_fsync);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
+  pending_sync_ = false;
+  return Status::OK();
+}
+
+// Sync previously Flush()ed data without draining the buffer. Requires the
+// underlying file to declare sync as thread-safe, since this may run
+// concurrently with Append()/Flush().
+Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
+  if (!writable_file_->IsSyncThreadSafe()) {
+    return Status::NotSupported(
+        "Can't WritableFileWriter::SyncWithoutFlush() because "
+        "WritableFile::IsSyncThreadSafe() is false");
+  }
+  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
+  Status s = SyncInternal(use_fsync);
+  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
+  return s;
+}
+
+// Issue the actual Fsync()/Sync() call on the underlying file, charging the
+// time to the fsync/cpu-write IO stats.
+Status WritableFileWriter::SyncInternal(bool use_fsync) {
+  Status s;
+  IOSTATS_TIMER_GUARD(fsync_nanos);
+  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
+  auto prev_perf_level = GetPerfLevel();
+  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
+  if (use_fsync) {
+    s = writable_file_->Fsync(IOOptions(), nullptr);
+  } else {
+    s = writable_file_->Sync(IOOptions(), nullptr);
+  }
+  SetPerfLevel(prev_perf_level);
+  return s;
+}
+
+// Sync a byte range [offset, offset + nbytes) of the file to storage,
+// timing it under range_sync_nanos.
+Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
+  IOSTATS_TIMER_GUARD(range_sync_nanos);
+  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
+  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
+}
+
+// This method writes to disk the specified data and makes use of the rate
+// limiter if available. Buffered (non-direct) I/O only. Writes in
+// rate-limiter-sized chunks, notifying listeners per chunk, and resets the
+// buffer size to 0 once all bytes have been appended.
+Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
+  Status s;
+  assert(!use_direct_io());
+  const char* src = data;
+  size_t left = size;
+
+  while (left > 0) {
+    size_t allowed;
+    if (rate_limiter_ != nullptr) {
+      // May block until the limiter grants enough tokens for a chunk.
+      allowed = rate_limiter_->RequestToken(
+          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
+          RateLimiter::OpType::kWrite);
+    } else {
+      allowed = left;
+    }
+
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+
+#ifndef ROCKSDB_LITE
+      FileOperationInfo::TimePoint start_ts;
+      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::now();
+        // Report offsets relative to our own bookkeeping, not the FS view.
+        old_size = next_write_offset_;
+      }
+#endif
+      {
+        auto prev_perf_level = GetPerfLevel();
+        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
+        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
+        SetPerfLevel(prev_perf_level);
+      }
+#ifndef ROCKSDB_LITE
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = std::chrono::system_clock::now();
+        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
+      }
+#endif
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, allowed);
+    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);
+
+    left -= allowed;
+    src += allowed;
+  }
+  // All buffered bytes have been handed to the OS; empty the buffer.
+  buf_.Size(0);
+  return s;
+}
+
+// Fold `data` into the running file checksum: the first chunk seeds the
+// value via Value(), subsequent chunks extend it. No-op when no checksum
+// function is configured.
+void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
+  if (checksum_func_ != nullptr) {
+    if (is_first_checksum_) {
+      file_checksum_ = checksum_func_->Value(data.data(), data.size());
+      is_first_checksum_ = false;
+    } else {
+      file_checksum_ =
+          checksum_func_->Extend(file_checksum_, data.data(), data.size());
+    }
+  }
+}
+
+// This flushes the accumulated data in the buffer. We pad data with zeros if
+// necessary to the whole page.
+// However, during automatic flushes padding would not be necessary.
+// We always use RateLimiter if available. We move (Refit) any buffer bytes
+// that are left over the whole number of pages to be written again on the
+// next flush because we can only write on aligned offsets.
+#ifndef ROCKSDB_LITE
+Status WritableFileWriter::WriteDirect() {
+  assert(use_direct_io());
+  Status s;
+  const size_t alignment = buf_.Alignment();
+  assert((next_write_offset_ % alignment) == 0);
+
+  // Calculate whole page final file advance if all writes succeed
+  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
+
+  // Calculate the leftover tail, we write it here padded with zeros BUT we
+  // will write
+  // it again in the future either on Close() OR when the current whole page
+  // fills out
+  size_t leftover_tail = buf_.CurrentSize() - file_advance;
+
+  // Round up and pad
+  buf_.PadToAlignmentWith(0);
+
+  const char* src = buf_.BufferStart();
+  uint64_t write_offset = next_write_offset_;
+  size_t left = buf_.CurrentSize();
+
+  while (left > 0) {
+    // Check how much is allowed
+    size_t size;
+    if (rate_limiter_ != nullptr) {
+      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
+                                         writable_file_->GetIOPriority(),
+                                         stats_, RateLimiter::OpType::kWrite);
+    } else {
+      size = left;
+    }
+
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      FileOperationInfo::TimePoint start_ts;
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::now();
+      }
+      // direct writes must be positional
+      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
+                                           IOOptions(), nullptr);
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = std::chrono::system_clock::now();
+        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
+      }
+      if (!s.ok()) {
+        // On failure, restore the buffer to the unpadded logical size so the
+        // data can be retried on the next flush.
+        buf_.Size(file_advance + leftover_tail);
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, size);
+    left -= size;
+    src += size;
+    write_offset += size;
+    assert((next_write_offset_ % alignment) == 0);
+  }
+
+  if (s.ok()) {
+    // Move the tail to the beginning of the buffer
+    // This never happens during normal Append but rather during
+    // explicit call to Flush()/Sync() or Close()
+    buf_.RefitTail(file_advance, leftover_tail);
+    // This is where we start writing next time which may or not be
+    // the actual file size on disk. They match if the buffer size
+    // is a multiple of whole pages otherwise filesize_ is leftover_tail
+    // behind
+    next_write_offset_ += file_advance;
+  }
+  return s;
+}
+#endif  // !ROCKSDB_LITE
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/file/writable_file_writer.h b/src/rocksdb/file/writable_file_writer.h
new file mode 100644
index 000000000..123110713
--- /dev/null
+++ b/src/rocksdb/file/writable_file_writer.h
@@ -0,0 +1,171 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <atomic>
+#include <string>
+#include "db/version_edit.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_checksum.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/rate_limiter.h"
+#include "test_util/sync_point.h"
+#include "util/aligned_buffer.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Statistics;
+
+// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
+// facilities to:
+// - Handle Buffered and Direct writes.
+// - Rate limit writes.
+// - Flush and Sync the data to the underlying filesystem.
+// - Notify any interested listeners on the completion of a write.
+// - Update IO stats.
+class WritableFileWriter {
+ private:
+#ifndef ROCKSDB_LITE
+  // Deliver a per-write completion event to every registered listener.
+  void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
+                               const FileOperationInfo::TimePoint& start_ts,
+                               const FileOperationInfo::TimePoint& finish_ts,
+                               const Status& status) {
+    FileOperationInfo info(file_name_, start_ts, finish_ts);
+    info.offset = offset;
+    info.length = length;
+    info.status = status;
+
+    for (auto& listener : listeners_) {
+      listener->OnFileWriteFinish(info);
+    }
+  }
+#endif  // ROCKSDB_LITE
+
+  bool ShouldNotifyListeners() const { return !listeners_.empty(); }
+  // Fold newly appended data into the running file checksum.
+  void CalculateFileChecksum(const Slice& data);
+
+  std::unique_ptr<FSWritableFile> writable_file_;
+  std::string file_name_;
+  Env* env_;
+  // Write staging buffer; aligned for direct I/O.
+  AlignedBuffer buf_;
+  size_t max_buffer_size_;
+  // Actually written data size can be used for truncate
+  // not counting padding data
+  uint64_t filesize_;
+#ifndef ROCKSDB_LITE
+  // This is necessary when we use unbuffered access
+  // and writes must happen on aligned offsets
+  // so we need to go back and write that page again
+  uint64_t next_write_offset_;
+#endif  // ROCKSDB_LITE
+  // True while there is appended data not yet covered by a Sync().
+  bool pending_sync_;
+  uint64_t last_sync_size_;
+  uint64_t bytes_per_sync_;
+  RateLimiter* rate_limiter_;
+  Statistics* stats_;
+  // Only listeners interested in file I/O are kept (see constructor).
+  std::vector<std::shared_ptr<EventListener>> listeners_;
+  FileChecksumFunc* checksum_func_;
+  std::string file_checksum_ = kUnknownFileChecksum;
+  bool is_first_checksum_ = true;
+
+ public:
+  WritableFileWriter(
+      std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
+      const FileOptions& options, Env* env = nullptr,
+      Statistics* stats = nullptr,
+      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
+      FileChecksumFunc* checksum_func = nullptr)
+      : writable_file_(std::move(file)),
+        file_name_(_file_name),
+        env_(env),
+        buf_(),
+        max_buffer_size_(options.writable_file_max_buffer_size),
+        filesize_(0),
+#ifndef ROCKSDB_LITE
+        next_write_offset_(0),
+#endif  // ROCKSDB_LITE
+        pending_sync_(false),
+        last_sync_size_(0),
+        bytes_per_sync_(options.bytes_per_sync),
+        rate_limiter_(options.rate_limiter),
+        stats_(stats),
+        listeners_(),
+        checksum_func_(checksum_func) {
+    TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
+                             reinterpret_cast<void*>(max_buffer_size_));
+    // Initial buffer is 64KB (or less, if the cap is smaller); it can grow
+    // up to max_buffer_size_ in Append().
+    buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
+    buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
+#ifndef ROCKSDB_LITE
+    std::for_each(listeners.begin(), listeners.end(),
+                  [this](const std::shared_ptr<EventListener>& e) {
+                    if (e->ShouldBeNotifiedOnFileIO()) {
+                      listeners_.emplace_back(e);
+                    }
+                  });
+#else  // !ROCKSDB_LITE
+    (void)listeners;
+#endif
+  }
+
+  WritableFileWriter(const WritableFileWriter&) = delete;
+
+  WritableFileWriter& operator=(const WritableFileWriter&) = delete;
+
+  // Destructor closes the file; errors from the implicit Close() are lost.
+  ~WritableFileWriter() { Close(); }
+
+  std::string file_name() const { return file_name_; }
+
+  Status Append(const Slice& data);
+
+  Status Pad(const size_t pad_bytes);
+
+  Status Flush();
+
+  Status Close();
+
+  Status Sync(bool use_fsync);
+
+  // Sync only the data that was already Flush()ed. Safe to call concurrently
+  // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
+  // returns NotSupported status.
+  Status SyncWithoutFlush(bool use_fsync);
+
+  uint64_t GetFileSize() const { return filesize_; }
+
+  Status InvalidateCache(size_t offset, size_t length) {
+    return writable_file_->InvalidateCache(offset, length);
+  }
+
+  FSWritableFile* writable_file() const { return writable_file_.get(); }
+
+  bool use_direct_io() { return writable_file_->use_direct_io(); }
+
+  bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
+
+  void TEST_SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
+    checksum_func_ = checksum_func;
+  }
+
+  const std::string& GetFileChecksum() const { return file_checksum_; }
+
+  const char* GetFileChecksumFuncName() const;
+
+ private:
+  // Used when os buffering is OFF and we are writing
+  // DMA such as in Direct I/O mode
+#ifndef ROCKSDB_LITE
+  Status WriteDirect();
+#endif  // !ROCKSDB_LITE
+  // Normal write
+  Status WriteBuffered(const char* data, size_t size);
+  Status RangeSync(uint64_t offset, uint64_t nbytes);
+  Status SyncInternal(bool use_fsync);
+};
+} // namespace ROCKSDB_NAMESPACE