summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/persistent_cache/block_cache_tier.cc')
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier.cc425
1 files changed, 425 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
new file mode 100644
index 000000000..658737571
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
@@ -0,0 +1,425 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier.h"
+
+#include <regex>
+#include <utility>
+#include <vector>
+
+#include "logging/logging.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/stop_watch.h"
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// BlockCacheImpl
+//
+Status BlockCacheTier::Open() {
+ Status status;
+
+ WriteLock _(&lock_);
+
+ assert(!size_);
+
+ // Check the validity of the options
+ status = opt_.ValidateSettings();
+ assert(status.ok());
+ if (!status.ok()) {
+ Error(opt_.log, "Invalid block cache options");
+ return status;
+ }
+
+ // Create base directory or cleanup existing directory
+ status = opt_.env->CreateDirIfMissing(opt_.path);
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ // Create base/<cache dir> directory
+ status = opt_.env->CreateDir(GetCachePath());
+ if (!status.ok()) {
+ // directory already exists, clean it up
+ status = CleanupCacheFolder(GetCachePath());
+ assert(status.ok());
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+ }
+
+ // create a new file
+ assert(!cache_file_);
+ status = NewCacheFile();
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ assert(cache_file_);
+
+ if (opt_.pipeline_writes) {
+ assert(!insert_th_.joinable());
+ insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
+ }
+
+ return Status::OK();
+}
+
+bool IsCacheFile(const std::string& file) {
+ // check if the file has .rc suffix
+ // Unfortunately regex support across compilers is not even, so we use simple
+ // string parsing
+ size_t pos = file.find(".");
+ if (pos == std::string::npos) {
+ return false;
+ }
+
+ std::string suffix = file.substr(pos);
+ return suffix == ".rc";
+}
+
+Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
+ std::vector<std::string> files;
+ Status status = opt_.env->GetChildren(folder, &files);
+ if (!status.ok()) {
+ Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ // cleanup files with the patter :digi:.rc
+ for (auto file : files) {
+ if (IsCacheFile(file)) {
+ // cache file
+ Info(opt_.log, "Removing file %s.", file.c_str());
+ status = opt_.env->DeleteFile(folder + "/" + file);
+ if (!status.ok()) {
+ Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+ } else {
+ ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
+ }
+ }
+ return Status::OK();
+}
+
+Status BlockCacheTier::Close() {
+ // stop the insert thread
+ if (opt_.pipeline_writes && insert_th_.joinable()) {
+ InsertOp op(/*quit=*/true);
+ insert_ops_.Push(std::move(op));
+ insert_th_.join();
+ }
+
+ // stop the writer before
+ writer_.Stop();
+
+ // clear all metadata
+ WriteLock _(&lock_);
+ metadata_.Clear();
+ return Status::OK();
+}
+
+template<class T>
+void Add(std::map<std::string, double>* stats, const std::string& key,
+ const T& t) {
+ stats->insert({key, static_cast<double>(t)});
+}
+
+PersistentCache::StatsType BlockCacheTier::Stats() {
+ std::map<std::string, double> stats;
+ Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
+ stats_.bytes_pipelined_.Average());
+ Add(&stats, "persistentcache.blockcachetier.bytes_written",
+ stats_.bytes_written_.Average());
+ Add(&stats, "persistentcache.blockcachetier.bytes_read",
+ stats_.bytes_read_.Average());
+ Add(&stats, "persistentcache.blockcachetier.insert_dropped",
+ stats_.insert_dropped_);
+ Add(&stats, "persistentcache.blockcachetier.cache_hits",
+ stats_.cache_hits_);
+ Add(&stats, "persistentcache.blockcachetier.cache_misses",
+ stats_.cache_misses_);
+ Add(&stats, "persistentcache.blockcachetier.cache_errors",
+ stats_.cache_errors_);
+ Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
+ stats_.CacheHitPct());
+ Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
+ stats_.CacheMissPct());
+ Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
+ stats_.read_hit_latency_.Average());
+ Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
+ stats_.read_miss_latency_.Average());
+ Add(&stats, "persistentcache.blockcachetier.write_latency",
+ stats_.write_latency_.Average());
+
+ auto out = PersistentCacheTier::Stats();
+ out.push_back(stats);
+ return out;
+}
+
+Status BlockCacheTier::Insert(const Slice& key, const char* data,
+ const size_t size) {
+ // update stats
+ stats_.bytes_pipelined_.Add(size);
+
+ if (opt_.pipeline_writes) {
+ // off load the write to the write thread
+ insert_ops_.Push(
+ InsertOp(key.ToString(), std::move(std::string(data, size))));
+ return Status::OK();
+ }
+
+ assert(!opt_.pipeline_writes);
+ return InsertImpl(key, Slice(data, size));
+}
+
+void BlockCacheTier::InsertMain() {
+ while (true) {
+ InsertOp op(insert_ops_.Pop());
+
+ if (op.signal_) {
+ // that is a secret signal to exit
+ break;
+ }
+
+ size_t retry = 0;
+ Status s;
+ while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
+ if (retry > kMaxRetry) {
+ break;
+ }
+
+ // this can happen when the buffers are full, we wait till some buffers
+ // are free. Why don't we wait inside the code. This is because we want
+ // to support both pipelined and non-pipelined mode
+ buffer_allocator_.WaitUntilUsable();
+ retry++;
+ }
+
+ if (!s.ok()) {
+ stats_.insert_dropped_++;
+ }
+ }
+}
+
+Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
+ // pre-condition
+ assert(key.size());
+ assert(data.size());
+ assert(cache_file_);
+
+ StopWatchNano timer(opt_.env, /*auto_start=*/ true);
+
+ WriteLock _(&lock_);
+
+ LBA lba;
+ if (metadata_.Lookup(key, &lba)) {
+ // the key already exists, this is duplicate insert
+ return Status::OK();
+ }
+
+ while (!cache_file_->Append(key, data, &lba)) {
+ if (!cache_file_->Eof()) {
+ ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
+ cache_file_->cacheid());
+ stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::TryAgain();
+ }
+
+ assert(cache_file_->Eof());
+ Status status = NewCacheFile();
+ if (!status.ok()) {
+ return status;
+ }
+ }
+
+ // Insert into lookup index
+ BlockInfo* info = metadata_.Insert(key, lba);
+ assert(info);
+ if (!info) {
+ return Status::IOError("Unexpected error inserting to index");
+ }
+
+ // insert to cache file reverse mapping
+ cache_file_->Add(info);
+
+ // update stats
+ stats_.bytes_written_.Add(data.size());
+ stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::OK();
+}
+
+Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
+ size_t* size) {
+ StopWatchNano timer(opt_.env, /*auto_start=*/ true);
+
+ LBA lba;
+ bool status;
+ status = metadata_.Lookup(key, &lba);
+ if (!status) {
+ stats_.cache_misses_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: key not found");
+ }
+
+ BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
+ if (!file) {
+ // this can happen because the block index and cache file index are
+ // different, and the cache file might be removed between the two lookups
+ stats_.cache_misses_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: cache file not found");
+ }
+
+ assert(file->refs_);
+
+ std::unique_ptr<char[]> scratch(new char[lba.size_]);
+ Slice blk_key;
+ Slice blk_val;
+
+ status = file->Read(lba, &blk_key, &blk_val, scratch.get());
+ --file->refs_;
+ if (!status) {
+ stats_.cache_misses_++;
+ stats_.cache_errors_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: error reading data");
+ }
+
+ assert(blk_key == key);
+
+ val->reset(new char[blk_val.size()]);
+ memcpy(val->get(), blk_val.data(), blk_val.size());
+ *size = blk_val.size();
+
+ stats_.bytes_read_.Add(*size);
+ stats_.cache_hits_++;
+ stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
+
+ return Status::OK();
+}
+
+bool BlockCacheTier::Erase(const Slice& key) {
+ WriteLock _(&lock_);
+ BlockInfo* info = metadata_.Remove(key);
+ assert(info);
+ delete info;
+ return true;
+}
+
+Status BlockCacheTier::NewCacheFile() {
+ lock_.AssertHeld();
+
+ TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
+ (void*)(GetCachePath().c_str()));
+
+ std::unique_ptr<WriteableCacheFile> f(
+ new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
+ GetCachePath(), writer_cache_id_,
+ opt_.cache_file_size, opt_.log));
+
+ bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
+ if (!status) {
+ return Status::IOError("Error creating file");
+ }
+
+ Info(opt_.log, "Created cache file %d", writer_cache_id_);
+
+ writer_cache_id_++;
+ cache_file_ = f.release();
+
+ // insert to cache files tree
+ status = metadata_.Insert(cache_file_);
+ assert(status);
+ if (!status) {
+ Error(opt_.log, "Error inserting to metadata");
+ return Status::IOError("Error inserting to metadata");
+ }
+
+ return Status::OK();
+}
+
+bool BlockCacheTier::Reserve(const size_t size) {
+ WriteLock _(&lock_);
+ assert(size_ <= opt_.cache_size);
+
+ if (size + size_ <= opt_.cache_size) {
+ // there is enough space to write
+ size_ += size;
+ return true;
+ }
+
+ assert(size + size_ >= opt_.cache_size);
+ // there is not enough space to fit the requested data
+ // we can clear some space by evicting cold data
+
+ const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
+ while (size + size_ > opt_.cache_size * retain_fac) {
+ std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
+ if (!f) {
+ // nothing is evictable
+ return false;
+ }
+ assert(!f->refs_);
+ uint64_t file_size;
+ if (!f->Delete(&file_size).ok()) {
+ // unable to delete file
+ return false;
+ }
+
+ assert(file_size <= size_);
+ size_ -= file_size;
+ }
+
+ size_ += size;
+ assert(size_ <= opt_.cache_size * 0.9);
+ return true;
+}
+
+Status NewPersistentCache(Env* const env, const std::string& path,
+ const uint64_t size,
+ const std::shared_ptr<Logger>& log,
+ const bool optimized_for_nvm,
+ std::shared_ptr<PersistentCache>* cache) {
+ if (!cache) {
+ return Status::IOError("invalid argument cache");
+ }
+
+ auto opt = PersistentCacheConfig(env, path, size, log);
+ if (optimized_for_nvm) {
+ // the default settings are optimized for SSD
+ // NVM devices are better accessed with 4K direct IO and written with
+ // parallelism
+ opt.enable_direct_writes = true;
+ opt.writer_qdepth = 4;
+ opt.writer_dispatch_size = 4 * 1024;
+ }
+
+ auto pcache = std::make_shared<BlockCacheTier>(opt);
+ Status s = pcache->Open();
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ *cache = pcache;
+ return s;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ifndef ROCKSDB_LITE