// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #include "utilities/persistent_cache/block_cache_tier.h" #include #include #include #include "logging/logging.h" #include "port/port.h" #include "test_util/sync_point.h" #include "util/stop_watch.h" #include "utilities/persistent_cache/block_cache_tier_file.h" namespace ROCKSDB_NAMESPACE { // // BlockCacheImpl // Status BlockCacheTier::Open() { Status status; WriteLock _(&lock_); assert(!size_); // Check the validity of the options status = opt_.ValidateSettings(); assert(status.ok()); if (!status.ok()) { Error(opt_.log, "Invalid block cache options"); return status; } // Create base directory or cleanup existing directory status = opt_.env->CreateDirIfMissing(opt_.path); if (!status.ok()) { Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(), status.ToString().c_str()); return status; } // Create base/ directory status = opt_.env->CreateDir(GetCachePath()); if (!status.ok()) { // directory already exists, clean it up status = CleanupCacheFolder(GetCachePath()); assert(status.ok()); if (!status.ok()) { Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(), status.ToString().c_str()); return status; } } // create a new file assert(!cache_file_); status = NewCacheFile(); if (!status.ok()) { Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(), status.ToString().c_str()); return status; } assert(cache_file_); if (opt_.pipeline_writes) { assert(!insert_th_.joinable()); insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this); } return Status::OK(); } bool IsCacheFile(const std::string& file) { // check if the file has .rc suffix // Unfortunately regex support across compilers is not even, so we use simple // string parsing size_t pos = file.find("."); if (pos == std::string::npos) { return false; } std::string suffix = file.substr(pos); return suffix == ".rc"; } Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) { std::vector files; Status status = opt_.env->GetChildren(folder, &files); if (!status.ok()) { Error(opt_.log, "Error getting files for %s. %s", folder.c_str(), status.ToString().c_str()); return status; } // cleanup files with the patter :digi:.rc for (auto file : files) { if (IsCacheFile(file)) { // cache file Info(opt_.log, "Removing file %s.", file.c_str()); status = opt_.env->DeleteFile(folder + "/" + file); if (!status.ok()) { Error(opt_.log, "Error deleting file %s. %s", file.c_str(), status.ToString().c_str()); return status; } } else { ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str()); } } return Status::OK(); } Status BlockCacheTier::Close() { // stop the insert thread if (opt_.pipeline_writes && insert_th_.joinable()) { InsertOp op(/*quit=*/true); insert_ops_.Push(std::move(op)); insert_th_.join(); } // stop the writer before writer_.Stop(); // clear all metadata WriteLock _(&lock_); metadata_.Clear(); return Status::OK(); } template void Add(std::map* stats, const std::string& key, const T& t) { stats->insert({key, static_cast(t)}); } PersistentCache::StatsType BlockCacheTier::Stats() { std::map stats; Add(&stats, "persistentcache.blockcachetier.bytes_piplined", stats_.bytes_pipelined_.Average()); Add(&stats, "persistentcache.blockcachetier.bytes_written", stats_.bytes_written_.Average()); Add(&stats, "persistentcache.blockcachetier.bytes_read", stats_.bytes_read_.Average()); Add(&stats, "persistentcache.blockcachetier.insert_dropped", stats_.insert_dropped_); Add(&stats, "persistentcache.blockcachetier.cache_hits", stats_.cache_hits_); Add(&stats, "persistentcache.blockcachetier.cache_misses", stats_.cache_misses_); Add(&stats, "persistentcache.blockcachetier.cache_errors", stats_.cache_errors_); Add(&stats, "persistentcache.blockcachetier.cache_hits_pct", stats_.CacheHitPct()); Add(&stats, "persistentcache.blockcachetier.cache_misses_pct", stats_.CacheMissPct()); Add(&stats, "persistentcache.blockcachetier.read_hit_latency", stats_.read_hit_latency_.Average()); Add(&stats, "persistentcache.blockcachetier.read_miss_latency", stats_.read_miss_latency_.Average()); Add(&stats, "persistentcache.blockcachetier.write_latency", stats_.write_latency_.Average()); auto out = PersistentCacheTier::Stats(); out.push_back(stats); return out; } Status BlockCacheTier::Insert(const Slice& key, const char* data, const size_t size) { // update stats stats_.bytes_pipelined_.Add(size); if (opt_.pipeline_writes) { // off load the write to the write thread insert_ops_.Push( InsertOp(key.ToString(), std::move(std::string(data, size)))); return Status::OK(); } assert(!opt_.pipeline_writes); return InsertImpl(key, Slice(data, size)); } void BlockCacheTier::InsertMain() { while (true) { InsertOp op(insert_ops_.Pop()); if (op.signal_) { // that is a secret signal to exit break; } size_t retry = 0; Status s; while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) { if (retry > kMaxRetry) { break; } // this can happen when the buffers are full, we wait till some buffers // are free. Why don't we wait inside the code. This is because we want // to support both pipelined and non-pipelined mode buffer_allocator_.WaitUntilUsable(); retry++; } if (!s.ok()) { stats_.insert_dropped_++; } } } Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) { // pre-condition assert(key.size()); assert(data.size()); assert(cache_file_); StopWatchNano timer(opt_.env, /*auto_start=*/ true); WriteLock _(&lock_); LBA lba; if (metadata_.Lookup(key, &lba)) { // the key already exists, this is duplicate insert return Status::OK(); } while (!cache_file_->Append(key, data, &lba)) { if (!cache_file_->Eof()) { ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d", cache_file_->cacheid()); stats_.write_latency_.Add(timer.ElapsedNanos() / 1000); return Status::TryAgain(); } assert(cache_file_->Eof()); Status status = NewCacheFile(); if (!status.ok()) { return status; } } // Insert into lookup index BlockInfo* info = metadata_.Insert(key, lba); assert(info); if (!info) { return Status::IOError("Unexpected error inserting to index"); } // insert to cache file reverse mapping cache_file_->Add(info); // update stats stats_.bytes_written_.Add(data.size()); stats_.write_latency_.Add(timer.ElapsedNanos() / 1000); return Status::OK(); } Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr* val, size_t* size) { StopWatchNano timer(opt_.env, /*auto_start=*/ true); LBA lba; bool status; status = metadata_.Lookup(key, &lba); if (!status) { stats_.cache_misses_++; stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); return Status::NotFound("blockcache: key not found"); } BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_); if (!file) { // this can happen because the block index and cache file index are // different, and the cache file might be removed between the two lookups stats_.cache_misses_++; stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); return Status::NotFound("blockcache: cache file not found"); } assert(file->refs_); std::unique_ptr scratch(new char[lba.size_]); Slice blk_key; Slice blk_val; status = file->Read(lba, &blk_key, &blk_val, scratch.get()); --file->refs_; if (!status) { stats_.cache_misses_++; stats_.cache_errors_++; stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); return Status::NotFound("blockcache: error reading data"); } assert(blk_key == key); val->reset(new char[blk_val.size()]); memcpy(val->get(), blk_val.data(), blk_val.size()); *size = blk_val.size(); stats_.bytes_read_.Add(*size); stats_.cache_hits_++; stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000); return Status::OK(); } bool BlockCacheTier::Erase(const Slice& key) { WriteLock _(&lock_); BlockInfo* info = metadata_.Remove(key); assert(info); delete info; return true; } Status BlockCacheTier::NewCacheFile() { lock_.AssertHeld(); TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir", (void*)(GetCachePath().c_str())); std::unique_ptr f( new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_, GetCachePath(), writer_cache_id_, opt_.cache_file_size, opt_.log)); bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads); if (!status) { return Status::IOError("Error creating file"); } Info(opt_.log, "Created cache file %d", writer_cache_id_); writer_cache_id_++; cache_file_ = f.release(); // insert to cache files tree status = metadata_.Insert(cache_file_); assert(status); if (!status) { Error(opt_.log, "Error inserting to metadata"); return Status::IOError("Error inserting to metadata"); } return Status::OK(); } bool BlockCacheTier::Reserve(const size_t size) { WriteLock _(&lock_); assert(size_ <= opt_.cache_size); if (size + size_ <= opt_.cache_size) { // there is enough space to write size_ += size; return true; } assert(size + size_ >= opt_.cache_size); // there is not enough space to fit the requested data // we can clear some space by evicting cold data const double retain_fac = (100 - kEvictPct) / static_cast(100); while (size + size_ > opt_.cache_size * retain_fac) { std::unique_ptr f(metadata_.Evict()); if (!f) { // nothing is evictable return false; } assert(!f->refs_); uint64_t file_size; if (!f->Delete(&file_size).ok()) { // unable to delete file return false; } assert(file_size <= size_); size_ -= file_size; } size_ += size; assert(size_ <= opt_.cache_size * 0.9); return true; } Status NewPersistentCache(Env* const env, const std::string& path, const uint64_t size, const std::shared_ptr& log, const bool optimized_for_nvm, std::shared_ptr* cache) { if (!cache) { return Status::IOError("invalid argument cache"); } auto opt = PersistentCacheConfig(env, path, size, log); if (optimized_for_nvm) { // the default settings are optimized for SSD // NVM devices are better accessed with 4K direct IO and written with // parallelism opt.enable_direct_writes = true; opt.writer_qdepth = 4; opt.writer_dispatch_size = 4 * 1024; } auto pcache = std::make_shared(opt); Status s = pcache->Open(); if (!s.ok()) { return s; } *cache = pcache; return s; } } // namespace ROCKSDB_NAMESPACE #endif // ifndef ROCKSDB_LITE