summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/persistent_cache
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/persistent_cache
parentInitial commit. (diff)
downloadceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz
ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/persistent_cache')
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier.cc422
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier.h156
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc610
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h293
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h127
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc86
-rw-r--r--src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h124
-rw-r--r--src/rocksdb/utilities/persistent_cache/hash_table.h239
-rw-r--r--src/rocksdb/utilities/persistent_cache/hash_table_bench.cc310
-rw-r--r--src/rocksdb/utilities/persistent_cache/hash_table_evictable.h168
-rw-r--r--src/rocksdb/utilities/persistent_cache/hash_table_test.cc163
-rw-r--r--src/rocksdb/utilities/persistent_cache/lrulist.h174
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc359
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc462
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_test.h286
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc167
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h342
-rw-r--r--src/rocksdb/utilities/persistent_cache/persistent_cache_util.h67
-rw-r--r--src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc140
-rw-r--r--src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h141
20 files changed, 4836 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
new file mode 100644
index 000000000..8ad9bb1b1
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc
@@ -0,0 +1,422 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier.h"
+
+#include <utility>
+#include <vector>
+
+#include "logging/logging.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "util/stop_watch.h"
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// BlockCacheImpl
+//
+Status BlockCacheTier::Open() {
+ Status status;
+
+ WriteLock _(&lock_);
+
+ assert(!size_);
+
+ // Check the validity of the options
+ status = opt_.ValidateSettings();
+ assert(status.ok());
+ if (!status.ok()) {
+ Error(opt_.log, "Invalid block cache options");
+ return status;
+ }
+
+ // Create base directory or cleanup existing directory
+ status = opt_.env->CreateDirIfMissing(opt_.path);
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ // Create base/<cache dir> directory
+ status = opt_.env->CreateDir(GetCachePath());
+ if (!status.ok()) {
+ // directory already exists, clean it up
+ status = CleanupCacheFolder(GetCachePath());
+ assert(status.ok());
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+ }
+
+ // create a new file
+ assert(!cache_file_);
+ status = NewCacheFile();
+ if (!status.ok()) {
+ Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ assert(cache_file_);
+
+ if (opt_.pipeline_writes) {
+ assert(!insert_th_.joinable());
+ insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
+ }
+
+ return Status::OK();
+}
+
+bool IsCacheFile(const std::string& file) {
+ // check if the file has .rc suffix
+ // Unfortunately regex support across compilers is not even, so we use simple
+ // string parsing
+ size_t pos = file.find(".");
+ if (pos == std::string::npos) {
+ return false;
+ }
+
+ std::string suffix = file.substr(pos);
+ return suffix == ".rc";
+}
+
+Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
+ std::vector<std::string> files;
+ Status status = opt_.env->GetChildren(folder, &files);
+ if (!status.ok()) {
+ Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+
+ // cleanup files with the patter :digi:.rc
+ for (auto file : files) {
+ if (IsCacheFile(file)) {
+ // cache file
+ Info(opt_.log, "Removing file %s.", file.c_str());
+ status = opt_.env->DeleteFile(folder + "/" + file);
+ if (!status.ok()) {
+ Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
+ status.ToString().c_str());
+ return status;
+ }
+ } else {
+ ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
+ }
+ }
+ return Status::OK();
+}
+
+Status BlockCacheTier::Close() {
+ // stop the insert thread
+ if (opt_.pipeline_writes && insert_th_.joinable()) {
+ InsertOp op(/*quit=*/true);
+ insert_ops_.Push(std::move(op));
+ insert_th_.join();
+ }
+
+ // stop the writer before
+ writer_.Stop();
+
+ // clear all metadata
+ WriteLock _(&lock_);
+ metadata_.Clear();
+ return Status::OK();
+}
+
+template <class T>
+void Add(std::map<std::string, double>* stats, const std::string& key,
+ const T& t) {
+ stats->insert({key, static_cast<double>(t)});
+}
+
+PersistentCache::StatsType BlockCacheTier::Stats() {
+ std::map<std::string, double> stats;
+ Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
+ stats_.bytes_pipelined_.Average());
+ Add(&stats, "persistentcache.blockcachetier.bytes_written",
+ stats_.bytes_written_.Average());
+ Add(&stats, "persistentcache.blockcachetier.bytes_read",
+ stats_.bytes_read_.Average());
+ Add(&stats, "persistentcache.blockcachetier.insert_dropped",
+ stats_.insert_dropped_);
+ Add(&stats, "persistentcache.blockcachetier.cache_hits", stats_.cache_hits_);
+ Add(&stats, "persistentcache.blockcachetier.cache_misses",
+ stats_.cache_misses_);
+ Add(&stats, "persistentcache.blockcachetier.cache_errors",
+ stats_.cache_errors_);
+ Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
+ stats_.CacheHitPct());
+ Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
+ stats_.CacheMissPct());
+ Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
+ stats_.read_hit_latency_.Average());
+ Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
+ stats_.read_miss_latency_.Average());
+ Add(&stats, "persistentcache.blockcachetier.write_latency",
+ stats_.write_latency_.Average());
+
+ auto out = PersistentCacheTier::Stats();
+ out.push_back(stats);
+ return out;
+}
+
+Status BlockCacheTier::Insert(const Slice& key, const char* data,
+ const size_t size) {
+ // update stats
+ stats_.bytes_pipelined_.Add(size);
+
+ if (opt_.pipeline_writes) {
+ // off load the write to the write thread
+ insert_ops_.Push(
+ InsertOp(key.ToString(), std::move(std::string(data, size))));
+ return Status::OK();
+ }
+
+ assert(!opt_.pipeline_writes);
+ return InsertImpl(key, Slice(data, size));
+}
+
+void BlockCacheTier::InsertMain() {
+ while (true) {
+ InsertOp op(insert_ops_.Pop());
+
+ if (op.signal_) {
+ // that is a secret signal to exit
+ break;
+ }
+
+ size_t retry = 0;
+ Status s;
+ while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
+ if (retry > kMaxRetry) {
+ break;
+ }
+
+ // this can happen when the buffers are full, we wait till some buffers
+ // are free. Why don't we wait inside the code. This is because we want
+ // to support both pipelined and non-pipelined mode
+ buffer_allocator_.WaitUntilUsable();
+ retry++;
+ }
+
+ if (!s.ok()) {
+ stats_.insert_dropped_++;
+ }
+ }
+}
+
+Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
+ // pre-condition
+ assert(key.size());
+ assert(data.size());
+ assert(cache_file_);
+
+ StopWatchNano timer(opt_.clock, /*auto_start=*/true);
+
+ WriteLock _(&lock_);
+
+ LBA lba;
+ if (metadata_.Lookup(key, &lba)) {
+ // the key already exists, this is duplicate insert
+ return Status::OK();
+ }
+
+ while (!cache_file_->Append(key, data, &lba)) {
+ if (!cache_file_->Eof()) {
+ ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
+ cache_file_->cacheid());
+ stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::TryAgain();
+ }
+
+ assert(cache_file_->Eof());
+ Status status = NewCacheFile();
+ if (!status.ok()) {
+ return status;
+ }
+ }
+
+ // Insert into lookup index
+ BlockInfo* info = metadata_.Insert(key, lba);
+ assert(info);
+ if (!info) {
+ return Status::IOError("Unexpected error inserting to index");
+ }
+
+ // insert to cache file reverse mapping
+ cache_file_->Add(info);
+
+ // update stats
+ stats_.bytes_written_.Add(data.size());
+ stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::OK();
+}
+
+Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
+ size_t* size) {
+ StopWatchNano timer(opt_.clock, /*auto_start=*/true);
+
+ LBA lba;
+ bool status;
+ status = metadata_.Lookup(key, &lba);
+ if (!status) {
+ stats_.cache_misses_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: key not found");
+ }
+
+ BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
+ if (!file) {
+ // this can happen because the block index and cache file index are
+ // different, and the cache file might be removed between the two lookups
+ stats_.cache_misses_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: cache file not found");
+ }
+
+ assert(file->refs_);
+
+ std::unique_ptr<char[]> scratch(new char[lba.size_]);
+ Slice blk_key;
+ Slice blk_val;
+
+ status = file->Read(lba, &blk_key, &blk_val, scratch.get());
+ --file->refs_;
+ if (!status) {
+ stats_.cache_misses_++;
+ stats_.cache_errors_++;
+ stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+ return Status::NotFound("blockcache: error reading data");
+ }
+
+ assert(blk_key == key);
+
+ val->reset(new char[blk_val.size()]);
+ memcpy(val->get(), blk_val.data(), blk_val.size());
+ *size = blk_val.size();
+
+ stats_.bytes_read_.Add(*size);
+ stats_.cache_hits_++;
+ stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
+
+ return Status::OK();
+}
+
+bool BlockCacheTier::Erase(const Slice& key) {
+ WriteLock _(&lock_);
+ BlockInfo* info = metadata_.Remove(key);
+ assert(info);
+ delete info;
+ return true;
+}
+
+Status BlockCacheTier::NewCacheFile() {
+ lock_.AssertHeld();
+
+ TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
+ (void*)(GetCachePath().c_str()));
+
+ std::unique_ptr<WriteableCacheFile> f(new WriteableCacheFile(
+ opt_.env, &buffer_allocator_, &writer_, GetCachePath(), writer_cache_id_,
+ opt_.cache_file_size, opt_.log));
+
+ bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
+ if (!status) {
+ return Status::IOError("Error creating file");
+ }
+
+ Info(opt_.log, "Created cache file %d", writer_cache_id_);
+
+ writer_cache_id_++;
+ cache_file_ = f.release();
+
+ // insert to cache files tree
+ status = metadata_.Insert(cache_file_);
+ assert(status);
+ if (!status) {
+ Error(opt_.log, "Error inserting to metadata");
+ return Status::IOError("Error inserting to metadata");
+ }
+
+ return Status::OK();
+}
+
+bool BlockCacheTier::Reserve(const size_t size) {
+ WriteLock _(&lock_);
+ assert(size_ <= opt_.cache_size);
+
+ if (size + size_ <= opt_.cache_size) {
+ // there is enough space to write
+ size_ += size;
+ return true;
+ }
+
+ assert(size + size_ >= opt_.cache_size);
+ // there is not enough space to fit the requested data
+ // we can clear some space by evicting cold data
+
+ const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
+ while (size + size_ > opt_.cache_size * retain_fac) {
+ std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
+ if (!f) {
+ // nothing is evictable
+ return false;
+ }
+ assert(!f->refs_);
+ uint64_t file_size;
+ if (!f->Delete(&file_size).ok()) {
+ // unable to delete file
+ return false;
+ }
+
+ assert(file_size <= size_);
+ size_ -= file_size;
+ }
+
+ size_ += size;
+ assert(size_ <= opt_.cache_size * 0.9);
+ return true;
+}
+
+Status NewPersistentCache(Env* const env, const std::string& path,
+ const uint64_t size,
+ const std::shared_ptr<Logger>& log,
+ const bool optimized_for_nvm,
+ std::shared_ptr<PersistentCache>* cache) {
+ if (!cache) {
+ return Status::IOError("invalid argument cache");
+ }
+
+ auto opt = PersistentCacheConfig(env, path, size, log);
+ if (optimized_for_nvm) {
+ // the default settings are optimized for SSD
+ // NVM devices are better accessed with 4K direct IO and written with
+ // parallelism
+ opt.enable_direct_writes = true;
+ opt.writer_qdepth = 4;
+ opt.writer_dispatch_size = 4 * 1024;
+ }
+
+ auto pcache = std::make_shared<BlockCacheTier>(opt);
+ Status s = pcache->Open();
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ *cache = pcache;
+ return s;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ifndef ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier.h
new file mode 100644
index 000000000..1aac287cc
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier.h
@@ -0,0 +1,156 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#ifndef OS_WIN
+#include <unistd.h>
+#endif // ! OS_WIN
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+
+#include "memory/arena.h"
+#include "memtable/skiplist.h"
+#include "monitoring/histogram.h"
+#include "port/port.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/persistent_cache.h"
+#include "rocksdb/system_clock.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/mutexlock.h"
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+#include "utilities/persistent_cache/block_cache_tier_metadata.h"
+#include "utilities/persistent_cache/persistent_cache_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// Block cache tier implementation
+//
+class BlockCacheTier : public PersistentCacheTier {
+ public:
+ explicit BlockCacheTier(const PersistentCacheConfig& opt)
+ : opt_(opt),
+ insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
+ buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
+ writer_(this, opt_.writer_qdepth,
+ static_cast<size_t>(opt_.writer_dispatch_size)) {
+ Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
+ opt_.write_buffer_size, opt_.write_buffer_count());
+ }
+
+ virtual ~BlockCacheTier() {
+ // Close is re-entrant so we can call close even if it is already closed
+ Close().PermitUncheckedError();
+ assert(!insert_th_.joinable());
+ }
+
+ Status Insert(const Slice& key, const char* data, const size_t size) override;
+ Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
+ size_t* size) override;
+ Status Open() override;
+ Status Close() override;
+ bool Erase(const Slice& key) override;
+ bool Reserve(const size_t size) override;
+
+ bool IsCompressed() override { return opt_.is_compressed; }
+
+ std::string GetPrintableOptions() const override { return opt_.ToString(); }
+
+ PersistentCache::StatsType Stats() override;
+
+ void TEST_Flush() override {
+ while (insert_ops_.Size()) {
+ /* sleep override */
+ SystemClock::Default()->SleepForMicroseconds(1000000);
+ }
+ }
+
+ private:
+ // Percentage of cache to be evicted when the cache is full
+ static const size_t kEvictPct = 10;
+ // Max attempts to insert key, value to cache in pipelined mode
+ static const size_t kMaxRetry = 3;
+
+ // Pipelined operation
+ struct InsertOp {
+ explicit InsertOp(const bool signal) : signal_(signal) {}
+ explicit InsertOp(std::string&& key, const std::string& data)
+ : key_(std::move(key)), data_(data) {}
+ ~InsertOp() {}
+
+ InsertOp() = delete;
+ InsertOp(InsertOp&& /*rhs*/) = default;
+ InsertOp& operator=(InsertOp&& rhs) = default;
+
+ // used for estimating size by bounded queue
+ size_t Size() { return data_.size() + key_.size(); }
+
+ std::string key_;
+ std::string data_;
+ bool signal_ = false; // signal to request processing thread to exit
+ };
+
+ // entry point for insert thread
+ void InsertMain();
+ // insert implementation
+ Status InsertImpl(const Slice& key, const Slice& data);
+ // Create a new cache file
+ Status NewCacheFile();
+ // Get cache directory path
+ std::string GetCachePath() const { return opt_.path + "/cache"; }
+ // Cleanup folder
+ Status CleanupCacheFolder(const std::string& folder);
+
+ // Statistics
+ struct Statistics {
+ HistogramImpl bytes_pipelined_;
+ HistogramImpl bytes_written_;
+ HistogramImpl bytes_read_;
+ HistogramImpl read_hit_latency_;
+ HistogramImpl read_miss_latency_;
+ HistogramImpl write_latency_;
+ std::atomic<uint64_t> cache_hits_{0};
+ std::atomic<uint64_t> cache_misses_{0};
+ std::atomic<uint64_t> cache_errors_{0};
+ std::atomic<uint64_t> insert_dropped_{0};
+
+ double CacheHitPct() const {
+ const auto lookups = cache_hits_ + cache_misses_;
+ return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
+ }
+
+ double CacheMissPct() const {
+ const auto lookups = cache_hits_ + cache_misses_;
+ return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
+ }
+ };
+
+ port::RWMutex lock_; // Synchronization
+ const PersistentCacheConfig opt_; // BlockCache options
+ BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
+ ROCKSDB_NAMESPACE::port::Thread insert_th_; // Insert thread
+ uint32_t writer_cache_id_ = 0; // Current cache file identifier
+ WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
+ CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
+ ThreadedWriter writer_; // Writer threads
+ BlockCacheTierMetadata metadata_; // Cache meta data manager
+ std::atomic<uint64_t> size_{0}; // Size of the cache
+ Statistics stats_; // Statistics
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc
new file mode 100644
index 000000000..f4f8517ab
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc
@@ -0,0 +1,610 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+
+#ifndef OS_WIN
+#include <unistd.h>
+#endif
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "env/composite_env_wrapper.h"
+#include "logging/logging.h"
+#include "port/port.h"
+#include "rocksdb/system_clock.h"
+#include "util/crc32c.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// File creation factories
+//
+Status NewWritableCacheFile(Env* const env, const std::string& filepath,
+ std::unique_ptr<WritableFile>* file,
+ const bool use_direct_writes = false) {
+ EnvOptions opt;
+ opt.use_direct_writes = use_direct_writes;
+ Status s = env->NewWritableFile(filepath, file, opt);
+ return s;
+}
+
+Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs,
+ const std::string& filepath,
+ std::unique_ptr<FSRandomAccessFile>* file,
+ const bool use_direct_reads = true) {
+ assert(fs.get());
+
+ FileOptions opt;
+ opt.use_direct_reads = use_direct_reads;
+ return fs->NewRandomAccessFile(filepath, opt, file, nullptr);
+}
+
+//
+// BlockCacheFile
+//
+Status BlockCacheFile::Delete(uint64_t* size) {
+ assert(env_);
+
+ Status status = env_->GetFileSize(Path(), size);
+ if (!status.ok()) {
+ return status;
+ }
+ return env_->DeleteFile(Path());
+}
+
+//
+// CacheRecord
+//
+// Cache record represents the record on disk
+//
+// +--------+---------+----------+------------+---------------+-------------+
+// | magic | crc | key size | value size | key data | value data |
+// +--------+---------+----------+------------+---------------+-------------+
+// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
+//
+struct CacheRecordHeader {
+ CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
+ CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
+ const uint32_t val_size)
+ : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}
+
+ uint32_t magic_;
+ uint32_t crc_;
+ uint32_t key_size_;
+ uint32_t val_size_;
+};
+
+struct CacheRecord {
+ CacheRecord() {}
+ CacheRecord(const Slice& key, const Slice& val)
+ : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
+ static_cast<uint32_t>(val.size())),
+ key_(key),
+ val_(val) {
+ hdr_.crc_ = ComputeCRC();
+ }
+
+ uint32_t ComputeCRC() const;
+ bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
+ bool Deserialize(const Slice& buf);
+
+ static uint32_t CalcSize(const Slice& key, const Slice& val) {
+ return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
+ val.size());
+ }
+
+ static const uint32_t MAGIC = 0xfefa;
+
+ bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
+ const char* data, const size_t size);
+
+ CacheRecordHeader hdr_;
+ Slice key_;
+ Slice val_;
+};
+
+static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned");
+
+uint32_t CacheRecord::ComputeCRC() const {
+ uint32_t crc = 0;
+ CacheRecordHeader tmp = hdr_;
+ tmp.crc_ = 0;
+ crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
+ crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
+ key_.size());
+ crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
+ val_.size());
+ return crc;
+}
+
+bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
+ size_t* woff) {
+ assert(bufs->size());
+ return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
+ sizeof(hdr_)) &&
+ Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
+ key_.size()) &&
+ Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
+ val_.size());
+}
+
+bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
+ const char* data, const size_t data_size) {
+ assert(*woff < bufs->size());
+
+ const char* p = data;
+ size_t size = data_size;
+
+ while (size && *woff < bufs->size()) {
+ CacheWriteBuffer* buf = (*bufs)[*woff];
+ const size_t free = buf->Free();
+ if (size <= free) {
+ buf->Append(p, size);
+ size = 0;
+ } else {
+ buf->Append(p, free);
+ p += free;
+ size -= free;
+ assert(!buf->Free());
+ assert(buf->Used() == buf->Capacity());
+ }
+
+ if (!buf->Free()) {
+ *woff += 1;
+ }
+ }
+
+ assert(!size);
+
+ return !size;
+}
+
+bool CacheRecord::Deserialize(const Slice& data) {
+ assert(data.size() >= sizeof(CacheRecordHeader));
+ if (data.size() < sizeof(CacheRecordHeader)) {
+ return false;
+ }
+
+ memcpy(&hdr_, data.data(), sizeof(hdr_));
+
+ assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
+ if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
+ return false;
+ }
+
+ key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
+ val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);
+
+ if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
+ fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
+ fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
+ fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
+ fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
+ fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
+ for (size_t i = 0; i < hdr_.val_size_; ++i) {
+ fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
+ }
+ fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
+ }
+
+ assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
+ return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
+}
+
+//
+// RandomAccessFile
+//
+
+bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
+ WriteLock _(&rwlock_);
+ return OpenImpl(enable_direct_reads);
+}
+
+bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
+ rwlock_.AssertHeld();
+
+ ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
+ assert(env_);
+
+ std::unique_ptr<FSRandomAccessFile> file;
+ Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file,
+ enable_direct_reads);
+ if (!status.ok()) {
+ Error(log_, "Error opening random access file %s. %s", Path().c_str(),
+ status.ToString().c_str());
+ return false;
+ }
+ freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
+ env_->GetSystemClock().get()));
+
+ return true;
+}
+
+bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
+ char* scratch) {
+ ReadLock _(&rwlock_);
+
+ assert(lba.cache_id_ == cache_id_);
+
+ if (!freader_) {
+ return false;
+ }
+
+ Slice result;
+ Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ Error(log_, "Error reading from file %s. %s", Path().c_str(),
+ s.ToString().c_str());
+ return false;
+ }
+
+ assert(result.data() == scratch);
+
+ return ParseRec(lba, key, val, scratch);
+}
+
+bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
+ char* scratch) {
+ Slice data(scratch, lba.size_);
+
+ CacheRecord rec;
+ if (!rec.Deserialize(data)) {
+ assert(!"Error deserializing data");
+ Error(log_, "Error de-serializing record from file %s off %d",
+ Path().c_str(), lba.off_);
+ return false;
+ }
+
+ *key = Slice(rec.key_);
+ *val = Slice(rec.val_);
+
+ return true;
+}
+
+//
+// WriteableCacheFile
+//
+
+WriteableCacheFile::~WriteableCacheFile() {
+ WriteLock _(&rwlock_);
+ if (!eof_) {
+ // This file never flushed. We give priority to shutdown since this is a
+ // cache
+ // TODO(krad): Figure a way to flush the pending data
+ if (file_) {
+ assert(refs_ == 1);
+ --refs_;
+ }
+ }
+ assert(!refs_);
+ ClearBuffers();
+}
+
+bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/,
+ const bool enable_direct_reads) {
+ WriteLock _(&rwlock_);
+
+ enable_direct_reads_ = enable_direct_reads;
+
+ ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)",
+ Path().c_str(), max_size_);
+
+ assert(env_);
+
+ Status s = env_->FileExists(Path());
+ if (s.ok()) {
+ ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(),
+ s.ToString().c_str());
+ }
+
+ s = NewWritableCacheFile(env_, Path(), &file_);
+ if (!s.ok()) {
+ ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(),
+ s.ToString().c_str());
+ return false;
+ }
+
+ assert(!refs_);
+ ++refs_;
+
+ return true;
+}
+
+bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
+ WriteLock _(&rwlock_);
+
+ if (eof_) {
+ // We can't append since the file is full
+ return false;
+ }
+
+ // estimate the space required to store the (key, val)
+ uint32_t rec_size = CacheRecord::CalcSize(key, val);
+
+ if (!ExpandBuffer(rec_size)) {
+ // unable to expand the buffer
+ ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size);
+ return false;
+ }
+
+ lba->cache_id_ = cache_id_;
+ lba->off_ = disk_woff_;
+ lba->size_ = rec_size;
+
+ CacheRecord rec(key, val);
+ if (!rec.Serialize(&bufs_, &buf_woff_)) {
+ // unexpected error: unable to serialize the data
+ assert(!"Error serializing record");
+ return false;
+ }
+
+ disk_woff_ += rec_size;
+ eof_ = disk_woff_ >= max_size_;
+
+ // dispatch buffer for flush
+ DispatchBuffer();
+
+ return true;
+}
+
+bool WriteableCacheFile::ExpandBuffer(const size_t size) {
+ rwlock_.AssertHeld();
+ assert(!eof_);
+
+ // determine if there is enough space
+ size_t free = 0; // compute the free space left in buffer
+ for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
+ free += bufs_[i]->Free();
+ if (size <= free) {
+ // we have enough space in the buffer
+ return true;
+ }
+ }
+
+ // expand the buffer until there is enough space to write `size` bytes
+ assert(free < size);
+ assert(alloc_);
+
+ while (free < size) {
+ CacheWriteBuffer* const buf = alloc_->Allocate();
+ if (!buf) {
+ ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers");
+ return false;
+ }
+
+ size_ += static_cast<uint32_t>(buf->Free());
+ free += buf->Free();
+ bufs_.push_back(buf);
+ }
+
+ assert(free >= size);
+ return true;
+}
+
+void WriteableCacheFile::DispatchBuffer() {
+ rwlock_.AssertHeld();
+
+ assert(bufs_.size());
+ assert(buf_doff_ <= buf_woff_);
+ assert(buf_woff_ <= bufs_.size());
+
+ if (pending_ios_) {
+ return;
+ }
+
+ if (!eof_ && buf_doff_ == buf_woff_) {
+ // dispatch buffer is pointing to write buffer and we haven't hit eof
+ return;
+ }
+
+ assert(eof_ || buf_doff_ < buf_woff_);
+ assert(buf_doff_ < bufs_.size());
+ assert(file_);
+ assert(alloc_);
+
+ auto* buf = bufs_[buf_doff_];
+ const uint64_t file_off = buf_doff_ * alloc_->BufferSize();
+
+ assert(!buf->Free() ||
+ (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
+ // we have reached end of file, and there is space in the last buffer
+ // pad it with zero for direct IO
+ buf->FillTrailingZeros();
+
+ assert(buf->Used() % kFileAlignmentSize == 0);
+
+ writer_->Write(file_.get(), buf, file_off,
+ std::bind(&WriteableCacheFile::BufferWriteDone, this));
+ pending_ios_++;
+ buf_doff_++;
+}
+
+void WriteableCacheFile::BufferWriteDone() {
+ WriteLock _(&rwlock_);
+
+ assert(bufs_.size());
+
+ pending_ios_--;
+
+ if (buf_doff_ < bufs_.size()) {
+ DispatchBuffer();
+ }
+
+ if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
+ // end-of-file reached, move to read mode
+ CloseAndOpenForReading();
+ }
+}
+
+void WriteableCacheFile::CloseAndOpenForReading() {
+ // Our env abstraction do not allow reading from a file opened for appending
+ // We need close the file and re-open it for reading
+ Close();
+ RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
+}
+
+bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
+ char* scratch) {
+ rwlock_.AssertHeld();
+
+ if (!ReadBuffer(lba, scratch)) {
+ Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
+ lba.off_);
+ return false;
+ }
+
+ return ParseRec(lba, key, block, scratch);
+}
+
+bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
+ rwlock_.AssertHeld();
+
+ assert(lba.off_ < disk_woff_);
+ assert(alloc_);
+
+ // we read from the buffers like reading from a flat file. The list of buffers
+ // are treated as contiguous stream of data
+
+ char* tmp = data;
+ size_t pending_nbytes = lba.size_;
+ // start buffer
+ size_t start_idx = lba.off_ / alloc_->BufferSize();
+ // offset into the start buffer
+ size_t start_off = lba.off_ % alloc_->BufferSize();
+
+ assert(start_idx <= buf_woff_);
+
+ for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
+ assert(i <= buf_woff_);
+ auto* buf = bufs_[i];
+ assert(i == buf_woff_ || !buf->Free());
+ // bytes to write to the buffer
+ size_t nbytes = pending_nbytes > (buf->Used() - start_off)
+ ? (buf->Used() - start_off)
+ : pending_nbytes;
+ memcpy(tmp, buf->Data() + start_off, nbytes);
+
+ // left over to be written
+ pending_nbytes -= nbytes;
+ start_off = 0;
+ tmp += nbytes;
+ }
+
+ assert(!pending_nbytes);
+ if (pending_nbytes) {
+ return false;
+ }
+
+ assert(tmp == data + lba.size_);
+ return true;
+}
+
+void WriteableCacheFile::Close() {
+ rwlock_.AssertHeld();
+
+ assert(size_ >= max_size_);
+ assert(disk_woff_ >= max_size_);
+ assert(buf_doff_ == bufs_.size());
+ assert(bufs_.size() - buf_woff_ <= 1);
+ assert(!pending_ios_);
+
+ Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
+ disk_woff_);
+
+ ClearBuffers();
+ file_.reset();
+
+ assert(refs_);
+ --refs_;
+}
+
+void WriteableCacheFile::ClearBuffers() {
+ assert(alloc_);
+
+ for (size_t i = 0; i < bufs_.size(); ++i) {
+ alloc_->Deallocate(bufs_[i]);
+ }
+
+ bufs_.clear();
+}
+
+//
+// ThreadedFileWriter implementation
+//
+ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
+ const size_t qdepth, const size_t io_size)
+ : Writer(cache), io_size_(io_size) {
+ for (size_t i = 0; i < qdepth; ++i) {
+ port::Thread th(&ThreadedWriter::ThreadMain, this);
+ threads_.push_back(std::move(th));
+ }
+}
+
+void ThreadedWriter::Stop() {
+ // notify all threads to exit
+ for (size_t i = 0; i < threads_.size(); ++i) {
+ q_.Push(IO(/*signal=*/true));
+ }
+
+ // wait for all threads to exit
+ for (auto& th : threads_) {
+ th.join();
+ assert(!th.joinable());
+ }
+ threads_.clear();
+}
+
+void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
+ const uint64_t file_off,
+ const std::function<void()> callback) {
+ q_.Push(IO(file, buf, file_off, callback));
+}
+
+void ThreadedWriter::ThreadMain() {
+ while (true) {
+ // Fetch the IO to process
+ IO io(q_.Pop());
+ if (io.signal_) {
+ // that's secret signal to exit
+ break;
+ }
+
+ // Reserve space for writing the buffer
+ while (!cache_->Reserve(io.buf_->Used())) {
+ // We can fail to reserve space if every file in the system
+ // is being currently accessed
+ /* sleep override */
+ SystemClock::Default()->SleepForMicroseconds(1000000);
+ }
+
+ DispatchIO(io);
+
+ io.callback_();
+ }
+}
+
+void ThreadedWriter::DispatchIO(const IO& io) {
+ size_t written = 0;
+ while (written < io.buf_->Used()) {
+ Slice data(io.buf_->Data() + written, io_size_);
+ Status s = io.file_->Append(data);
+ assert(s.ok());
+ if (!s.ok()) {
+ // That is definite IO error to device. There is not much we can
+ // do but ignore the failure. This can lead to corruption of data on
+ // disk, but the cache will skip while reading
+ fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
+ }
+ written += io_size_;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
new file mode 100644
index 000000000..1d265ab74
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
@@ -0,0 +1,293 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <list>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "file/random_access_file_reader.h"
+#include "port/port.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/env.h"
+#include "util/crc32c.h"
+#include "util/mutexlock.h"
+#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
+#include "utilities/persistent_cache/lrulist.h"
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+#include "utilities/persistent_cache/persistent_cache_util.h"
+
+// The io code path of persistent cache uses pipelined architecture
+//
+// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
+//
+// This would enable the system to scale for GB/s of throughput which is
+// expected with modern devies like NVM.
+//
+// The file level operations are encapsulated in the following abstractions
+//
+// BlockCacheFile
+// ^
+// |
+// |
+// RandomAccessCacheFile (For reading)
+// ^
+// |
+// |
+// WriteableCacheFile (For writing)
+//
+// Write IO code path :
+//
+namespace ROCKSDB_NAMESPACE {
+
+class WriteableCacheFile;
+struct BlockInfo;
+
+// Represents a logical record on device
+//
+// (L)ogical (B)lock (Address = { cache-file-id, offset, size }
+struct LogicalBlockAddress {
+ LogicalBlockAddress() {}
+ explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
+ const uint16_t size)
+ : cache_id_(cache_id), off_(off), size_(size) {}
+
+ uint32_t cache_id_ = 0;
+ uint32_t off_ = 0;
+ uint32_t size_ = 0;
+};
+
+using LBA = LogicalBlockAddress;
+
+// class Writer
+//
+// Writer is the abstraction used for writing data to file. The component can be
+// multithreaded. It is the last step of write pipeline
+class Writer {
+ public:
+ explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
+ virtual ~Writer() {}
+
+ // write buffer to file at the given offset
+ virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
+ const uint64_t file_off,
+ const std::function<void()> callback) = 0;
+ // stop the writer
+ virtual void Stop() = 0;
+
+ PersistentCacheTier* const cache_;
+};
+
+// class BlockCacheFile
+//
+// Generic interface to support building file specialized for read/writing
+class BlockCacheFile : public LRUElement<BlockCacheFile> {
+ public:
+ explicit BlockCacheFile(const uint32_t cache_id)
+ : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
+
+ explicit BlockCacheFile(Env* const env, const std::string& dir,
+ const uint32_t cache_id)
+ : LRUElement<BlockCacheFile>(),
+ env_(env),
+ dir_(dir),
+ cache_id_(cache_id) {}
+
+ virtual ~BlockCacheFile() {}
+
+ // append key/value to file and return LBA locator to user
+ virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
+ LBA* const /*lba*/) {
+ assert(!"not implemented");
+ return false;
+ }
+
+ // read from the record locator (LBA) and return key, value and status
+ virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
+ char* /*scratch*/) {
+ assert(!"not implemented");
+ return false;
+ }
+
+ // get file path
+ std::string Path() const {
+ return dir_ + "/" + std::to_string(cache_id_) + ".rc";
+ }
+ // get cache ID
+ uint32_t cacheid() const { return cache_id_; }
+ // Add block information to file data
+ // Block information is the list of index reference for this file
+ virtual void Add(BlockInfo* binfo) {
+ WriteLock _(&rwlock_);
+ block_infos_.push_back(binfo);
+ }
+ // get block information
+ std::list<BlockInfo*>& block_infos() { return block_infos_; }
+ // delete file and return the size of the file
+ virtual Status Delete(uint64_t* size);
+
+ protected:
+ port::RWMutex rwlock_; // synchronization mutex
+ Env* const env_ = nullptr; // Env for OS
+ const std::string dir_; // Directory name
+ const uint32_t cache_id_; // Cache id for the file
+ std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
+ // file content
+};
+
+// class RandomAccessFile
+//
+// Thread safe implementation for reading random data from file
+class RandomAccessCacheFile : public BlockCacheFile {
+ public:
+ explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
+ const uint32_t cache_id,
+ const std::shared_ptr<Logger>& log)
+ : BlockCacheFile(env, dir, cache_id), log_(log) {}
+
+ virtual ~RandomAccessCacheFile() {}
+
+ // open file for reading
+ bool Open(const bool enable_direct_reads);
+ // read data from the disk
+ bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
+
+ private:
+ std::unique_ptr<RandomAccessFileReader> freader_;
+
+ protected:
+ bool OpenImpl(const bool enable_direct_reads);
+ bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
+
+ std::shared_ptr<Logger> log_; // log file
+};
+
+// class WriteableCacheFile
+//
+// All writes to the files are cached in buffers. The buffers are flushed to
+// disk as they get filled up. When file size reaches a certain size, a new file
+// will be created provided there is free space
+class WriteableCacheFile : public RandomAccessCacheFile {
+ public:
+ explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
+ Writer* writer, const std::string& dir,
+ const uint32_t cache_id, const uint32_t max_size,
+ const std::shared_ptr<Logger>& log)
+ : RandomAccessCacheFile(env, dir, cache_id, log),
+ alloc_(alloc),
+ writer_(writer),
+ max_size_(max_size) {}
+
+ virtual ~WriteableCacheFile();
+
+ // create file on disk
+ bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
+
+ // read data from logical file
+ bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
+ ReadLock _(&rwlock_);
+ const bool closed = eof_ && bufs_.empty();
+ if (closed) {
+ // the file is closed, read from disk
+ return RandomAccessCacheFile::Read(lba, key, block, scratch);
+ }
+ // file is still being written, read from buffers
+ return ReadBuffer(lba, key, block, scratch);
+ }
+
+ // append data to end of file
+ bool Append(const Slice&, const Slice&, LBA* const) override;
+ // End-of-file
+ bool Eof() const { return eof_; }
+
+ private:
+ friend class ThreadedWriter;
+
+ static const size_t kFileAlignmentSize = 4 * 1024; // align file size
+
+ bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
+ bool ReadBuffer(const LBA& lba, char* data);
+ bool ExpandBuffer(const size_t size);
+ void DispatchBuffer();
+ void BufferWriteDone();
+ void CloseAndOpenForReading();
+ void ClearBuffers();
+ void Close();
+
+ // File layout in memory
+ //
+ // +------+------+------+------+------+------+
+ // | b0 | b1 | b2 | b3 | b4 | b5 |
+ // +------+------+------+------+------+------+
+ // ^ ^
+ // | |
+ // buf_doff_ buf_woff_
+ // (next buffer to (next buffer to fill)
+ // flush to disk)
+ //
+ // The buffers are flushed to disk serially for a given file
+
+ CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
+ Writer* const writer_ = nullptr; // File writer thread
+ std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
+ std::vector<CacheWriteBuffer*> bufs_; // Written buffers
+ uint32_t size_ = 0; // Size of the file
+ const uint32_t max_size_; // Max size of the file
+ bool eof_ = false; // End of file
+ uint32_t disk_woff_ = 0; // Offset to write on disk
+ size_t buf_woff_ = 0; // off into bufs_ to write
+ size_t buf_doff_ = 0; // off into bufs_ to dispatch
+ size_t pending_ios_ = 0; // Number of ios to disk in-progress
+ bool enable_direct_reads_ = false; // Should we enable direct reads
+ // when reading from disk
+};
+
+//
+// Abstraction to do writing to device. It is part of pipelined architecture.
+//
+class ThreadedWriter : public Writer {
+ public:
+ // Representation of IO to device
+ struct IO {
+ explicit IO(const bool signal) : signal_(signal) {}
+ explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
+ const uint64_t file_off, const std::function<void()> callback)
+ : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
+
+ IO(const IO&) = default;
+ IO& operator=(const IO&) = default;
+ size_t Size() const { return sizeof(IO); }
+
+ WritableFile* file_ = nullptr; // File to write to
+ CacheWriteBuffer* buf_ = nullptr; // buffer to write
+ uint64_t file_off_ = 0; // file offset
+ bool signal_ = false; // signal to exit thread loop
+ std::function<void()> callback_; // Callback on completion
+ };
+
+ explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
+ const size_t io_size);
+ virtual ~ThreadedWriter() { assert(threads_.empty()); }
+
+ void Stop() override;
+ void Write(WritableFile* const file, CacheWriteBuffer* buf,
+ const uint64_t file_off,
+ const std::function<void()> callback) override;
+
+ private:
+ void ThreadMain();
+ void DispatchIO(const IO& io);
+
+ const size_t io_size_ = 0;
+ BoundedQueue<IO> q_;
+ std::vector<port::Thread> threads_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h
new file mode 100644
index 000000000..d4f02455a
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h
@@ -0,0 +1,127 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <list>
+#include <memory>
+#include <string>
+
+#include "memory/arena.h"
+#include "rocksdb/comparator.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// CacheWriteBuffer
+//
+// Buffer abstraction that can be manipulated via append
+// (not thread safe)
+class CacheWriteBuffer {
+ public:
+ explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) {
+ buf_.reset(new char[size_]);
+ assert(!pos_);
+ assert(size_);
+ }
+
+ virtual ~CacheWriteBuffer() {}
+
+ void Append(const char* buf, const size_t size) {
+ assert(pos_ + size <= size_);
+ memcpy(buf_.get() + pos_, buf, size);
+ pos_ += size;
+ assert(pos_ <= size_);
+ }
+
+ void FillTrailingZeros() {
+ assert(pos_ <= size_);
+ memset(buf_.get() + pos_, '0', size_ - pos_);
+ pos_ = size_;
+ }
+
+ void Reset() { pos_ = 0; }
+ size_t Free() const { return size_ - pos_; }
+ size_t Capacity() const { return size_; }
+ size_t Used() const { return pos_; }
+ char* Data() const { return buf_.get(); }
+
+ private:
+ std::unique_ptr<char[]> buf_;
+ const size_t size_;
+ size_t pos_;
+};
+
+//
+// CacheWriteBufferAllocator
+//
+// Buffer pool abstraction(not thread safe)
+//
+class CacheWriteBufferAllocator {
+ public:
+ explicit CacheWriteBufferAllocator(const size_t buffer_size,
+ const size_t buffer_count)
+ : cond_empty_(&lock_), buffer_size_(buffer_size) {
+ MutexLock _(&lock_);
+ buffer_size_ = buffer_size;
+ for (uint32_t i = 0; i < buffer_count; i++) {
+ auto* buf = new CacheWriteBuffer(buffer_size_);
+ assert(buf);
+ if (buf) {
+ bufs_.push_back(buf);
+ cond_empty_.Signal();
+ }
+ }
+ }
+
+ virtual ~CacheWriteBufferAllocator() {
+ MutexLock _(&lock_);
+ assert(bufs_.size() * buffer_size_ == Capacity());
+ for (auto* buf : bufs_) {
+ delete buf;
+ }
+ bufs_.clear();
+ }
+
+ CacheWriteBuffer* Allocate() {
+ MutexLock _(&lock_);
+ if (bufs_.empty()) {
+ return nullptr;
+ }
+
+ assert(!bufs_.empty());
+ CacheWriteBuffer* const buf = bufs_.front();
+ bufs_.pop_front();
+ return buf;
+ }
+
+ void Deallocate(CacheWriteBuffer* const buf) {
+ assert(buf);
+ MutexLock _(&lock_);
+ buf->Reset();
+ bufs_.push_back(buf);
+ cond_empty_.Signal();
+ }
+
+ void WaitUntilUsable() {
+ // We are asked to wait till we have buffers available
+ MutexLock _(&lock_);
+ while (bufs_.empty()) {
+ cond_empty_.Wait();
+ }
+ }
+
+ size_t Capacity() const { return bufs_.size() * buffer_size_; }
+ size_t Free() const { return bufs_.size() * buffer_size_; }
+ size_t BufferSize() const { return buffer_size_; }
+
+ private:
+ port::Mutex lock_; // Sync lock
+ port::CondVar cond_empty_; // Condition var for empty buffers
+ size_t buffer_size_; // Size of each buffer
+ std::list<CacheWriteBuffer*> bufs_; // Buffer stash
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc
new file mode 100644
index 000000000..d73b5d0b4
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc
@@ -0,0 +1,86 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier_metadata.h"
+
+#include <functional>
+
+namespace ROCKSDB_NAMESPACE {
+
+bool BlockCacheTierMetadata::Insert(BlockCacheFile* file) {
+ return cache_file_index_.Insert(file);
+}
+
+BlockCacheFile* BlockCacheTierMetadata::Lookup(const uint32_t cache_id) {
+ BlockCacheFile* ret = nullptr;
+ BlockCacheFile lookup_key(cache_id);
+ bool ok = cache_file_index_.Find(&lookup_key, &ret);
+ if (ok) {
+ assert(ret->refs_);
+ return ret;
+ }
+ return nullptr;
+}
+
+BlockCacheFile* BlockCacheTierMetadata::Evict() {
+ using std::placeholders::_1;
+ auto fn = std::bind(&BlockCacheTierMetadata::RemoveAllKeys, this, _1);
+ return cache_file_index_.Evict(fn);
+}
+
+void BlockCacheTierMetadata::Clear() {
+ cache_file_index_.Clear([](BlockCacheFile* arg) { delete arg; });
+ block_index_.Clear([](BlockInfo* arg) { delete arg; });
+}
+
+BlockInfo* BlockCacheTierMetadata::Insert(const Slice& key, const LBA& lba) {
+ std::unique_ptr<BlockInfo> binfo(new BlockInfo(key, lba));
+ if (!block_index_.Insert(binfo.get())) {
+ return nullptr;
+ }
+ return binfo.release();
+}
+
+bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
+ BlockInfo lookup_key(key);
+ BlockInfo* block;
+ port::RWMutex* rlock = nullptr;
+ if (!block_index_.Find(&lookup_key, &block, &rlock)) {
+ return false;
+ }
+
+ ReadUnlock _(rlock);
+ assert(block->key_ == key.ToString());
+ if (lba) {
+ *lba = block->lba_;
+ }
+ return true;
+}
+
+BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) {
+ BlockInfo lookup_key(key);
+ BlockInfo* binfo = nullptr;
+ bool ok __attribute__((__unused__));
+ ok = block_index_.Erase(&lookup_key, &binfo);
+ assert(ok);
+ return binfo;
+}
+
+void BlockCacheTierMetadata::RemoveAllKeys(BlockCacheFile* f) {
+ for (BlockInfo* binfo : f->block_infos()) {
+ BlockInfo* tmp = nullptr;
+ bool status = block_index_.Erase(binfo, &tmp);
+ (void)status;
+ assert(status);
+ assert(tmp == binfo);
+ delete binfo;
+ }
+ f->block_infos().clear();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h
new file mode 100644
index 000000000..2fcd50105
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h
@@ -0,0 +1,124 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <string>
+#include <unordered_map>
+
+#include "rocksdb/slice.h"
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+#include "utilities/persistent_cache/hash_table.h"
+#include "utilities/persistent_cache/hash_table_evictable.h"
+#include "utilities/persistent_cache/lrulist.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// Block Cache Tier Metadata
+//
+// The BlockCacheTierMetadata holds all the metadata associated with block
+// cache. It
+// fundamentally contains 2 indexes and an LRU.
+//
+// Block Cache Index
+//
+// This is a forward index that maps a given key to a LBA (Logical Block
+// Address). LBA is a disk pointer that points to a record on the cache.
+//
+// LBA = { cache-id, offset, size }
+//
+// Cache File Index
+//
+// This is a forward index that maps a given cache-id to a cache file object.
+// Typically you would lookup using LBA and use the object to read or write
+struct BlockInfo {
+ explicit BlockInfo(const Slice& key, const LBA& lba = LBA())
+ : key_(key.ToString()), lba_(lba) {}
+
+ std::string key_;
+ LBA lba_;
+};
+
+class BlockCacheTierMetadata {
+ public:
+ explicit BlockCacheTierMetadata(const uint32_t blocks_capacity = 1024 * 1024,
+ const uint32_t cachefile_capacity = 10 * 1024)
+ : cache_file_index_(cachefile_capacity), block_index_(blocks_capacity) {}
+
+ virtual ~BlockCacheTierMetadata() {}
+
+ // Insert a given cache file
+ bool Insert(BlockCacheFile* file);
+
+ // Lookup cache file based on cache_id
+ BlockCacheFile* Lookup(const uint32_t cache_id);
+
+ // Insert block information to block index
+ BlockInfo* Insert(const Slice& key, const LBA& lba);
+ // bool Insert(BlockInfo* binfo);
+
+ // Lookup block information from block index
+ bool Lookup(const Slice& key, LBA* lba);
+
+ // Remove a given from the block index
+ BlockInfo* Remove(const Slice& key);
+
+ // Find and evict a cache file using LRU policy
+ BlockCacheFile* Evict();
+
+ // Clear the metadata contents
+ virtual void Clear();
+
+ protected:
+ // Remove all block information from a given file
+ virtual void RemoveAllKeys(BlockCacheFile* file);
+
+ private:
+ // Cache file index definition
+ //
+ // cache-id => BlockCacheFile
+ struct BlockCacheFileHash {
+ uint64_t operator()(const BlockCacheFile* rec) {
+ return std::hash<uint32_t>()(rec->cacheid());
+ }
+ };
+
+ struct BlockCacheFileEqual {
+ uint64_t operator()(const BlockCacheFile* lhs, const BlockCacheFile* rhs) {
+ return lhs->cacheid() == rhs->cacheid();
+ }
+ };
+
+ using CacheFileIndexType =
+ EvictableHashTable<BlockCacheFile, BlockCacheFileHash,
+ BlockCacheFileEqual>;
+
+ // Block Lookup Index
+ //
+ // key => LBA
+ struct Hash {
+ size_t operator()(BlockInfo* node) const {
+ return std::hash<std::string>()(node->key_);
+ }
+ };
+
+ struct Equal {
+ size_t operator()(BlockInfo* lhs, BlockInfo* rhs) const {
+ return lhs->key_ == rhs->key_;
+ }
+ };
+
+ using BlockIndexType = HashTable<BlockInfo*, Hash, Equal>;
+
+ CacheFileIndexType cache_file_index_;
+ BlockIndexType block_index_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/hash_table.h b/src/rocksdb/utilities/persistent_cache/hash_table.h
new file mode 100644
index 000000000..b00b294ce
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/hash_table.h
@@ -0,0 +1,239 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <assert.h>
+
+#include <list>
+#include <vector>
+
+#ifdef OS_LINUX
+#include <sys/mman.h>
+#endif
+
+#include "rocksdb/env.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// HashTable<T, Hash, Equal>
+//
+// Traditional implementation of hash table with synchronization built on top
+// don't perform very well in multi-core scenarios. This is an implementation
+// designed for multi-core scenarios with high lock contention.
+//
+// |<-------- alpha ------------->|
+// Buckets Collision list
+// ---- +----+ +---+---+--- ...... ---+---+---+
+// / | |--->| | | | | |
+// / +----+ +---+---+--- ...... ---+---+---+
+// / | |
+// Locks/ +----+
+// +--+/ . .
+// | | . .
+// +--+ . .
+// | | . .
+// +--+ . .
+// | | . .
+// +--+ . .
+// \ +----+
+// \ | |
+// \ +----+
+// \ | |
+// \---- +----+
+//
+// The lock contention is spread over an array of locks. This helps improve
+// concurrent access. The spine is designed for a certain capacity and load
+// factor. When the capacity planning is done correctly we can expect
+// O(load_factor = 1) insert, access and remove time.
+//
+// Micro benchmark on debug build gives about .5 Million/sec rate of insert,
+// erase and lookup in parallel (total of about 1.5 Million ops/sec). If the
+// blocks were of 4K, the hash table can support a virtual throughput of
+// 6 GB/s.
+//
+// T Object type (contains both key and value)
+// Hash Function that returns an hash from type T
+// Equal Returns if two objects are equal
+// (We need explicit equal for pointer type)
+//
+template <class T, class Hash, class Equal>
+class HashTable {
+ public:
+ explicit HashTable(const size_t capacity = 1024 * 1024,
+ const float load_factor = 2.0, const uint32_t nlocks = 256)
+ : nbuckets_(
+ static_cast<uint32_t>(load_factor ? capacity / load_factor : 0)),
+ nlocks_(nlocks) {
+ // pre-conditions
+ assert(capacity);
+ assert(load_factor);
+ assert(nbuckets_);
+ assert(nlocks_);
+
+ buckets_.reset(new Bucket[nbuckets_]);
+#ifdef OS_LINUX
+ mlock(buckets_.get(), nbuckets_ * sizeof(Bucket));
+#endif
+
+ // initialize locks
+ locks_.reset(new port::RWMutex[nlocks_]);
+#ifdef OS_LINUX
+ mlock(locks_.get(), nlocks_ * sizeof(port::RWMutex));
+#endif
+
+ // post-conditions
+ assert(buckets_);
+ assert(locks_);
+ }
+
+ virtual ~HashTable() { AssertEmptyBuckets(); }
+
+ //
+ // Insert given record to hash table
+ //
+ bool Insert(const T& t) {
+ const uint64_t h = Hash()(t);
+ const uint32_t bucket_idx = h % nbuckets_;
+ const uint32_t lock_idx = bucket_idx % nlocks_;
+
+ WriteLock _(&locks_[lock_idx]);
+ auto& bucket = buckets_[bucket_idx];
+ return Insert(&bucket, t);
+ }
+
+ // Lookup hash table
+ //
+ // Please note that read lock should be held by the caller. This is because
+ // the caller owns the data, and should hold the read lock as long as he
+ // operates on the data.
+ bool Find(const T& t, T* ret, port::RWMutex** ret_lock) {
+ const uint64_t h = Hash()(t);
+ const uint32_t bucket_idx = h % nbuckets_;
+ const uint32_t lock_idx = bucket_idx % nlocks_;
+
+ port::RWMutex& lock = locks_[lock_idx];
+ lock.ReadLock();
+
+ auto& bucket = buckets_[bucket_idx];
+ if (Find(&bucket, t, ret)) {
+ *ret_lock = &lock;
+ return true;
+ }
+
+ lock.ReadUnlock();
+ return false;
+ }
+
+ //
+ // Erase a given key from the hash table
+ //
+ bool Erase(const T& t, T* ret) {
+ const uint64_t h = Hash()(t);
+ const uint32_t bucket_idx = h % nbuckets_;
+ const uint32_t lock_idx = bucket_idx % nlocks_;
+
+ WriteLock _(&locks_[lock_idx]);
+
+ auto& bucket = buckets_[bucket_idx];
+ return Erase(&bucket, t, ret);
+ }
+
+ // Fetch the mutex associated with a key
+ // This call is used to hold the lock for a given data for extended period of
+ // time.
+ port::RWMutex* GetMutex(const T& t) {
+ const uint64_t h = Hash()(t);
+ const uint32_t bucket_idx = h % nbuckets_;
+ const uint32_t lock_idx = bucket_idx % nlocks_;
+
+ return &locks_[lock_idx];
+ }
+
+ void Clear(void (*fn)(T)) {
+ for (uint32_t i = 0; i < nbuckets_; ++i) {
+ const uint32_t lock_idx = i % nlocks_;
+ WriteLock _(&locks_[lock_idx]);
+ for (auto& t : buckets_[i].list_) {
+ (*fn)(t);
+ }
+ buckets_[i].list_.clear();
+ }
+ }
+
+ protected:
+ // Models bucket of keys that hash to the same bucket number
+ struct Bucket {
+ std::list<T> list_;
+ };
+
+ // Substitute for std::find with custom comparator operator
+ typename std::list<T>::iterator Find(std::list<T>* list, const T& t) {
+ for (auto it = list->begin(); it != list->end(); ++it) {
+ if (Equal()(*it, t)) {
+ return it;
+ }
+ }
+ return list->end();
+ }
+
+ bool Insert(Bucket* bucket, const T& t) {
+ // Check if the key already exists
+ auto it = Find(&bucket->list_, t);
+ if (it != bucket->list_.end()) {
+ return false;
+ }
+
+ // insert to bucket
+ bucket->list_.push_back(t);
+ return true;
+ }
+
+ bool Find(Bucket* bucket, const T& t, T* ret) {
+ auto it = Find(&bucket->list_, t);
+ if (it != bucket->list_.end()) {
+ if (ret) {
+ *ret = *it;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ bool Erase(Bucket* bucket, const T& t, T* ret) {
+ auto it = Find(&bucket->list_, t);
+ if (it != bucket->list_.end()) {
+ if (ret) {
+ *ret = *it;
+ }
+
+ bucket->list_.erase(it);
+ return true;
+ }
+ return false;
+ }
+
+ // assert that all buckets are empty
+ void AssertEmptyBuckets() {
+#ifndef NDEBUG
+ for (size_t i = 0; i < nbuckets_; ++i) {
+ WriteLock _(&locks_[i % nlocks_]);
+ assert(buckets_[i].list_.empty());
+ }
+#endif
+ }
+
+ const uint32_t nbuckets_; // No. of buckets in the spine
+ std::unique_ptr<Bucket[]> buckets_; // Spine of the hash buckets
+ const uint32_t nlocks_; // No. of locks
+ std::unique_ptr<port::RWMutex[]> locks_; // Granular locks
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc b/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc
new file mode 100644
index 000000000..74d7e2edf
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc
@@ -0,0 +1,310 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#if !defined(OS_WIN) && !defined(ROCKSDB_LITE)
+
+#ifndef GFLAGS
+#include <cstdio>
+int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
+#else
+
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <functional>
+#include <string>
+#include <unordered_map>
+
+#include "port/port_posix.h"
+#include "port/sys_time.h"
+#include "rocksdb/env.h"
+#include "util/gflags_compat.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "utilities/persistent_cache/hash_table.h"
+
+using std::string;
+
+DEFINE_int32(nsec, 10, "nsec");
+DEFINE_int32(nthread_write, 1, "insert %");
+DEFINE_int32(nthread_read, 1, "lookup %");
+DEFINE_int32(nthread_erase, 1, "erase %");
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// HashTableImpl interface
+//
+// Abstraction of a hash table implementation
+template <class Key, class Value>
+class HashTableImpl {
+ public:
+ virtual ~HashTableImpl() {}
+
+ virtual bool Insert(const Key& key, const Value& val) = 0;
+ virtual bool Erase(const Key& key) = 0;
+ virtual bool Lookup(const Key& key, Value* val) = 0;
+};
+
+// HashTableBenchmark
+//
+// Abstraction to test a given hash table implementation. The test mostly
+// focus on insert, lookup and erase. The test can operate in test mode and
+// benchmark mode.
+class HashTableBenchmark {
+ public:
+ explicit HashTableBenchmark(HashTableImpl<size_t, std::string>* impl,
+ const size_t sec = 10,
+ const size_t nthread_write = 1,
+ const size_t nthread_read = 1,
+ const size_t nthread_erase = 1)
+ : impl_(impl),
+ sec_(sec),
+ ninserts_(0),
+ nreads_(0),
+ nerases_(0),
+ nerases_failed_(0),
+ quit_(false) {
+ Prepop();
+
+ StartThreads(nthread_write, WriteMain);
+ StartThreads(nthread_read, ReadMain);
+ StartThreads(nthread_erase, EraseMain);
+
+ uint64_t start = NowInMillSec();
+ while (!quit_) {
+ quit_ = NowInMillSec() - start > sec_ * 1000;
+ /* sleep override */ sleep(1);
+ }
+
+ Env* env = Env::Default();
+ env->WaitForJoin();
+
+ if (sec_) {
+ printf("Result \n");
+ printf("====== \n");
+ printf("insert/sec = %f \n", ninserts_ / static_cast<double>(sec_));
+ printf("read/sec = %f \n", nreads_ / static_cast<double>(sec_));
+ printf("erases/sec = %f \n", nerases_ / static_cast<double>(sec_));
+ const uint64_t ops = ninserts_ + nreads_ + nerases_;
+ printf("ops/sec = %f \n", ops / static_cast<double>(sec_));
+ printf("erase fail = %d (%f%%)\n", static_cast<int>(nerases_failed_),
+ static_cast<float>(nerases_failed_ / nerases_ * 100));
+ printf("====== \n");
+ }
+ }
+
+ void RunWrite() {
+ while (!quit_) {
+ size_t k = insert_key_++;
+ std::string tmp(1000, k % 255);
+ bool status = impl_->Insert(k, tmp);
+ assert(status);
+ ninserts_++;
+ }
+ }
+
+ void RunRead() {
+ Random64 rgen(time(nullptr));
+ while (!quit_) {
+ std::string s;
+ size_t k = rgen.Next() % max_prepop_key;
+ bool status = impl_->Lookup(k, &s);
+ assert(status);
+ assert(s == std::string(1000, k % 255));
+ nreads_++;
+ }
+ }
+
+ void RunErase() {
+ while (!quit_) {
+ size_t k = erase_key_++;
+ bool status = impl_->Erase(k);
+ nerases_failed_ += !status;
+ nerases_++;
+ }
+ }
+
+ private:
+ // Start threads for a given function
+ void StartThreads(const size_t n, void (*fn)(void*)) {
+ Env* env = Env::Default();
+ for (size_t i = 0; i < n; ++i) {
+ env->StartThread(fn, this);
+ }
+ }
+
+ // Prepop the hash table with 1M keys
+ void Prepop() {
+ for (size_t i = 0; i < max_prepop_key; ++i) {
+ bool status = impl_->Insert(i, std::string(1000, i % 255));
+ assert(status);
+ }
+
+ erase_key_ = insert_key_ = max_prepop_key;
+
+ for (size_t i = 0; i < 10 * max_prepop_key; ++i) {
+ bool status = impl_->Insert(insert_key_++, std::string(1000, 'x'));
+ assert(status);
+ }
+ }
+
+ static uint64_t NowInMillSec() {
+ port::TimeVal tv;
+ port::GetTimeOfDay(&tv, /*tz=*/nullptr);
+ return tv.tv_sec * 1000 + tv.tv_usec / 1000;
+ }
+
+ //
+ // Wrapper functions for thread entry
+ //
+ static void WriteMain(void* args) {
+ reinterpret_cast<HashTableBenchmark*>(args)->RunWrite();
+ }
+
+ static void ReadMain(void* args) {
+ reinterpret_cast<HashTableBenchmark*>(args)->RunRead();
+ }
+
+ static void EraseMain(void* args) {
+ reinterpret_cast<HashTableBenchmark*>(args)->RunErase();
+ }
+
+ HashTableImpl<size_t, std::string>* impl_; // Implementation to test
+ const size_t sec_; // Test time
+ const size_t max_prepop_key = 1ULL * 1024 * 1024; // Max prepop key
+ std::atomic<size_t> insert_key_; // Last inserted key
+ std::atomic<size_t> erase_key_; // Erase key
+ std::atomic<size_t> ninserts_; // Number of inserts
+ std::atomic<size_t> nreads_; // Number of reads
+ std::atomic<size_t> nerases_; // Number of erases
+ std::atomic<size_t> nerases_failed_; // Number of erases failed
+ bool quit_; // Should the threads quit ?
+};
+
+//
+// SimpleImpl
+// Lock safe unordered_map implementation
+class SimpleImpl : public HashTableImpl<size_t, string> {
+ public:
+ bool Insert(const size_t& key, const string& val) override {
+ WriteLock _(&rwlock_);
+ map_.insert(make_pair(key, val));
+ return true;
+ }
+
+ bool Erase(const size_t& key) override {
+ WriteLock _(&rwlock_);
+ auto it = map_.find(key);
+ if (it == map_.end()) {
+ return false;
+ }
+ map_.erase(it);
+ return true;
+ }
+
+ bool Lookup(const size_t& key, string* val) override {
+ ReadLock _(&rwlock_);
+ auto it = map_.find(key);
+ if (it != map_.end()) {
+ *val = it->second;
+ }
+ return it != map_.end();
+ }
+
+ private:
+ port::RWMutex rwlock_;
+ std::unordered_map<size_t, string> map_;
+};
+
+//
+// GranularLockImpl
+// Thread safe custom RocksDB implementation of hash table with granular
+// locking
+class GranularLockImpl : public HashTableImpl<size_t, string> {
+ public:
+ bool Insert(const size_t& key, const string& val) override {
+ Node n(key, val);
+ return impl_.Insert(n);
+ }
+
+ bool Erase(const size_t& key) override {
+ Node n(key, string());
+ return impl_.Erase(n, nullptr);
+ }
+
+ bool Lookup(const size_t& key, string* val) override {
+ Node n(key, string());
+ port::RWMutex* rlock;
+ bool status = impl_.Find(n, &n, &rlock);
+ if (status) {
+ ReadUnlock _(rlock);
+ *val = n.val_;
+ }
+ return status;
+ }
+
+ private:
+ struct Node {
+ explicit Node(const size_t key, const string& val) : key_(key), val_(val) {}
+
+ size_t key_ = 0;
+ string val_;
+ };
+
+ struct Hash {
+ uint64_t operator()(const Node& node) {
+ return std::hash<uint64_t>()(node.key_);
+ }
+ };
+
+ struct Equal {
+ bool operator()(const Node& lhs, const Node& rhs) {
+ return lhs.key_ == rhs.key_;
+ }
+ };
+
+ HashTable<Node, Hash, Equal> impl_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+//
+// main
+//
+int main(int argc, char** argv) {
+ GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
+ std::string(argv[0]) + " [OPTIONS]...");
+ GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);
+
+ //
+ // Micro benchmark unordered_map
+ //
+ printf("Micro benchmarking std::unordered_map \n");
+ {
+ ROCKSDB_NAMESPACE::SimpleImpl impl;
+ ROCKSDB_NAMESPACE::HashTableBenchmark _(
+ &impl, FLAGS_nsec, FLAGS_nthread_write, FLAGS_nthread_read,
+ FLAGS_nthread_erase);
+ }
+ //
+ // Micro benchmark scalable hash table
+ //
+ printf("Micro benchmarking scalable hash map \n");
+ {
+ ROCKSDB_NAMESPACE::GranularLockImpl impl;
+ ROCKSDB_NAMESPACE::HashTableBenchmark _(
+ &impl, FLAGS_nsec, FLAGS_nthread_write, FLAGS_nthread_read,
+ FLAGS_nthread_erase);
+ }
+
+ return 0;
+}
+#endif // #ifndef GFLAGS
+#else
+int main(int /*argc*/, char** /*argv*/) { return 0; }
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h b/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h
new file mode 100644
index 000000000..e10939b2f
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h
@@ -0,0 +1,168 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+
+#include "util/random.h"
+#include "utilities/persistent_cache/hash_table.h"
+#include "utilities/persistent_cache/lrulist.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Evictable Hash Table
+//
+// Hash table index where least accessed (or one of the least accessed) elements
+// can be evicted.
+//
+// Please note EvictableHashTable can only be created for pointer type objects
+template <class T, class Hash, class Equal>
+class EvictableHashTable : private HashTable<T*, Hash, Equal> {
+ public:
+ using hash_table = HashTable<T*, Hash, Equal>;
+
+ explicit EvictableHashTable(const size_t capacity = 1024 * 1024,
+ const float load_factor = 2.0,
+ const uint32_t nlocks = 256)
+ : HashTable<T*, Hash, Equal>(capacity, load_factor, nlocks),
+ lru_lists_(new LRUList<T>[hash_table::nlocks_]) {
+ assert(lru_lists_);
+ }
+
+ virtual ~EvictableHashTable() { AssertEmptyLRU(); }
+
+ //
+ // Insert given record to hash table (and LRU list)
+ //
+ bool Insert(T* t) {
+ const uint64_t h = Hash()(t);
+ typename hash_table::Bucket& bucket = GetBucket(h);
+ LRUListType& lru = GetLRUList(h);
+ port::RWMutex& lock = GetMutex(h);
+
+ WriteLock _(&lock);
+ if (hash_table::Insert(&bucket, t)) {
+ lru.Push(t);
+ return true;
+ }
+ return false;
+ }
+
+ //
+ // Lookup hash table
+ //
+ // Please note that read lock should be held by the caller. This is because
+ // the caller owns the data, and should hold the read lock as long as he
+ // operates on the data.
+ bool Find(T* t, T** ret) {
+ const uint64_t h = Hash()(t);
+ typename hash_table::Bucket& bucket = GetBucket(h);
+ LRUListType& lru = GetLRUList(h);
+ port::RWMutex& lock = GetMutex(h);
+
+ ReadLock _(&lock);
+ if (hash_table::Find(&bucket, t, ret)) {
+ ++(*ret)->refs_;
+ lru.Touch(*ret);
+ return true;
+ }
+ return false;
+ }
+
+ //
+ // Evict one of the least recently used object
+ //
+ T* Evict(const std::function<void(T*)>& fn = nullptr) {
+ uint32_t random = Random::GetTLSInstance()->Next();
+ const size_t start_idx = random % hash_table::nlocks_;
+ T* t = nullptr;
+
+ // iterate from start_idx .. 0 .. start_idx
+ for (size_t i = 0; !t && i < hash_table::nlocks_; ++i) {
+ const size_t idx = (start_idx + i) % hash_table::nlocks_;
+
+ WriteLock _(&hash_table::locks_[idx]);
+ LRUListType& lru = lru_lists_[idx];
+ if (!lru.IsEmpty() && (t = lru.Pop()) != nullptr) {
+ assert(!t->refs_);
+ // We got an item to evict, erase from the bucket
+ const uint64_t h = Hash()(t);
+ typename hash_table::Bucket& bucket = GetBucket(h);
+ T* tmp = nullptr;
+ bool status = hash_table::Erase(&bucket, t, &tmp);
+ assert(t == tmp);
+ (void)status;
+ assert(status);
+ if (fn) {
+ fn(t);
+ }
+ break;
+ }
+ assert(!t);
+ }
+ return t;
+ }
+
+ void Clear(void (*fn)(T*)) {
+ for (uint32_t i = 0; i < hash_table::nbuckets_; ++i) {
+ const uint32_t lock_idx = i % hash_table::nlocks_;
+ WriteLock _(&hash_table::locks_[lock_idx]);
+ auto& lru_list = lru_lists_[lock_idx];
+ auto& bucket = hash_table::buckets_[i];
+ for (auto* t : bucket.list_) {
+ lru_list.Unlink(t);
+ (*fn)(t);
+ }
+ bucket.list_.clear();
+ }
+ // make sure that all LRU lists are emptied
+ AssertEmptyLRU();
+ }
+
+ void AssertEmptyLRU() {
+#ifndef NDEBUG
+ for (uint32_t i = 0; i < hash_table::nlocks_; ++i) {
+ WriteLock _(&hash_table::locks_[i]);
+ auto& lru_list = lru_lists_[i];
+ assert(lru_list.IsEmpty());
+ }
+#endif
+ }
+
+ //
+ // Fetch the mutex associated with a key
+ // This call is used to hold the lock for a given data for extended period of
+ // time.
+ port::RWMutex* GetMutex(T* t) { return hash_table::GetMutex(t); }
+
+ private:
+ using LRUListType = LRUList<T>;
+
+ typename hash_table::Bucket& GetBucket(const uint64_t h) {
+ const uint32_t bucket_idx = h % hash_table::nbuckets_;
+ return hash_table::buckets_[bucket_idx];
+ }
+
+ LRUListType& GetLRUList(const uint64_t h) {
+ const uint32_t bucket_idx = h % hash_table::nbuckets_;
+ const uint32_t lock_idx = bucket_idx % hash_table::nlocks_;
+ return lru_lists_[lock_idx];
+ }
+
+ port::RWMutex& GetMutex(const uint64_t h) {
+ const uint32_t bucket_idx = h % hash_table::nbuckets_;
+ const uint32_t lock_idx = bucket_idx % hash_table::nlocks_;
+ return hash_table::locks_[lock_idx];
+ }
+
+ std::unique_ptr<LRUListType[]> lru_lists_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_test.cc b/src/rocksdb/utilities/persistent_cache/hash_table_test.cc
new file mode 100644
index 000000000..2f6387f5f
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/hash_table_test.cc
@@ -0,0 +1,163 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "utilities/persistent_cache/hash_table.h"
+
+#include <stdlib.h>
+
+#include <iostream>
+#include <set>
+#include <string>
+
+#include "db/db_test_util.h"
+#include "memory/arena.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
+#include "utilities/persistent_cache/hash_table_evictable.h"
+
+#ifndef ROCKSDB_LITE
+
+namespace ROCKSDB_NAMESPACE {
+
+struct HashTableTest : public testing::Test {
+ ~HashTableTest() override { map_.Clear(&HashTableTest::ClearNode); }
+
+ struct Node {
+ Node() {}
+ explicit Node(const uint64_t key, const std::string& val = std::string())
+ : key_(key), val_(val) {}
+
+ uint64_t key_ = 0;
+ std::string val_;
+ };
+
+ struct Equal {
+ bool operator()(const Node& lhs, const Node& rhs) {
+ return lhs.key_ == rhs.key_;
+ }
+ };
+
+ struct Hash {
+ uint64_t operator()(const Node& node) {
+ return std::hash<uint64_t>()(node.key_);
+ }
+ };
+
+ static void ClearNode(Node /*node*/) {}
+
+ HashTable<Node, Hash, Equal> map_;
+};
+
+struct EvictableHashTableTest : public testing::Test {
+ ~EvictableHashTableTest() override {
+ map_.Clear(&EvictableHashTableTest::ClearNode);
+ }
+
+ struct Node : LRUElement<Node> {
+ Node() {}
+ explicit Node(const uint64_t key, const std::string& val = std::string())
+ : key_(key), val_(val) {}
+
+ uint64_t key_ = 0;
+ std::string val_;
+ std::atomic<uint32_t> refs_{0};
+ };
+
+ struct Equal {
+ bool operator()(const Node* lhs, const Node* rhs) {
+ return lhs->key_ == rhs->key_;
+ }
+ };
+
+ struct Hash {
+ uint64_t operator()(const Node* node) {
+ return std::hash<uint64_t>()(node->key_);
+ }
+ };
+
+ static void ClearNode(Node* /*node*/) {}
+
+ EvictableHashTable<Node, Hash, Equal> map_;
+};
+
+TEST_F(HashTableTest, TestInsert) {
+ const uint64_t max_keys = 1024 * 1024;
+
+ // insert
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ map_.Insert(Node(k, std::string(1000, k % 255)));
+ }
+
+ // verify
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ Node val;
+ port::RWMutex* rlock = nullptr;
+ assert(map_.Find(Node(k), &val, &rlock));
+ rlock->ReadUnlock();
+ assert(val.val_ == std::string(1000, k % 255));
+ }
+}
+
+TEST_F(HashTableTest, TestErase) {
+ const uint64_t max_keys = 1024 * 1024;
+ // insert
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ map_.Insert(Node(k, std::string(1000, k % 255)));
+ }
+
+ auto rand = Random64(time(nullptr));
+ // erase a few keys randomly
+ std::set<uint64_t> erased;
+ for (int i = 0; i < 1024; ++i) {
+ uint64_t k = rand.Next() % max_keys;
+ if (erased.find(k) != erased.end()) {
+ continue;
+ }
+ assert(map_.Erase(Node(k), /*ret=*/nullptr));
+ erased.insert(k);
+ }
+
+ // verify
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ Node val;
+ port::RWMutex* rlock = nullptr;
+ bool status = map_.Find(Node(k), &val, &rlock);
+ if (erased.find(k) == erased.end()) {
+ assert(status);
+ rlock->ReadUnlock();
+ assert(val.val_ == std::string(1000, k % 255));
+ } else {
+ assert(!status);
+ }
+ }
+}
+
+TEST_F(EvictableHashTableTest, TestEvict) {
+ const uint64_t max_keys = 1024 * 1024;
+
+ // insert
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ map_.Insert(new Node(k, std::string(1000, k % 255)));
+ }
+
+ // verify
+ for (uint64_t k = 0; k < max_keys; ++k) {
+ Node* val = map_.Evict();
+ // unfortunately we can't predict eviction value since it is from any one of
+ // the lock stripe
+ assert(val);
+ assert(val->val_ == std::string(1000, val->key_ % 255));
+ delete val;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/persistent_cache/lrulist.h b/src/rocksdb/utilities/persistent_cache/lrulist.h
new file mode 100644
index 000000000..a608890fc
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/lrulist.h
@@ -0,0 +1,174 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// LRU element definition
+//
+// Any object that needs to be part of the LRU algorithm should extend this
+// class
+template <class T>
+struct LRUElement {
+ explicit LRUElement() : next_(nullptr), prev_(nullptr), refs_(0) {}
+
+ virtual ~LRUElement() { assert(!refs_); }
+
+ T* next_;
+ T* prev_;
+ std::atomic<size_t> refs_;
+};
+
+// LRU implementation
+//
+// In place LRU implementation. There is no copy or allocation involved when
+// inserting or removing an element. This makes the data structure slim
+template <class T>
+class LRUList {
+ public:
+ virtual ~LRUList() {
+ MutexLock _(&lock_);
+ assert(!head_);
+ assert(!tail_);
+ }
+
+ // Push element into the LRU at the cold end
+ inline void Push(T* const t) {
+ assert(t);
+ assert(!t->next_);
+ assert(!t->prev_);
+
+ MutexLock _(&lock_);
+
+ assert((!head_ && !tail_) || (head_ && tail_));
+ assert(!head_ || !head_->prev_);
+ assert(!tail_ || !tail_->next_);
+
+ t->next_ = head_;
+ if (head_) {
+ head_->prev_ = t;
+ }
+
+ head_ = t;
+ if (!tail_) {
+ tail_ = t;
+ }
+ }
+
+ // Unlink the element from the LRU
+ inline void Unlink(T* const t) {
+ MutexLock _(&lock_);
+ UnlinkImpl(t);
+ }
+
+ // Evict an element from the LRU
+ inline T* Pop() {
+ MutexLock _(&lock_);
+
+ assert(tail_ && head_);
+ assert(!tail_->next_);
+ assert(!head_->prev_);
+
+ T* t = head_;
+ while (t && t->refs_) {
+ t = t->next_;
+ }
+
+ if (!t) {
+ // nothing can be evicted
+ return nullptr;
+ }
+
+ assert(!t->refs_);
+
+ // unlike the element
+ UnlinkImpl(t);
+ return t;
+ }
+
+ // Move the element from the front of the list to the back of the list
+ inline void Touch(T* const t) {
+ MutexLock _(&lock_);
+ UnlinkImpl(t);
+ PushBackImpl(t);
+ }
+
+ // Check if the LRU is empty
+ inline bool IsEmpty() const {
+ MutexLock _(&lock_);
+ return !head_ && !tail_;
+ }
+
+ private:
+ // Unlink an element from the LRU
+ void UnlinkImpl(T* const t) {
+ assert(t);
+
+ lock_.AssertHeld();
+
+ assert(head_ && tail_);
+ assert(t->prev_ || head_ == t);
+ assert(t->next_ || tail_ == t);
+
+ if (t->prev_) {
+ t->prev_->next_ = t->next_;
+ }
+ if (t->next_) {
+ t->next_->prev_ = t->prev_;
+ }
+
+ if (tail_ == t) {
+ tail_ = tail_->prev_;
+ }
+ if (head_ == t) {
+ head_ = head_->next_;
+ }
+
+ t->next_ = t->prev_ = nullptr;
+ }
+
+ // Insert an element at the hot end
+ inline void PushBack(T* const t) {
+ MutexLock _(&lock_);
+ PushBackImpl(t);
+ }
+
+ inline void PushBackImpl(T* const t) {
+ assert(t);
+ assert(!t->next_);
+ assert(!t->prev_);
+
+ lock_.AssertHeld();
+
+ assert((!head_ && !tail_) || (head_ && tail_));
+ assert(!head_ || !head_->prev_);
+ assert(!tail_ || !tail_->next_);
+
+ t->prev_ = tail_;
+ if (tail_) {
+ tail_->next_ = t;
+ }
+
+ tail_ = t;
+ if (!head_) {
+ head_ = tail_;
+ }
+ }
+
+ mutable port::Mutex lock_; // synchronization primitive
+ T* head_ = nullptr; // front (cold)
+ T* tail_ = nullptr; // back (hot)
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc b/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc
new file mode 100644
index 000000000..9d6e15d6b
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc
@@ -0,0 +1,359 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#ifndef GFLAGS
+#include <cstdio>
+int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
+#else
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <unordered_map>
+
+#include "monitoring/histogram.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/system_clock.h"
+#include "table/block_based/block_builder.h"
+#include "util/gflags_compat.h"
+#include "util/mutexlock.h"
+#include "util/stop_watch.h"
+#include "utilities/persistent_cache/block_cache_tier.h"
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+#include "utilities/persistent_cache/volatile_tier_impl.h"
+
+DEFINE_int32(nsec, 10, "nsec");
+DEFINE_int32(nthread_write, 1, "Insert threads");
+DEFINE_int32(nthread_read, 1, "Lookup threads");
+DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
+DEFINE_string(log_path, "/tmp/log", "Path for the log file");
+DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
+DEFINE_int32(iosize, 4 * 1024, "Read IO size");
+DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
+DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
+DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
+DEFINE_string(cache_type, "block_cache",
+ "Cache type. (block_cache, volatile, tiered)");
+DEFINE_bool(benchmark, false, "Benchmark mode");
+DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
+
+namespace ROCKSDB_NAMESPACE {
+
+std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
+ assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
+ std::unique_ptr<PersistentCacheTier> pcache(
+ new VolatileCacheTier(FLAGS_cache_size));
+ return pcache;
+}
+
+std::unique_ptr<PersistentCacheTier> NewBlockCache() {
+ std::shared_ptr<Logger> log;
+ if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
+ fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
+ return nullptr;
+ }
+
+ PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
+ opt.writer_dispatch_size = FLAGS_writer_iosize;
+ opt.writer_qdepth = FLAGS_writer_qdepth;
+ opt.pipeline_writes = FLAGS_enable_pipelined_writes;
+ opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+ std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
+ Status status = cache->Open();
+ return cache;
+}
+
+// create a new cache tier
+// construct a tiered RAM+Block cache
+std::unique_ptr<PersistentTieredCache> NewTieredCache(
+ const size_t mem_size, const PersistentCacheConfig& opt) {
+ std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
+ // create primary tier
+ assert(mem_size);
+ auto pcache =
+ std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
+ tcache->AddTier(pcache);
+ // create secondary tier
+ auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
+ tcache->AddTier(scache);
+
+ Status s = tcache->Open();
+ assert(s.ok());
+ return tcache;
+}
+
+std::unique_ptr<PersistentTieredCache> NewTieredCache() {
+ std::shared_ptr<Logger> log;
+ if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
+ fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
+ abort();
+ }
+
+ auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
+ PersistentCacheConfig opt(Env::Default(), FLAGS_path,
+ (1 - pct) * FLAGS_cache_size, log);
+ opt.writer_dispatch_size = FLAGS_writer_iosize;
+ opt.writer_qdepth = FLAGS_writer_qdepth;
+ opt.pipeline_writes = FLAGS_enable_pipelined_writes;
+ opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+ return NewTieredCache(FLAGS_cache_size * pct, opt);
+}
+
+//
+// Benchmark driver
+//
+class CacheTierBenchmark {
+ public:
+ explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
+ : cache_(cache) {
+ if (FLAGS_nthread_read) {
+ fprintf(stdout, "Pre-populating\n");
+ Prepop();
+ fprintf(stdout, "Pre-population completed\n");
+ }
+
+ stats_.Clear();
+
+ // Start IO threads
+ std::list<port::Thread> threads;
+ Spawn(FLAGS_nthread_write, &threads,
+ std::bind(&CacheTierBenchmark::Write, this));
+ Spawn(FLAGS_nthread_read, &threads,
+ std::bind(&CacheTierBenchmark::Read, this));
+
+ // Wait till FLAGS_nsec and then signal to quit
+ StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true);
+ size_t sec = t.ElapsedNanos() / 1000000000ULL;
+ while (!quit_) {
+ sec = t.ElapsedNanos() / 1000000000ULL;
+ quit_ = sec > size_t(FLAGS_nsec);
+ /* sleep override */ sleep(1);
+ }
+
+ // Wait for threads to exit
+ Join(&threads);
+ // Print stats
+ PrintStats(sec);
+ // Close the cache
+ cache_->TEST_Flush();
+ cache_->Close();
+ }
+
+ private:
+ void PrintStats(const size_t sec) {
+ std::ostringstream msg;
+ msg << "Test stats" << std::endl
+ << "* Elapsed: " << sec << " s" << std::endl
+ << "* Write Latency:" << std::endl
+ << stats_.write_latency_.ToString() << std::endl
+ << "* Read Latency:" << std::endl
+ << stats_.read_latency_.ToString() << std::endl
+ << "* Bytes written:" << std::endl
+ << stats_.bytes_written_.ToString() << std::endl
+ << "* Bytes read:" << std::endl
+ << stats_.bytes_read_.ToString() << std::endl
+ << "Cache stats:" << std::endl
+ << cache_->PrintStats() << std::endl;
+ fprintf(stderr, "%s\n", msg.str().c_str());
+ }
+
+ //
+ // Insert implementation and corresponding helper functions
+ //
+ void Prepop() {
+ for (uint64_t i = 0; i < 1024 * 1024; ++i) {
+ InsertKey(i);
+ insert_key_limit_++;
+ read_key_limit_++;
+ }
+
+ // Wait until data is flushed
+ cache_->TEST_Flush();
+ // warmup the cache
+ for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
+ }
+ }
+
+ void Write() {
+ while (!quit_) {
+ InsertKey(insert_key_limit_++);
+ }
+ }
+
+ void InsertKey(const uint64_t key) {
+ // construct key
+ uint64_t k[3];
+ Slice block_key = FillKey(k, key);
+
+ // construct value
+ auto block = NewBlock(key);
+
+ // insert
+ StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
+ while (true) {
+ Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
+ if (status.ok()) {
+ break;
+ }
+
+ // transient error is possible if we run without pipelining
+ assert(!FLAGS_enable_pipelined_writes);
+ }
+
+ // adjust stats
+ const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
+ stats_.write_latency_.Add(elapsed_micro);
+ stats_.bytes_written_.Add(FLAGS_iosize);
+ }
+
+ //
+ // Read implementation
+ //
+ void Read() {
+ while (!quit_) {
+ ReadKey(random() % read_key_limit_);
+ }
+ }
+
+ void ReadKey(const uint64_t val) {
+ // construct key
+ uint64_t k[3];
+ Slice key = FillKey(k, val);
+
+ // Lookup in cache
+ StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
+ std::unique_ptr<char[]> block;
+ size_t size;
+ Status status = cache_->Lookup(key, &block, &size);
+ if (!status.ok()) {
+ fprintf(stderr, "%s\n", status.ToString().c_str());
+ }
+ assert(status.ok());
+ assert(size == (size_t)FLAGS_iosize);
+
+ // adjust stats
+ const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
+ stats_.read_latency_.Add(elapsed_micro);
+ stats_.bytes_read_.Add(FLAGS_iosize);
+
+ // verify content
+ if (!FLAGS_benchmark) {
+ auto expected_block = NewBlock(val);
+ assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
+ }
+ }
+
+ // create data for a key by filling with a certain pattern
+ std::unique_ptr<char[]> NewBlock(const uint64_t val) {
+ std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
+ memset(data.get(), val % 255, FLAGS_iosize);
+ return data;
+ }
+
+ // spawn threads
+ void Spawn(const size_t n, std::list<port::Thread>* threads,
+ const std::function<void()>& fn) {
+ for (size_t i = 0; i < n; ++i) {
+ threads->emplace_back(fn);
+ }
+ }
+
+ // join threads
+ void Join(std::list<port::Thread>* threads) {
+ for (auto& th : *threads) {
+ th.join();
+ }
+ }
+
+ // construct key
+ Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
+ k[0] = k[1] = 0;
+ k[2] = val;
+ void* p = static_cast<void*>(&k);
+ return Slice(static_cast<char*>(p), sizeof(k));
+ }
+
+ // benchmark stats
+ struct Stats {
+ void Clear() {
+ bytes_written_.Clear();
+ bytes_read_.Clear();
+ read_latency_.Clear();
+ write_latency_.Clear();
+ }
+
+ HistogramImpl bytes_written_;
+ HistogramImpl bytes_read_;
+ HistogramImpl read_latency_;
+ HistogramImpl write_latency_;
+ };
+
+ std::shared_ptr<PersistentCacheTier> cache_; // cache implementation
+ std::atomic<uint64_t> insert_key_limit_{0}; // data inserted upto
+ std::atomic<uint64_t> read_key_limit_{0}; // data can be read safely upto
+ bool quit_ = false; // Quit thread ?
+ mutable Stats stats_; // Stats
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+//
+// main
+//
+int main(int argc, char** argv) {
+ GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
+ std::string(argv[0]) + " [OPTIONS]...");
+ GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);
+
+ std::ostringstream msg;
+ msg << "Config" << std::endl
+ << "======" << std::endl
+ << "* nsec=" << FLAGS_nsec << std::endl
+ << "* nthread_write=" << FLAGS_nthread_write << std::endl
+ << "* path=" << FLAGS_path << std::endl
+ << "* cache_size=" << FLAGS_cache_size << std::endl
+ << "* iosize=" << FLAGS_iosize << std::endl
+ << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
+ << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
+ << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
+ << std::endl
+ << "* cache_type=" << FLAGS_cache_type << std::endl
+ << "* benchmark=" << FLAGS_benchmark << std::endl
+ << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;
+
+ fprintf(stderr, "%s\n", msg.str().c_str());
+
+ std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
+ if (FLAGS_cache_type == "block_cache") {
+ fprintf(stderr, "Using block cache implementation\n");
+ cache = ROCKSDB_NAMESPACE::NewBlockCache();
+ } else if (FLAGS_cache_type == "volatile") {
+ fprintf(stderr, "Using volatile cache implementation\n");
+ cache = ROCKSDB_NAMESPACE::NewVolatileCache();
+ } else if (FLAGS_cache_type == "tiered") {
+ fprintf(stderr, "Using tiered cache implementation\n");
+ cache = ROCKSDB_NAMESPACE::NewTieredCache();
+ } else {
+ fprintf(stderr, "Unknown option for cache\n");
+ }
+
+ assert(cache);
+ if (!cache) {
+ fprintf(stderr, "Error creating cache\n");
+ abort();
+ }
+
+ std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
+ new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));
+
+ return 0;
+}
+#endif // #ifndef GFLAGS
+#else
+int main(int, char**) { return 0; }
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc
new file mode 100644
index 000000000..d1b18b68a
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc
@@ -0,0 +1,462 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#if !defined ROCKSDB_LITE
+
+#include "utilities/persistent_cache/persistent_cache_test.h"
+
+#include <functional>
+#include <memory>
+#include <thread>
+
+#include "file/file_util.h"
+#include "utilities/persistent_cache/block_cache_tier.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+static const double kStressFactor = .125;
+
+#ifdef OS_LINUX
+static void OnOpenForRead(void* arg) {
+ int* val = static_cast<int*>(arg);
+ *val &= ~O_DIRECT;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewRandomAccessFile:O_DIRECT",
+ std::bind(OnOpenForRead, std::placeholders::_1));
+}
+
+static void OnOpenForWrite(void* arg) {
+ int* val = static_cast<int*>(arg);
+ *val &= ~O_DIRECT;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewWritableFile:O_DIRECT",
+ std::bind(OnOpenForWrite, std::placeholders::_1));
+}
+#endif
+
+static void OnDeleteDir(void* arg) {
+ char* dir = static_cast<char*>(arg);
+ ASSERT_OK(DestroyDir(Env::Default(), std::string(dir)));
+}
+
+//
+// Simple logger that prints message on stdout
+//
+class ConsoleLogger : public Logger {
+ public:
+ using Logger::Logv;
+ ConsoleLogger() : Logger(InfoLogLevel::ERROR_LEVEL) {}
+
+ void Logv(const char* format, va_list ap) override {
+ MutexLock _(&lock_);
+ vprintf(format, ap);
+ printf("\n");
+ }
+
+ port::Mutex lock_;
+};
+
+// construct a tiered RAM+Block cache
+std::unique_ptr<PersistentTieredCache> NewTieredCache(
+ const size_t mem_size, const PersistentCacheConfig& opt) {
+ std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
+ // create primary tier
+ assert(mem_size);
+ auto pcache = std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(
+ /*is_compressed*/ true, mem_size));
+ tcache->AddTier(pcache);
+ // create secondary tier
+ auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
+ tcache->AddTier(scache);
+
+ Status s = tcache->Open();
+ assert(s.ok());
+ return tcache;
+}
+
+// create block cache
+std::unique_ptr<PersistentCacheTier> NewBlockCache(
+ Env* env, const std::string& path,
+ const uint64_t max_size = std::numeric_limits<uint64_t>::max(),
+ const bool enable_direct_writes = false) {
+ const uint32_t max_file_size =
+ static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
+ auto log = std::make_shared<ConsoleLogger>();
+ PersistentCacheConfig opt(env, path, max_size, log);
+ opt.cache_file_size = max_file_size;
+ opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+ opt.enable_direct_writes = enable_direct_writes;
+ std::unique_ptr<PersistentCacheTier> scache(new BlockCacheTier(opt));
+ Status s = scache->Open();
+ assert(s.ok());
+ return scache;
+}
+
+// create a new cache tier
+std::unique_ptr<PersistentTieredCache> NewTieredCache(
+ Env* env, const std::string& path, const uint64_t max_volatile_cache_size,
+ const uint64_t max_block_cache_size =
+ std::numeric_limits<uint64_t>::max()) {
+ const uint32_t max_file_size =
+ static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
+ auto log = std::make_shared<ConsoleLogger>();
+ auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log);
+ opt.cache_file_size = max_file_size;
+ opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+ // create tier out of the two caches
+ auto cache = NewTieredCache(max_volatile_cache_size, opt);
+ return cache;
+}
+
+PersistentCacheTierTest::PersistentCacheTierTest()
+ : path_(test::PerThreadDBPath("cache_test")) {
+#ifdef OS_LINUX
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewRandomAccessFile:O_DIRECT", OnOpenForRead);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewWritableFile:O_DIRECT", OnOpenForWrite);
+#endif
+}
+
+// Block cache tests
+TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithFileCreateError) {
+ cache_ = NewBlockCache(Env::Default(), path_,
+ /*size=*/std::numeric_limits<uint64_t>::max(),
+ /*direct_writes=*/false);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BlockCacheTier::NewCacheFile:DeleteDir", OnDeleteDir);
+
+ RunNegativeInsertTest(/*nthreads=*/1,
+ /*max_keys*/
+ static_cast<size_t>(10 * 1024 * kStressFactor));
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Travis is unable to handle the normal version of the tests running out of
+// fds, out of space and timeouts. This is an easier version of the test
+// specifically written for Travis
+TEST_F(PersistentCacheTierTest, DISABLED_BasicTest) {
+ cache_ = std::make_shared<VolatileCacheTier>();
+ RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);
+
+ cache_ = NewBlockCache(Env::Default(), path_,
+ /*size=*/std::numeric_limits<uint64_t>::max(),
+ /*direct_writes=*/true);
+ RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);
+
+ cache_ = NewTieredCache(Env::Default(), path_,
+ /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024));
+ RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);
+}
+
+// Volatile cache tests
+// DISABLED for now (somewhat expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_VolatileCacheInsert) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys :
+ {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = std::make_shared<VolatileCacheTier>();
+ RunInsertTest(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+}
+
+// DISABLED for now (somewhat expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_VolatileCacheInsertWithEviction) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = std::make_shared<VolatileCacheTier>(
+ /*compressed=*/true,
+ /*size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
+ RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+}
+
+// Block cache tests
+// DISABLED for now (expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsert) {
+ for (auto direct_writes : {true, false}) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys :
+ {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = NewBlockCache(Env::Default(), path_,
+ /*size=*/std::numeric_limits<uint64_t>::max(),
+ direct_writes);
+ RunInsertTest(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+ }
+}
+
+// DISABLED for now (somewhat expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithEviction) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = NewBlockCache(
+ Env::Default(), path_,
+ /*max_size=*/static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
+ RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+}
+
+// Tiered cache tests
+// DISABLED for now (expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsert) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys :
+ {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = NewTieredCache(
+ Env::Default(), path_,
+ /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
+ RunInsertTest(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+}
+
+// the tests causes a lot of file deletions which Travis limited testing
+// environment cannot handle
+// DISABLED for now (somewhat expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsertWithEviction) {
+ for (auto nthreads : {1, 5}) {
+ for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+ cache_ = NewTieredCache(
+ Env::Default(), path_,
+ /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor),
+ /*block_cache_size*/
+ static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
+ RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
+ }
+ }
+}
+
+std::shared_ptr<PersistentCacheTier> MakeVolatileCache(
+ Env* /*env*/, const std::string& /*dbname*/) {
+ return std::make_shared<VolatileCacheTier>();
+}
+
+std::shared_ptr<PersistentCacheTier> MakeBlockCache(Env* env,
+ const std::string& dbname) {
+ return NewBlockCache(env, dbname);
+}
+
+std::shared_ptr<PersistentCacheTier> MakeTieredCache(
+ Env* env, const std::string& dbname) {
+ const auto memory_size = 1 * 1024 * 1024 * kStressFactor;
+ return NewTieredCache(env, dbname, static_cast<size_t>(memory_size));
+}
+
+#ifdef OS_LINUX
+static void UniqueIdCallback(void* arg) {
+ int* result = reinterpret_cast<int*>(arg);
+ if (*result == -1) {
+ *result = 0;
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+}
+#endif
+
+TEST_F(PersistentCacheTierTest, FactoryTest) {
+ for (auto nvm_opt : {true, false}) {
+ ASSERT_FALSE(cache_);
+ auto log = std::make_shared<ConsoleLogger>();
+ std::shared_ptr<PersistentCache> cache;
+ ASSERT_OK(NewPersistentCache(Env::Default(), path_,
+ /*size=*/1 * 1024 * 1024 * 1024, log, nvm_opt,
+ &cache));
+ ASSERT_TRUE(cache);
+ ASSERT_EQ(cache->Stats().size(), 1);
+ ASSERT_TRUE(cache->Stats()[0].size());
+ cache.reset();
+ }
+}
+
+PersistentCacheDBTest::PersistentCacheDBTest()
+ : DBTestBase("cache_test", /*env_do_fsync=*/true) {
+#ifdef OS_LINUX
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "NewRandomAccessFile:O_DIRECT", OnOpenForRead);
+#endif
+}
+
+// test template
+void PersistentCacheDBTest::RunTest(
+ const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& new_pcache,
+ const size_t max_keys = 100 * 1024, const size_t max_usecase = 5) {
+ // number of insertion interations
+ int num_iter = static_cast<int>(max_keys * kStressFactor);
+
+ for (size_t iter = 0; iter < max_usecase; iter++) {
+ Options options;
+ options.write_buffer_size =
+ static_cast<size_t>(64 * 1024 * kStressFactor); // small write buffer
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options = CurrentOptions(options);
+
+ // setup page cache
+ std::shared_ptr<PersistentCacheTier> pcache;
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+
+ const size_t size_max = std::numeric_limits<size_t>::max();
+
+ switch (iter) {
+ case 0:
+ // page cache, block cache, no-compressed cache
+ pcache = new_pcache(/*is_compressed=*/true);
+ table_options.persistent_cache = pcache;
+ table_options.block_cache = NewLRUCache(size_max);
+ table_options.block_cache_compressed = nullptr;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ break;
+ case 1:
+ // page cache, block cache, compressed cache
+ pcache = new_pcache(/*is_compressed=*/true);
+ table_options.persistent_cache = pcache;
+ table_options.block_cache = NewLRUCache(size_max);
+ table_options.block_cache_compressed = NewLRUCache(size_max);
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ break;
+ case 2:
+ // page cache, block cache, compressed cache + KNoCompression
+ // both block cache and compressed cache, but DB is not compressed
+ // also, make block cache sizes bigger, to trigger block cache hits
+ pcache = new_pcache(/*is_compressed=*/true);
+ table_options.persistent_cache = pcache;
+ table_options.block_cache = NewLRUCache(size_max);
+ table_options.block_cache_compressed = NewLRUCache(size_max);
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.compression = kNoCompression;
+ break;
+ case 3:
+ // page cache, no block cache, no compressed cache
+ pcache = new_pcache(/*is_compressed=*/false);
+ table_options.persistent_cache = pcache;
+ table_options.block_cache = nullptr;
+ table_options.block_cache_compressed = nullptr;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ break;
+ case 4:
+ // page cache, no block cache, no compressed cache
+ // Page cache caches compressed blocks
+ pcache = new_pcache(/*is_compressed=*/true);
+ table_options.persistent_cache = pcache;
+ table_options.block_cache = nullptr;
+ table_options.block_cache_compressed = nullptr;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ break;
+ default:
+ FAIL();
+ }
+
+ std::vector<std::string> values;
+ // insert data
+ Insert(options, table_options, num_iter, &values);
+ // flush all data in cache to device
+ pcache->TEST_Flush();
+ // verify data
+ Verify(num_iter, values);
+
+ auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
+ auto compressed_block_hit =
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
+ auto compressed_block_miss =
+ TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
+ auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
+ auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
+
+ // check that we triggered the appropriate code paths in the cache
+ switch (iter) {
+ case 0:
+ // page cache, block cache, no-compressed cache
+ ASSERT_GT(page_miss, 0);
+ ASSERT_GT(page_hit, 0);
+ ASSERT_GT(block_miss, 0);
+ ASSERT_EQ(compressed_block_miss, 0);
+ ASSERT_EQ(compressed_block_hit, 0);
+ break;
+ case 1:
+ // page cache, block cache, compressed cache
+ ASSERT_GT(page_miss, 0);
+ ASSERT_GT(block_miss, 0);
+ ASSERT_GT(compressed_block_miss, 0);
+ break;
+ case 2:
+ // page cache, block cache, compressed cache + KNoCompression
+ ASSERT_GT(page_miss, 0);
+ ASSERT_GT(page_hit, 0);
+ ASSERT_GT(block_miss, 0);
+ ASSERT_GT(compressed_block_miss, 0);
+ // remember kNoCompression
+ ASSERT_EQ(compressed_block_hit, 0);
+ break;
+ case 3:
+ case 4:
+ // page cache, no block cache, no compressed cache
+ ASSERT_GT(page_miss, 0);
+ ASSERT_GT(page_hit, 0);
+ ASSERT_EQ(compressed_block_hit, 0);
+ ASSERT_EQ(compressed_block_miss, 0);
+ break;
+ default:
+ FAIL();
+ }
+
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ ASSERT_OK(pcache->Close());
+ }
+}
+
+// Travis is unable to handle the normal version of the tests running out of
+// fds, out of space and timeouts. This is an easier version of the test
+// specifically written for Travis.
+// Now used generally because main tests are too expensive as unit tests.
+TEST_F(PersistentCacheDBTest, BasicTest) {
+ RunTest(std::bind(&MakeBlockCache, env_, dbname_), /*max_keys=*/1024,
+ /*max_usecase=*/1);
+}
+
+// test table with block page cache
+// DISABLED for now (very expensive, especially memory)
+TEST_F(PersistentCacheDBTest, DISABLED_BlockCacheTest) {
+ RunTest(std::bind(&MakeBlockCache, env_, dbname_));
+}
+
+// test table with volatile page cache
+// DISABLED for now (very expensive, especially memory)
+TEST_F(PersistentCacheDBTest, DISABLED_VolatileCacheTest) {
+ RunTest(std::bind(&MakeVolatileCache, env_, dbname_));
+}
+
+// test table with tiered page cache
+// DISABLED for now (very expensive, especially memory)
+TEST_F(PersistentCacheDBTest, DISABLED_TieredCacheTest) {
+ RunTest(std::bind(&MakeTieredCache, env_, dbname_));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+#else // !defined ROCKSDB_LITE
+int main() { return 0; }
+#endif // !defined ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h
new file mode 100644
index 000000000..f13155ed6
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h
@@ -0,0 +1,286 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <limits>
+#include <list>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "db/db_test_util.h"
+#include "memory/arena.h"
+#include "port/port.h"
+#include "rocksdb/cache.h"
+#include "table/block_based/block_builder.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
+#include "utilities/persistent_cache/volatile_tier_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// Unit tests for testing PersistentCacheTier
+//
+class PersistentCacheTierTest : public testing::Test {
+ public:
+ PersistentCacheTierTest();
+ virtual ~PersistentCacheTierTest() {
+ if (cache_) {
+ Status s = cache_->Close();
+ assert(s.ok());
+ }
+ }
+
+ protected:
+ // Flush cache
+ void Flush() {
+ if (cache_) {
+ cache_->TEST_Flush();
+ }
+ }
+
+ // create threaded workload
+ template <class T>
+ std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) {
+ std::list<port::Thread> threads;
+ for (size_t i = 0; i < n; i++) {
+ port::Thread th(fn);
+ threads.push_back(std::move(th));
+ }
+ return threads;
+ }
+
+ // Wait for threads to join
+ void Join(std::list<port::Thread>&& threads) {
+ for (auto& th : threads) {
+ th.join();
+ }
+ threads.clear();
+ }
+
+ // Run insert workload in threads
+ void Insert(const size_t nthreads, const size_t max_keys) {
+ key_ = 0;
+ max_keys_ = max_keys;
+ // spawn threads
+ auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this);
+ auto threads = SpawnThreads(nthreads, fn);
+ // join with threads
+ Join(std::move(threads));
+ // Flush cache
+ Flush();
+ }
+
+ // Run verification on the cache
+ void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) {
+ stats_verify_hits_ = 0;
+ stats_verify_missed_ = 0;
+ key_ = 0;
+ // spawn threads
+ auto fn =
+ std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled);
+ auto threads = SpawnThreads(nthreads, fn);
+ // join with threads
+ Join(std::move(threads));
+ }
+
+ // pad 0 to numbers
+ std::string PaddedNumber(const size_t data, const size_t pad_size) {
+ assert(pad_size);
+ char* ret = new char[pad_size];
+ int pos = static_cast<int>(pad_size) - 1;
+ size_t count = 0;
+ size_t t = data;
+ // copy numbers
+ while (t) {
+ count++;
+ ret[pos--] = '0' + t % 10;
+ t = t / 10;
+ }
+ // copy 0s
+ while (pos >= 0) {
+ ret[pos--] = '0';
+ }
+ // post condition
+ assert(count <= pad_size);
+ assert(pos == -1);
+ std::string result(ret, pad_size);
+ delete[] ret;
+ return result;
+ }
+
+ // Insert workload implementation
+ void InsertImpl() {
+ const std::string prefix = "key_prefix_";
+
+ while (true) {
+ size_t i = key_++;
+ if (i >= max_keys_) {
+ break;
+ }
+
+ char data[4 * 1024];
+ memset(data, '0' + (i % 10), sizeof(data));
+ auto k = prefix + PaddedNumber(i, /*count=*/8);
+ Slice key(k);
+ while (true) {
+ Status status = cache_->Insert(key, data, sizeof(data));
+ if (status.ok()) {
+ break;
+ }
+ ASSERT_TRUE(status.IsTryAgain());
+ Env::Default()->SleepForMicroseconds(1 * 1000 * 1000);
+ }
+ }
+ }
+
+ // Verification implementation
+ void VerifyImpl(const bool eviction_enabled = false) {
+ const std::string prefix = "key_prefix_";
+ while (true) {
+ size_t i = key_++;
+ if (i >= max_keys_) {
+ break;
+ }
+
+ char edata[4 * 1024];
+ memset(edata, '0' + (i % 10), sizeof(edata));
+ auto k = prefix + PaddedNumber(i, /*count=*/8);
+ Slice key(k);
+ std::unique_ptr<char[]> block;
+ size_t block_size;
+
+ if (eviction_enabled) {
+ if (!cache_->Lookup(key, &block, &block_size).ok()) {
+ // assume that the key is evicted
+ stats_verify_missed_++;
+ continue;
+ }
+ }
+
+ ASSERT_OK(cache_->Lookup(key, &block, &block_size));
+ ASSERT_EQ(block_size, sizeof(edata));
+ ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0);
+ stats_verify_hits_++;
+ }
+ }
+
+ // template for insert test
+ void RunInsertTest(const size_t nthreads, const size_t max_keys) {
+ Insert(nthreads, max_keys);
+ Verify(nthreads);
+ ASSERT_EQ(stats_verify_hits_, max_keys);
+ ASSERT_EQ(stats_verify_missed_, 0);
+
+ ASSERT_OK(cache_->Close());
+ cache_.reset();
+ }
+
+ // template for negative insert test
+ void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) {
+ Insert(nthreads, max_keys);
+ Verify(nthreads, /*eviction_enabled=*/true);
+ ASSERT_LT(stats_verify_hits_, max_keys);
+ ASSERT_GT(stats_verify_missed_, 0);
+
+ ASSERT_OK(cache_->Close());
+ cache_.reset();
+ }
+
+ // template for insert with eviction test
+ void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) {
+ Insert(nthreads, max_keys);
+ Verify(nthreads, /*eviction_enabled=*/true);
+ ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys);
+ ASSERT_GT(stats_verify_hits_, 0);
+ ASSERT_GT(stats_verify_missed_, 0);
+
+ ASSERT_OK(cache_->Close());
+ cache_.reset();
+ }
+
+ const std::string path_;
+ std::shared_ptr<Logger> log_;
+ std::shared_ptr<PersistentCacheTier> cache_;
+ std::atomic<size_t> key_{0};
+ size_t max_keys_ = 0;
+ std::atomic<size_t> stats_verify_hits_{0};
+ std::atomic<size_t> stats_verify_missed_{0};
+};
+
+//
+// RocksDB tests
+//
+class PersistentCacheDBTest : public DBTestBase {
+ public:
+ PersistentCacheDBTest();
+
+ static uint64_t TestGetTickerCount(const Options& options,
+ Tickers ticker_type) {
+ return static_cast<uint32_t>(
+ options.statistics->getTickerCount(ticker_type));
+ }
+
+ // insert data to table
+ void Insert(const Options& options,
+ const BlockBasedTableOptions& /*table_options*/,
+ const int num_iter, std::vector<std::string>* values) {
+ CreateAndReopenWithCF({"pikachu"}, options);
+ // default column family doesn't have block cache
+ Options no_block_cache_opts;
+ no_block_cache_opts.statistics = options.statistics;
+ no_block_cache_opts = CurrentOptions(no_block_cache_opts);
+ BlockBasedTableOptions table_options_no_bc;
+ table_options_no_bc.no_block_cache = true;
+ no_block_cache_opts.table_factory.reset(
+ NewBlockBasedTableFactory(table_options_no_bc));
+ ReopenWithColumnFamilies(
+ {"default", "pikachu"},
+ std::vector<Options>({no_block_cache_opts, options}));
+
+ Random rnd(301);
+
+ // Write 8MB (80 values, each 100K)
+ ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+ std::string str;
+ for (int i = 0; i < num_iter; i++) {
+ if (i % 4 == 0) { // high compression ratio
+ str = rnd.RandomString(1000);
+ }
+ values->push_back(str);
+ ASSERT_OK(Put(1, Key(i), (*values)[i]));
+ }
+
+ // flush all data from memtable so that reads are from block cache
+ ASSERT_OK(Flush(1));
+ }
+
+ // verify data
+ void Verify(const int num_iter, const std::vector<std::string>& values) {
+ for (int j = 0; j < 2; ++j) {
+ for (int i = 0; i < num_iter; i++) {
+ ASSERT_EQ(Get(1, Key(i)), values[i]);
+ }
+ }
+ }
+
+ // test template
+ void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
+ new_pcache,
+ const size_t max_keys, const size_t max_usecase);
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc
new file mode 100644
index 000000000..54cbce8f7
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc
@@ -0,0 +1,167 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+
+#include <cinttypes>
+#include <sstream>
+#include <string>
+
+namespace ROCKSDB_NAMESPACE {
+
+std::string PersistentCacheConfig::ToString() const {
+ std::string ret;
+ ret.reserve(20000);
+ const int kBufferSize = 200;
+ char buffer[kBufferSize];
+
+ snprintf(buffer, kBufferSize, " path: %s\n", path.c_str());
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " enable_direct_reads: %d\n",
+ enable_direct_reads);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " enable_direct_writes: %d\n",
+ enable_direct_writes);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " cache_size: %" PRIu64 "\n", cache_size);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " cache_file_size: %" PRIu32 "\n",
+ cache_file_size);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " writer_qdepth: %" PRIu32 "\n",
+ writer_qdepth);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " pipeline_writes: %d\n", pipeline_writes);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize,
+ " max_write_pipeline_backlog_size: %" PRIu64 "\n",
+ max_write_pipeline_backlog_size);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " write_buffer_size: %" PRIu32 "\n",
+ write_buffer_size);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " writer_dispatch_size: %" PRIu64 "\n",
+ writer_dispatch_size);
+ ret.append(buffer);
+ snprintf(buffer, kBufferSize, " is_compressed: %d\n", is_compressed);
+ ret.append(buffer);
+
+ return ret;
+}
+
+//
+// PersistentCacheTier implementation
+//
+Status PersistentCacheTier::Open() {
+ if (next_tier_) {
+ return next_tier_->Open();
+ }
+ return Status::OK();
+}
+
+Status PersistentCacheTier::Close() {
+ if (next_tier_) {
+ return next_tier_->Close();
+ }
+ return Status::OK();
+}
+
+bool PersistentCacheTier::Reserve(const size_t /*size*/) {
+ // default implementation is a pass through
+ return true;
+}
+
+bool PersistentCacheTier::Erase(const Slice& /*key*/) {
+ // default implementation is a pass through since not all cache tiers might
+ // support erase
+ return true;
+}
+
+std::string PersistentCacheTier::PrintStats() {
+ std::ostringstream os;
+ for (auto tier_stats : Stats()) {
+ os << "---- next tier -----" << std::endl;
+ for (auto stat : tier_stats) {
+ os << stat.first << ": " << stat.second << std::endl;
+ }
+ }
+ return os.str();
+}
+
+PersistentCache::StatsType PersistentCacheTier::Stats() {
+ if (next_tier_) {
+ return next_tier_->Stats();
+ }
+ return PersistentCache::StatsType{};
+}
+
+uint64_t PersistentCacheTier::NewId() {
+ return last_id_.fetch_add(1, std::memory_order_relaxed);
+}
+
+//
+// PersistentTieredCache implementation
+//
+PersistentTieredCache::~PersistentTieredCache() { assert(tiers_.empty()); }
+
+Status PersistentTieredCache::Open() {
+ assert(!tiers_.empty());
+ return tiers_.front()->Open();
+}
+
+Status PersistentTieredCache::Close() {
+ assert(!tiers_.empty());
+ Status status = tiers_.front()->Close();
+ if (status.ok()) {
+ tiers_.clear();
+ }
+ return status;
+}
+
+bool PersistentTieredCache::Erase(const Slice& key) {
+ assert(!tiers_.empty());
+ return tiers_.front()->Erase(key);
+}
+
+PersistentCache::StatsType PersistentTieredCache::Stats() {
+ assert(!tiers_.empty());
+ return tiers_.front()->Stats();
+}
+
+std::string PersistentTieredCache::PrintStats() {
+ assert(!tiers_.empty());
+ return tiers_.front()->PrintStats();
+}
+
+Status PersistentTieredCache::Insert(const Slice& page_key, const char* data,
+ const size_t size) {
+ assert(!tiers_.empty());
+ return tiers_.front()->Insert(page_key, data, size);
+}
+
+Status PersistentTieredCache::Lookup(const Slice& page_key,
+ std::unique_ptr<char[]>* data,
+ size_t* size) {
+ assert(!tiers_.empty());
+ return tiers_.front()->Lookup(page_key, data, size);
+}
+
+void PersistentTieredCache::AddTier(const Tier& tier) {
+ if (!tiers_.empty()) {
+ tiers_.back()->set_next_tier(tier);
+ }
+ tiers_.push_back(tier);
+}
+
+bool PersistentTieredCache::IsCompressed() {
+ assert(tiers_.size());
+ return tiers_.front()->IsCompressed();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h
new file mode 100644
index 000000000..65aadcd3f
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h
@@ -0,0 +1,342 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <limits>
+#include <list>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "monitoring/histogram.h"
+#include "rocksdb/env.h"
+#include "rocksdb/persistent_cache.h"
+#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
+
+// Persistent Cache
+//
+// Persistent cache is tiered key-value cache that can use persistent medium. It
+// is a generic design and can leverage any storage medium -- disk/SSD/NVM/RAM.
+// The code has been kept generic but significant benchmark/design/development
+// time has been spent to make sure the cache performs appropriately for
+// respective storage medium.
+// The file defines
+// PersistentCacheTier : Implementation that handles individual cache tier
+// PersistentTieresCache : Implementation that handles all tiers as a logical
+// unit
+//
+// PersistentTieredCache architecture:
+// +--------------------------+ PersistentCacheTier that handles multiple tiers
+// | +----------------+ |
+// | | RAM | PersistentCacheTier that handles RAM (VolatileCacheImpl)
+// | +----------------+ |
+// | | next |
+// | v |
+// | +----------------+ |
+// | | NVM | PersistentCacheTier implementation that handles NVM
+// | +----------------+ (BlockCacheImpl)
+// | | next |
+// | V |
+// | +----------------+ |
+// | | LE-SSD | PersistentCacheTier implementation that handles LE-SSD
+// | +----------------+ (BlockCacheImpl)
+// | | |
+// | V |
+// | null |
+// +--------------------------+
+// |
+// V
+// null
+namespace ROCKSDB_NAMESPACE {
+
+// Persistent Cache Config
+//
+// This struct captures all the options that are used to configure persistent
+// cache. Some of the terminologies used in naming the options are
+//
+// dispatch size :
+// This is the size in which IO is dispatched to the device
+//
+// write buffer size :
+// This is the size of an individual write buffer size. Write buffers are
+// grouped to form buffered file.
+//
+// cache size :
+// This is the logical maximum for the cache size
+//
+// qdepth :
+// This is the max number of IOs that can issues to the device in parallel
+//
+// pepeling :
+// The writer code path follows pipelined architecture, which means the
+// operations are handed off from one stage to another
+//
+// pipelining backlog size :
+// With the pipelined architecture, there can always be backlogging of ops in
+// pipeline queues. This is the maximum backlog size after which ops are dropped
+// from queue
+struct PersistentCacheConfig {
+ explicit PersistentCacheConfig(
+ Env* const _env, const std::string& _path, const uint64_t _cache_size,
+ const std::shared_ptr<Logger>& _log,
+ const uint32_t _write_buffer_size = 1 * 1024 * 1024 /*1MB*/) {
+ env = _env;
+ clock = (env != nullptr) ? env->GetSystemClock().get()
+ : SystemClock::Default().get();
+ path = _path;
+ log = _log;
+ cache_size = _cache_size;
+ writer_dispatch_size = write_buffer_size = _write_buffer_size;
+ }
+
+ //
+ // Validate the settings. Our intentions are to catch erroneous settings ahead
+ // of time instead going violating invariants or causing dead locks.
+ //
+ Status ValidateSettings() const {
+ // (1) check pre-conditions for variables
+ if (!env || path.empty()) {
+ return Status::InvalidArgument("empty or null args");
+ }
+
+ // (2) assert size related invariants
+ // - cache size cannot be less than cache file size
+ // - individual write buffer size cannot be greater than cache file size
+ // - total write buffer size cannot be less than 2X cache file size
+ if (cache_size < cache_file_size || write_buffer_size >= cache_file_size ||
+ write_buffer_size * write_buffer_count() < 2 * cache_file_size) {
+ return Status::InvalidArgument("invalid cache size");
+ }
+
+ // (2) check writer settings
+ // - Queue depth cannot be 0
+ // - writer_dispatch_size cannot be greater than writer_buffer_size
+ // - dispatch size and buffer size need to be aligned
+ if (!writer_qdepth || writer_dispatch_size > write_buffer_size ||
+ write_buffer_size % writer_dispatch_size) {
+ return Status::InvalidArgument("invalid writer settings");
+ }
+
+ return Status::OK();
+ }
+
+ //
+ // Env abstraction to use for system level operations
+ //
+ Env* env;
+ SystemClock* clock;
+ //
+ // Path for the block cache where blocks are persisted
+ //
+ std::string path;
+
+ //
+ // Log handle for logging messages
+ //
+ std::shared_ptr<Logger> log;
+
+ //
+ // Enable direct IO for reading
+ //
+ bool enable_direct_reads = true;
+
+ //
+ // Enable direct IO for writing
+ //
+ bool enable_direct_writes = false;
+
+ //
+ // Logical cache size
+ //
+ uint64_t cache_size = std::numeric_limits<uint64_t>::max();
+
+ // cache-file-size
+ //
+ // Cache consists of multiples of small files. This parameter defines the
+ // size of an individual cache file
+ //
+ // default: 1M
+ uint32_t cache_file_size = 100ULL * 1024 * 1024;
+
+ // writer-qdepth
+ //
+ // The writers can issues IO to the devices in parallel. This parameter
+ // controls the max number if IOs that can issues in parallel to the block
+ // device
+ //
+ // default :1
+ uint32_t writer_qdepth = 1;
+
+ // pipeline-writes
+ //
+ // The write optionally follow pipelined architecture. This helps
+ // avoid regression in the eviction code path of the primary tier. This
+ // parameter defines if pipelining is enabled or disabled
+ //
+ // default: true
+ bool pipeline_writes = true;
+
+ // max-write-pipeline-backlog-size
+ //
+ // Max pipeline buffer size. This is the maximum backlog we can accumulate
+ // while waiting for writes. After the limit, new ops will be dropped.
+ //
+ // Default: 1GiB
+ uint64_t max_write_pipeline_backlog_size = 1ULL * 1024 * 1024 * 1024;
+
+ // write-buffer-size
+ //
+ // This is the size in which buffer slabs are allocated.
+ //
+ // Default: 1M
+ uint32_t write_buffer_size = 1ULL * 1024 * 1024;
+
+ // write-buffer-count
+ //
+ // This is the total number of buffer slabs. This is calculated as a factor of
+ // file size in order to avoid dead lock.
+ size_t write_buffer_count() const {
+ assert(write_buffer_size);
+ return static_cast<size_t>((writer_qdepth + 1.2) * cache_file_size /
+ write_buffer_size);
+ }
+
+ // writer-dispatch-size
+ //
+ // The writer thread will dispatch the IO at the specified IO size
+ //
+ // default: 1M
+ uint64_t writer_dispatch_size = 1ULL * 1024 * 1024;
+
+ // is_compressed
+ //
+ // This option determines if the cache will run in compressed mode or
+ // uncompressed mode
+ bool is_compressed = true;
+
+ PersistentCacheConfig MakePersistentCacheConfig(
+ const std::string& path, const uint64_t size,
+ const std::shared_ptr<Logger>& log);
+
+ std::string ToString() const;
+};
+
+// Persistent Cache Tier
+//
+// This a logical abstraction that defines a tier of the persistent cache. Tiers
+// can be stacked over one another. PersistentCahe provides the basic definition
+// for accessing/storing in the cache. PersistentCacheTier extends the interface
+// to enable management and stacking of tiers.
+class PersistentCacheTier : public PersistentCache {
+ public:
+ using Tier = std::shared_ptr<PersistentCacheTier>;
+
+ virtual ~PersistentCacheTier() {}
+
+ // Open the persistent cache tier
+ virtual Status Open();
+
+ // Close the persistent cache tier
+ virtual Status Close();
+
+ // Reserve space up to 'size' bytes
+ virtual bool Reserve(const size_t size);
+
+ // Erase a key from the cache
+ virtual bool Erase(const Slice& key);
+
+ // Print stats to string recursively
+ virtual std::string PrintStats();
+
+ virtual PersistentCache::StatsType Stats() override;
+
+ // Insert to page cache
+ virtual Status Insert(const Slice& page_key, const char* data,
+ const size_t size) override = 0;
+
+ // Lookup page cache by page identifier
+ virtual Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
+ size_t* size) override = 0;
+
+ // Does it store compressed data ?
+ virtual bool IsCompressed() override = 0;
+
+ virtual std::string GetPrintableOptions() const override = 0;
+
+ virtual uint64_t NewId() override;
+
+ // Return a reference to next tier
+ virtual Tier& next_tier() { return next_tier_; }
+
+ // Set the value for next tier
+ virtual void set_next_tier(const Tier& tier) {
+ assert(!next_tier_);
+ next_tier_ = tier;
+ }
+
+ virtual void TEST_Flush() {
+ if (next_tier_) {
+ next_tier_->TEST_Flush();
+ }
+ }
+
+ private:
+ Tier next_tier_; // next tier
+ std::atomic<uint64_t> last_id_{1};
+};
+
+// PersistentTieredCache
+//
+// Abstraction that helps you construct a tiers of persistent caches as a
+// unified cache. The tier(s) of cache will act a single tier for management
+// ease and support PersistentCache methods for accessing data.
+class PersistentTieredCache : public PersistentCacheTier {
+ public:
+ virtual ~PersistentTieredCache();
+
+ Status Open() override;
+ Status Close() override;
+ bool Erase(const Slice& key) override;
+ std::string PrintStats() override;
+ PersistentCache::StatsType Stats() override;
+ Status Insert(const Slice& page_key, const char* data,
+ const size_t size) override;
+ Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
+ size_t* size) override;
+ bool IsCompressed() override;
+
+ std::string GetPrintableOptions() const override {
+ return "PersistentTieredCache";
+ }
+
+ void AddTier(const Tier& tier);
+
+ Tier& next_tier() override {
+ auto it = tiers_.end();
+ return (*it)->next_tier();
+ }
+
+ void set_next_tier(const Tier& tier) override {
+ auto it = tiers_.end();
+ (*it)->set_next_tier(tier);
+ }
+
+ void TEST_Flush() override {
+ assert(!tiers_.empty());
+ tiers_.front()->TEST_Flush();
+ PersistentCacheTier::TEST_Flush();
+ }
+
+ protected:
+ std::list<Tier> tiers_; // list of tiers top-down
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h
new file mode 100644
index 000000000..2a769652d
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h
@@ -0,0 +1,67 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <limits>
+#include <list>
+
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// Simple synchronized queue implementation with the option of
+// bounding the queue
+//
+// On overflow, the elements will be discarded
+//
+template <class T>
+class BoundedQueue {
+ public:
+ explicit BoundedQueue(
+ const size_t max_size = std::numeric_limits<size_t>::max())
+ : cond_empty_(&lock_), max_size_(max_size) {}
+
+ virtual ~BoundedQueue() {}
+
+ void Push(T&& t) {
+ MutexLock _(&lock_);
+ if (max_size_ != std::numeric_limits<size_t>::max() &&
+ size_ + t.Size() >= max_size_) {
+ // overflow
+ return;
+ }
+
+ size_ += t.Size();
+ q_.push_back(std::move(t));
+ cond_empty_.SignalAll();
+ }
+
+ T Pop() {
+ MutexLock _(&lock_);
+ while (q_.empty()) {
+ cond_empty_.Wait();
+ }
+
+ T t = std::move(q_.front());
+ size_ -= t.Size();
+ q_.pop_front();
+ return t;
+ }
+
+ size_t Size() const {
+ MutexLock _(&lock_);
+ return size_;
+ }
+
+ private:
+ mutable port::Mutex lock_;
+ port::CondVar cond_empty_;
+ std::list<T> q_;
+ size_t size_ = 0;
+ const size_t max_size_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc
new file mode 100644
index 000000000..45d2830aa
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc
@@ -0,0 +1,140 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/volatile_tier_impl.h"
+
+#include <string>
+
+namespace ROCKSDB_NAMESPACE {
+
+void VolatileCacheTier::DeleteCacheData(VolatileCacheTier::CacheData* data) {
+ assert(data);
+ delete data;
+}
+
+VolatileCacheTier::~VolatileCacheTier() { index_.Clear(&DeleteCacheData); }
+
+PersistentCache::StatsType VolatileCacheTier::Stats() {
+ std::map<std::string, double> stat;
+ stat.insert({"persistent_cache.volatile_cache.hits",
+ static_cast<double>(stats_.cache_hits_)});
+ stat.insert({"persistent_cache.volatile_cache.misses",
+ static_cast<double>(stats_.cache_misses_)});
+ stat.insert({"persistent_cache.volatile_cache.inserts",
+ static_cast<double>(stats_.cache_inserts_)});
+ stat.insert({"persistent_cache.volatile_cache.evicts",
+ static_cast<double>(stats_.cache_evicts_)});
+ stat.insert({"persistent_cache.volatile_cache.hit_pct",
+ static_cast<double>(stats_.CacheHitPct())});
+ stat.insert({"persistent_cache.volatile_cache.miss_pct",
+ static_cast<double>(stats_.CacheMissPct())});
+
+ auto out = PersistentCacheTier::Stats();
+ out.push_back(stat);
+ return out;
+}
+
+Status VolatileCacheTier::Insert(const Slice& page_key, const char* data,
+ const size_t size) {
+ // precondition
+ assert(data);
+ assert(size);
+
+ // increment the size
+ size_ += size;
+
+ // check if we have overshot the limit, if so evict some space
+ while (size_ > max_size_) {
+ if (!Evict()) {
+ // unable to evict data, we give up so we don't spike read
+ // latency
+ assert(size_ >= size);
+ size_ -= size;
+ return Status::TryAgain("Unable to evict any data");
+ }
+ }
+
+ assert(size_ >= size);
+
+ // insert order: LRU, followed by index
+ std::string key(page_key.data(), page_key.size());
+ std::string value(data, size);
+ std::unique_ptr<CacheData> cache_data(
+ new CacheData(std::move(key), std::move(value)));
+ bool ok = index_.Insert(cache_data.get());
+ if (!ok) {
+ // decrement the size that we incremented ahead of time
+ assert(size_ >= size);
+ size_ -= size;
+ // failed to insert to cache, block already in cache
+ return Status::TryAgain("key already exists in volatile cache");
+ }
+
+ cache_data.release();
+ stats_.cache_inserts_++;
+ return Status::OK();
+}
+
+Status VolatileCacheTier::Lookup(const Slice& page_key,
+ std::unique_ptr<char[]>* result,
+ size_t* size) {
+ CacheData key(std::move(page_key.ToString()));
+ CacheData* kv;
+ bool ok = index_.Find(&key, &kv);
+ if (ok) {
+ // set return data
+ result->reset(new char[kv->value.size()]);
+ memcpy(result->get(), kv->value.c_str(), kv->value.size());
+ *size = kv->value.size();
+ // drop the reference on cache data
+ kv->refs_--;
+ // update stats
+ stats_.cache_hits_++;
+ return Status::OK();
+ }
+
+ stats_.cache_misses_++;
+
+ if (next_tier()) {
+ return next_tier()->Lookup(page_key, result, size);
+ }
+
+ return Status::NotFound("key not found in volatile cache");
+}
+
+bool VolatileCacheTier::Erase(const Slice& /*key*/) {
+ assert(!"not supported");
+ return true;
+}
+
+bool VolatileCacheTier::Evict() {
+ CacheData* edata = index_.Evict();
+ if (!edata) {
+ // not able to evict any object
+ return false;
+ }
+
+ stats_.cache_evicts_++;
+
+ // push the evicted object to the next level
+ if (next_tier()) {
+ // TODO: Should the insert error be ignored?
+ Status s = next_tier()->Insert(Slice(edata->key), edata->value.c_str(),
+ edata->value.size());
+ s.PermitUncheckedError();
+ }
+
+ // adjust size and destroy data
+ size_ -= edata->value.size();
+ delete edata;
+
+ return true;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h
new file mode 100644
index 000000000..09265e457
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h
@@ -0,0 +1,141 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <limits>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "rocksdb/cache.h"
+#include "utilities/persistent_cache/hash_table.h"
+#include "utilities/persistent_cache/hash_table_evictable.h"
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+
+// VolatileCacheTier
+//
+// This file provides persistent cache tier implementation for caching
+// key/values in RAM.
+//
+// key/values
+// |
+// V
+// +-------------------+
+// | VolatileCacheTier | Store in an evictable hash table
+// +-------------------+
+// |
+// V
+// on eviction
+// pushed to next tier
+//
+// The implementation is designed to be concurrent. The evictable hash table
+// implementation is not concurrent at this point though.
+//
+// The eviction algorithm is LRU
+namespace ROCKSDB_NAMESPACE {
+
+class VolatileCacheTier : public PersistentCacheTier {
+ public:
+ explicit VolatileCacheTier(
+ const bool is_compressed = true,
+ const size_t max_size = std::numeric_limits<size_t>::max())
+ : is_compressed_(is_compressed), max_size_(max_size) {}
+
+ virtual ~VolatileCacheTier();
+
+ // insert to cache
+ Status Insert(const Slice& page_key, const char* data,
+ const size_t size) override;
+ // lookup key in cache
+ Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
+ size_t* size) override;
+
+ // is compressed cache ?
+ bool IsCompressed() override { return is_compressed_; }
+
+ // erase key from cache
+ bool Erase(const Slice& key) override;
+
+ std::string GetPrintableOptions() const override {
+ return "VolatileCacheTier";
+ }
+
+ // Expose stats as map
+ PersistentCache::StatsType Stats() override;
+
+ private:
+ //
+ // Cache data abstraction
+ //
+ struct CacheData : LRUElement<CacheData> {
+ explicit CacheData(CacheData&& rhs) noexcept
+ : key(std::move(rhs.key)), value(std::move(rhs.value)) {}
+
+ explicit CacheData(const std::string& _key, const std::string& _value = "")
+ : key(_key), value(_value) {}
+
+ virtual ~CacheData() {}
+
+ const std::string key;
+ const std::string value;
+ };
+
+ static void DeleteCacheData(CacheData* data);
+
+ //
+ // Index and LRU definition
+ //
+ struct CacheDataHash {
+ uint64_t operator()(const CacheData* obj) const {
+ assert(obj);
+ return std::hash<std::string>()(obj->key);
+ }
+ };
+
+ struct CacheDataEqual {
+ bool operator()(const CacheData* lhs, const CacheData* rhs) const {
+ assert(lhs);
+ assert(rhs);
+ return lhs->key == rhs->key;
+ }
+ };
+
+ struct Statistics {
+ std::atomic<uint64_t> cache_misses_{0};
+ std::atomic<uint64_t> cache_hits_{0};
+ std::atomic<uint64_t> cache_inserts_{0};
+ std::atomic<uint64_t> cache_evicts_{0};
+
+ double CacheHitPct() const {
+ auto lookups = cache_hits_ + cache_misses_;
+ return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
+ }
+
+ double CacheMissPct() const {
+ auto lookups = cache_hits_ + cache_misses_;
+ return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
+ }
+ };
+
+ using IndexType =
+ EvictableHashTable<CacheData, CacheDataHash, CacheDataEqual>;
+
+ // Evict LRU tail
+ bool Evict();
+
+ const bool is_compressed_ = true; // does it store compressed data
+ IndexType index_; // in-memory cache
+ std::atomic<uint64_t> max_size_{0}; // Maximum size of the cache
+ std::atomic<uint64_t> size_{0}; // Size of the cache
+ Statistics stats_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif