author:    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer: Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit:    e6918187568dbd01842d8d1d2c808ce16a894239
tree:      64f88b554b444a49f656b6c656111a145cbbaa28  /src/rocksdb/utilities/persistent_cache
parent:    Initial commit.
download:  ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz, ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/persistent_cache')
20 files changed, 4836 insertions, 0 deletions
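
For context on the code added below: the diff introduces a factory, `NewPersistentCache()`, and the `BlockCacheTier` implementation behind it. A minimal, hypothetical usage sketch follows — it is not part of this commit, assumes the standard RocksDB public headers (`BlockBasedTableOptions::persistent_cache` is the existing hook the cache plugs into), and uses an illustrative path and size:

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/table.h"

int main() {
  // Build the SSD-backed cache tier added by this commit. The path and
  // 1 GiB size are illustrative; optimized_for_nvm=true enables direct
  // writes with a deeper writer queue (see NewPersistentCache below).
  std::shared_ptr<rocksdb::PersistentCache> pcache;
  rocksdb::Status s = rocksdb::NewPersistentCache(
      rocksdb::Env::Default(), "/tmp/pcache", 1024ull << 20,
      /*log=*/nullptr, /*optimized_for_nvm=*/true, &pcache);
  if (!s.ok()) {
    return 1;
  }

  // Attach the cache to the block-based table format so blocks evicted
  // from the in-memory block cache can be served from the SSD tier.
  rocksdb::BlockBasedTableOptions table_opts;
  table_opts.persistent_cache = pcache;

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_opts));

  rocksdb::DB* db = nullptr;
  s = rocksdb::DB::Open(options, "/tmp/db", &db);
  delete db;  // no-op if Open failed (db stays nullptr)
  return s.ok() ? 0 : 1;
}
```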
diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc new file mode 100644 index 000000000..8ad9bb1b1 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier.cc @@ -0,0 +1,422 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/block_cache_tier.h" + +#include <utility> +#include <vector> + +#include "logging/logging.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/stop_watch.h" +#include "utilities/persistent_cache/block_cache_tier_file.h" + +namespace ROCKSDB_NAMESPACE { + +// +// BlockCacheImpl +// +Status BlockCacheTier::Open() { + Status status; + + WriteLock _(&lock_); + + assert(!size_); + + // Check the validity of the options + status = opt_.ValidateSettings(); + assert(status.ok()); + if (!status.ok()) { + Error(opt_.log, "Invalid block cache options"); + return status; + } + + // Create base directory or cleanup existing directory + status = opt_.env->CreateDirIfMissing(opt_.path); + if (!status.ok()) { + Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(), + status.ToString().c_str()); + return status; + } + + // Create base/<cache dir> directory + status = opt_.env->CreateDir(GetCachePath()); + if (!status.ok()) { + // directory already exists, clean it up + status = CleanupCacheFolder(GetCachePath()); + assert(status.ok()); + if (!status.ok()) { + Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(), + status.ToString().c_str()); + return status; + } + } + + // create a new file + assert(!cache_file_); + status = NewCacheFile(); + if (!status.ok()) { + Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(), + status.ToString().c_str()); + return status; + } + + assert(cache_file_); + + if (opt_.pipeline_writes) { + assert(!insert_th_.joinable()); + insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this); + } + + return Status::OK(); +} + +bool IsCacheFile(const std::string& file) { + // check if the file has .rc suffix + // Unfortunately regex support across compilers is not even, so we use simple + // string parsing + size_t pos = file.find("."); + if (pos == std::string::npos) { + return false; + } + + std::string suffix = file.substr(pos); + return suffix == ".rc"; +} + +Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) { + std::vector<std::string> files; + Status status = opt_.env->GetChildren(folder, &files); + if (!status.ok()) { + Error(opt_.log, "Error getting files for %s. %s", folder.c_str(), + status.ToString().c_str()); + return status; + } + + // cleanup files with the patter :digi:.rc + for (auto file : files) { + if (IsCacheFile(file)) { + // cache file + Info(opt_.log, "Removing file %s.", file.c_str()); + status = opt_.env->DeleteFile(folder + "/" + file); + if (!status.ok()) { + Error(opt_.log, "Error deleting file %s. 
%s", file.c_str(), + status.ToString().c_str()); + return status; + } + } else { + ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str()); + } + } + return Status::OK(); +} + +Status BlockCacheTier::Close() { + // stop the insert thread + if (opt_.pipeline_writes && insert_th_.joinable()) { + InsertOp op(/*quit=*/true); + insert_ops_.Push(std::move(op)); + insert_th_.join(); + } + + // stop the writer before + writer_.Stop(); + + // clear all metadata + WriteLock _(&lock_); + metadata_.Clear(); + return Status::OK(); +} + +template <class T> +void Add(std::map<std::string, double>* stats, const std::string& key, + const T& t) { + stats->insert({key, static_cast<double>(t)}); +} + +PersistentCache::StatsType BlockCacheTier::Stats() { + std::map<std::string, double> stats; + Add(&stats, "persistentcache.blockcachetier.bytes_piplined", + stats_.bytes_pipelined_.Average()); + Add(&stats, "persistentcache.blockcachetier.bytes_written", + stats_.bytes_written_.Average()); + Add(&stats, "persistentcache.blockcachetier.bytes_read", + stats_.bytes_read_.Average()); + Add(&stats, "persistentcache.blockcachetier.insert_dropped", + stats_.insert_dropped_); + Add(&stats, "persistentcache.blockcachetier.cache_hits", stats_.cache_hits_); + Add(&stats, "persistentcache.blockcachetier.cache_misses", + stats_.cache_misses_); + Add(&stats, "persistentcache.blockcachetier.cache_errors", + stats_.cache_errors_); + Add(&stats, "persistentcache.blockcachetier.cache_hits_pct", + stats_.CacheHitPct()); + Add(&stats, "persistentcache.blockcachetier.cache_misses_pct", + stats_.CacheMissPct()); + Add(&stats, "persistentcache.blockcachetier.read_hit_latency", + stats_.read_hit_latency_.Average()); + Add(&stats, "persistentcache.blockcachetier.read_miss_latency", + stats_.read_miss_latency_.Average()); + Add(&stats, "persistentcache.blockcachetier.write_latency", + stats_.write_latency_.Average()); + + auto out = PersistentCacheTier::Stats(); + out.push_back(stats); + return out; +} + +Status BlockCacheTier::Insert(const Slice& key, const char* data, + const size_t size) { + // update stats + stats_.bytes_pipelined_.Add(size); + + if (opt_.pipeline_writes) { + // off load the write to the write thread + insert_ops_.Push( + InsertOp(key.ToString(), std::move(std::string(data, size)))); + return Status::OK(); + } + + assert(!opt_.pipeline_writes); + return InsertImpl(key, Slice(data, size)); +} + +void BlockCacheTier::InsertMain() { + while (true) { + InsertOp op(insert_ops_.Pop()); + + if (op.signal_) { + // that is a secret signal to exit + break; + } + + size_t retry = 0; + Status s; + while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) { + if (retry > kMaxRetry) { + break; + } + + // this can happen when the buffers are full, we wait till some buffers + // are free. Why don't we wait inside the code. 
This is because we want + // to support both pipelined and non-pipelined mode + buffer_allocator_.WaitUntilUsable(); + retry++; + } + + if (!s.ok()) { + stats_.insert_dropped_++; + } + } +} + +Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) { + // pre-condition + assert(key.size()); + assert(data.size()); + assert(cache_file_); + + StopWatchNano timer(opt_.clock, /*auto_start=*/true); + + WriteLock _(&lock_); + + LBA lba; + if (metadata_.Lookup(key, &lba)) { + // the key already exists, this is duplicate insert + return Status::OK(); + } + + while (!cache_file_->Append(key, data, &lba)) { + if (!cache_file_->Eof()) { + ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d", + cache_file_->cacheid()); + stats_.write_latency_.Add(timer.ElapsedNanos() / 1000); + return Status::TryAgain(); + } + + assert(cache_file_->Eof()); + Status status = NewCacheFile(); + if (!status.ok()) { + return status; + } + } + + // Insert into lookup index + BlockInfo* info = metadata_.Insert(key, lba); + assert(info); + if (!info) { + return Status::IOError("Unexpected error inserting to index"); + } + + // insert to cache file reverse mapping + cache_file_->Add(info); + + // update stats + stats_.bytes_written_.Add(data.size()); + stats_.write_latency_.Add(timer.ElapsedNanos() / 1000); + return Status::OK(); +} + +Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val, + size_t* size) { + StopWatchNano timer(opt_.clock, /*auto_start=*/true); + + LBA lba; + bool status; + status = metadata_.Lookup(key, &lba); + if (!status) { + stats_.cache_misses_++; + stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); + return Status::NotFound("blockcache: key not found"); + } + + BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_); + if (!file) { + // this can happen because the block index and cache file index are + // different, and the cache file might be removed between the two lookups + stats_.cache_misses_++; + stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); + return Status::NotFound("blockcache: cache file not found"); + } + + assert(file->refs_); + + std::unique_ptr<char[]> scratch(new char[lba.size_]); + Slice blk_key; + Slice blk_val; + + status = file->Read(lba, &blk_key, &blk_val, scratch.get()); + --file->refs_; + if (!status) { + stats_.cache_misses_++; + stats_.cache_errors_++; + stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000); + return Status::NotFound("blockcache: error reading data"); + } + + assert(blk_key == key); + + val->reset(new char[blk_val.size()]); + memcpy(val->get(), blk_val.data(), blk_val.size()); + *size = blk_val.size(); + + stats_.bytes_read_.Add(*size); + stats_.cache_hits_++; + stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000); + + return Status::OK(); +} + +bool BlockCacheTier::Erase(const Slice& key) { + WriteLock _(&lock_); + BlockInfo* info = metadata_.Remove(key); + assert(info); + delete info; + return true; +} + +Status BlockCacheTier::NewCacheFile() { + lock_.AssertHeld(); + + TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir", + (void*)(GetCachePath().c_str())); + + std::unique_ptr<WriteableCacheFile> f(new WriteableCacheFile( + opt_.env, &buffer_allocator_, &writer_, GetCachePath(), writer_cache_id_, + opt_.cache_file_size, opt_.log)); + + bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads); + if (!status) { + return Status::IOError("Error creating file"); + } + + Info(opt_.log, "Created cache file %d", writer_cache_id_); + + 
writer_cache_id_++; + cache_file_ = f.release(); + + // insert to cache files tree + status = metadata_.Insert(cache_file_); + assert(status); + if (!status) { + Error(opt_.log, "Error inserting to metadata"); + return Status::IOError("Error inserting to metadata"); + } + + return Status::OK(); +} + +bool BlockCacheTier::Reserve(const size_t size) { + WriteLock _(&lock_); + assert(size_ <= opt_.cache_size); + + if (size + size_ <= opt_.cache_size) { + // there is enough space to write + size_ += size; + return true; + } + + assert(size + size_ >= opt_.cache_size); + // there is not enough space to fit the requested data + // we can clear some space by evicting cold data + + const double retain_fac = (100 - kEvictPct) / static_cast<double>(100); + while (size + size_ > opt_.cache_size * retain_fac) { + std::unique_ptr<BlockCacheFile> f(metadata_.Evict()); + if (!f) { + // nothing is evictable + return false; + } + assert(!f->refs_); + uint64_t file_size; + if (!f->Delete(&file_size).ok()) { + // unable to delete file + return false; + } + + assert(file_size <= size_); + size_ -= file_size; + } + + size_ += size; + assert(size_ <= opt_.cache_size * 0.9); + return true; +} + +Status NewPersistentCache(Env* const env, const std::string& path, + const uint64_t size, + const std::shared_ptr<Logger>& log, + const bool optimized_for_nvm, + std::shared_ptr<PersistentCache>* cache) { + if (!cache) { + return Status::IOError("invalid argument cache"); + } + + auto opt = PersistentCacheConfig(env, path, size, log); + if (optimized_for_nvm) { + // the default settings are optimized for SSD + // NVM devices are better accessed with 4K direct IO and written with + // parallelism + opt.enable_direct_writes = true; + opt.writer_qdepth = 4; + opt.writer_dispatch_size = 4 * 1024; + } + + auto pcache = std::make_shared<BlockCacheTier>(opt); + Status s = pcache->Open(); + + if (!s.ok()) { + return s; + } + + *cache = pcache; + return s; +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ifndef ROCKSDB_LITE diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier.h new file mode 100644 index 000000000..1aac287cc --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier.h @@ -0,0 +1,156 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#ifndef ROCKSDB_LITE + +#ifndef OS_WIN +#include <unistd.h> +#endif // ! 
OS_WIN + +#include <atomic> +#include <list> +#include <memory> +#include <set> +#include <sstream> +#include <stdexcept> +#include <string> +#include <thread> + +#include "memory/arena.h" +#include "memtable/skiplist.h" +#include "monitoring/histogram.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/comparator.h" +#include "rocksdb/persistent_cache.h" +#include "rocksdb/system_clock.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/mutexlock.h" +#include "utilities/persistent_cache/block_cache_tier_file.h" +#include "utilities/persistent_cache/block_cache_tier_metadata.h" +#include "utilities/persistent_cache/persistent_cache_util.h" + +namespace ROCKSDB_NAMESPACE { + +// +// Block cache tier implementation +// +class BlockCacheTier : public PersistentCacheTier { + public: + explicit BlockCacheTier(const PersistentCacheConfig& opt) + : opt_(opt), + insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)), + buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()), + writer_(this, opt_.writer_qdepth, + static_cast<size_t>(opt_.writer_dispatch_size)) { + Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt, + opt_.write_buffer_size, opt_.write_buffer_count()); + } + + virtual ~BlockCacheTier() { + // Close is re-entrant so we can call close even if it is already closed + Close().PermitUncheckedError(); + assert(!insert_th_.joinable()); + } + + Status Insert(const Slice& key, const char* data, const size_t size) override; + Status Lookup(const Slice& key, std::unique_ptr<char[]>* data, + size_t* size) override; + Status Open() override; + Status Close() override; + bool Erase(const Slice& key) override; + bool Reserve(const size_t size) override; + + bool IsCompressed() override { return opt_.is_compressed; } + + std::string GetPrintableOptions() const override { return opt_.ToString(); } + + PersistentCache::StatsType Stats() override; + + void TEST_Flush() override { + while (insert_ops_.Size()) { + /* sleep override */ + SystemClock::Default()->SleepForMicroseconds(1000000); + } + } + + private: + // Percentage of cache to be evicted when the cache is full + static const size_t kEvictPct = 10; + // Max attempts to insert key, value to cache in pipelined mode + static const size_t kMaxRetry = 3; + + // Pipelined operation + struct InsertOp { + explicit InsertOp(const bool signal) : signal_(signal) {} + explicit InsertOp(std::string&& key, const std::string& data) + : key_(std::move(key)), data_(data) {} + ~InsertOp() {} + + InsertOp() = delete; + InsertOp(InsertOp&& /*rhs*/) = default; + InsertOp& operator=(InsertOp&& rhs) = default; + + // used for estimating size by bounded queue + size_t Size() { return data_.size() + key_.size(); } + + std::string key_; + std::string data_; + bool signal_ = false; // signal to request processing thread to exit + }; + + // entry point for insert thread + void InsertMain(); + // insert implementation + Status InsertImpl(const Slice& key, const Slice& data); + // Create a new cache file + Status NewCacheFile(); + // Get cache directory path + std::string GetCachePath() const { return opt_.path + "/cache"; } + // Cleanup folder + Status CleanupCacheFolder(const std::string& folder); + + // Statistics + struct Statistics { + HistogramImpl bytes_pipelined_; + HistogramImpl bytes_written_; + HistogramImpl bytes_read_; + HistogramImpl read_hit_latency_; + HistogramImpl read_miss_latency_; + HistogramImpl write_latency_; + std::atomic<uint64_t> cache_hits_{0}; + 
std::atomic<uint64_t> cache_misses_{0}; + std::atomic<uint64_t> cache_errors_{0}; + std::atomic<uint64_t> insert_dropped_{0}; + + double CacheHitPct() const { + const auto lookups = cache_hits_ + cache_misses_; + return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0; + } + + double CacheMissPct() const { + const auto lookups = cache_hits_ + cache_misses_; + return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0; + } + }; + + port::RWMutex lock_; // Synchronization + const PersistentCacheConfig opt_; // BlockCache options + BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert + ROCKSDB_NAMESPACE::port::Thread insert_th_; // Insert thread + uint32_t writer_cache_id_ = 0; // Current cache file identifier + WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference + CacheWriteBufferAllocator buffer_allocator_; // Buffer provider + ThreadedWriter writer_; // Writer threads + BlockCacheTierMetadata metadata_; // Cache meta data manager + std::atomic<uint64_t> size_{0}; // Size of the cache + Statistics stats_; // Statistics +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc new file mode 100644 index 000000000..f4f8517ab --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.cc @@ -0,0 +1,610 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/block_cache_tier_file.h" + +#ifndef OS_WIN +#include <unistd.h> +#endif +#include <functional> +#include <memory> +#include <vector> + +#include "env/composite_env_wrapper.h" +#include "logging/logging.h" +#include "port/port.h" +#include "rocksdb/system_clock.h" +#include "util/crc32c.h" + +namespace ROCKSDB_NAMESPACE { + +// +// File creation factories +// +Status NewWritableCacheFile(Env* const env, const std::string& filepath, + std::unique_ptr<WritableFile>* file, + const bool use_direct_writes = false) { + EnvOptions opt; + opt.use_direct_writes = use_direct_writes; + Status s = env->NewWritableFile(filepath, file, opt); + return s; +} + +Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs, + const std::string& filepath, + std::unique_ptr<FSRandomAccessFile>* file, + const bool use_direct_reads = true) { + assert(fs.get()); + + FileOptions opt; + opt.use_direct_reads = use_direct_reads; + return fs->NewRandomAccessFile(filepath, opt, file, nullptr); +} + +// +// BlockCacheFile +// +Status BlockCacheFile::Delete(uint64_t* size) { + assert(env_); + + Status status = env_->GetFileSize(Path(), size); + if (!status.ok()) { + return status; + } + return env_->DeleteFile(Path()); +} + +// +// CacheRecord +// +// Cache record represents the record on disk +// +// +--------+---------+----------+------------+---------------+-------------+ +// | magic | crc | key size | value size | key data | value data | +// +--------+---------+----------+------------+---------------+-------------+ +// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size --> +// +struct CacheRecordHeader { + CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {} + CacheRecordHeader(const uint32_t magic, const uint32_t key_size, + const uint32_t val_size) + : 
magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {} + + uint32_t magic_; + uint32_t crc_; + uint32_t key_size_; + uint32_t val_size_; +}; + +struct CacheRecord { + CacheRecord() {} + CacheRecord(const Slice& key, const Slice& val) + : hdr_(MAGIC, static_cast<uint32_t>(key.size()), + static_cast<uint32_t>(val.size())), + key_(key), + val_(val) { + hdr_.crc_ = ComputeCRC(); + } + + uint32_t ComputeCRC() const; + bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff); + bool Deserialize(const Slice& buf); + + static uint32_t CalcSize(const Slice& key, const Slice& val) { + return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() + + val.size()); + } + + static const uint32_t MAGIC = 0xfefa; + + bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, + const char* data, const size_t size); + + CacheRecordHeader hdr_; + Slice key_; + Slice val_; +}; + +static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned"); + +uint32_t CacheRecord::ComputeCRC() const { + uint32_t crc = 0; + CacheRecordHeader tmp = hdr_; + tmp.crc_ = 0; + crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp)); + crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()), + key_.size()); + crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()), + val_.size()); + return crc; +} + +bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs, + size_t* woff) { + assert(bufs->size()); + return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_), + sizeof(hdr_)) && + Append(bufs, woff, reinterpret_cast<const char*>(key_.data()), + key_.size()) && + Append(bufs, woff, reinterpret_cast<const char*>(val_.data()), + val_.size()); +} + +bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, + const char* data, const size_t data_size) { + assert(*woff < bufs->size()); + + const char* p = data; + size_t size = data_size; + + while (size && *woff < bufs->size()) { + CacheWriteBuffer* buf = (*bufs)[*woff]; + const size_t free = buf->Free(); + if (size <= free) { + buf->Append(p, size); + size = 0; + } else { + buf->Append(p, free); + p += free; + size -= free; + assert(!buf->Free()); + assert(buf->Used() == buf->Capacity()); + } + + if (!buf->Free()) { + *woff += 1; + } + } + + assert(!size); + + return !size; +} + +bool CacheRecord::Deserialize(const Slice& data) { + assert(data.size() >= sizeof(CacheRecordHeader)); + if (data.size() < sizeof(CacheRecordHeader)) { + return false; + } + + memcpy(&hdr_, data.data(), sizeof(hdr_)); + + assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size()); + if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) { + return false; + } + + key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_); + val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_); + + if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) { + fprintf(stderr, "** magic %d ** \n", hdr_.magic_); + fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_); + fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_); + fprintf(stderr, "** key %s ** \n", key_.ToString().c_str()); + fprintf(stderr, "** val %s ** \n", val_.ToString().c_str()); + for (size_t i = 0; i < hdr_.val_size_; ++i) { + fprintf(stderr, "%d.", (uint8_t)val_.data()[i]); + } + fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC()); + } + + assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_); + return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_; +} + +// +// RandomAccessFile +// + +bool 
RandomAccessCacheFile::Open(const bool enable_direct_reads) { + WriteLock _(&rwlock_); + return OpenImpl(enable_direct_reads); +} + +bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) { + rwlock_.AssertHeld(); + + ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str()); + assert(env_); + + std::unique_ptr<FSRandomAccessFile> file; + Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file, + enable_direct_reads); + if (!status.ok()) { + Error(log_, "Error opening random access file %s. %s", Path().c_str(), + status.ToString().c_str()); + return false; + } + freader_.reset(new RandomAccessFileReader(std::move(file), Path(), + env_->GetSystemClock().get())); + + return true; +} + +bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + ReadLock _(&rwlock_); + + assert(lba.cache_id_ == cache_id_); + + if (!freader_) { + return false; + } + + Slice result; + Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch, + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + if (!s.ok()) { + Error(log_, "Error reading from file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(result.data() == scratch); + + return ParseRec(lba, key, val, scratch); +} + +bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + Slice data(scratch, lba.size_); + + CacheRecord rec; + if (!rec.Deserialize(data)) { + assert(!"Error deserializing data"); + Error(log_, "Error de-serializing record from file %s off %d", + Path().c_str(), lba.off_); + return false; + } + + *key = Slice(rec.key_); + *val = Slice(rec.val_); + + return true; +} + +// +// WriteableCacheFile +// + +WriteableCacheFile::~WriteableCacheFile() { + WriteLock _(&rwlock_); + if (!eof_) { + // This file never flushed. We give priority to shutdown since this is a + // cache + // TODO(krad): Figure a way to flush the pending data + if (file_) { + assert(refs_ == 1); + --refs_; + } + } + assert(!refs_); + ClearBuffers(); +} + +bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/, + const bool enable_direct_reads) { + WriteLock _(&rwlock_); + + enable_direct_reads_ = enable_direct_reads; + + ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)", + Path().c_str(), max_size_); + + assert(env_); + + Status s = env_->FileExists(Path()); + if (s.ok()) { + ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(), + s.ToString().c_str()); + } + + s = NewWritableCacheFile(env_, Path(), &file_); + if (!s.ok()) { + ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(!refs_); + ++refs_; + + return true; +} + +bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) { + WriteLock _(&rwlock_); + + if (eof_) { + // We can't append since the file is full + return false; + } + + // estimate the space required to store the (key, val) + uint32_t rec_size = CacheRecord::CalcSize(key, val); + + if (!ExpandBuffer(rec_size)) { + // unable to expand the buffer + ROCKS_LOG_DEBUG(log_, "Error expanding buffers. 
size=%d", rec_size); + return false; + } + + lba->cache_id_ = cache_id_; + lba->off_ = disk_woff_; + lba->size_ = rec_size; + + CacheRecord rec(key, val); + if (!rec.Serialize(&bufs_, &buf_woff_)) { + // unexpected error: unable to serialize the data + assert(!"Error serializing record"); + return false; + } + + disk_woff_ += rec_size; + eof_ = disk_woff_ >= max_size_; + + // dispatch buffer for flush + DispatchBuffer(); + + return true; +} + +bool WriteableCacheFile::ExpandBuffer(const size_t size) { + rwlock_.AssertHeld(); + assert(!eof_); + + // determine if there is enough space + size_t free = 0; // compute the free space left in buffer + for (size_t i = buf_woff_; i < bufs_.size(); ++i) { + free += bufs_[i]->Free(); + if (size <= free) { + // we have enough space in the buffer + return true; + } + } + + // expand the buffer until there is enough space to write `size` bytes + assert(free < size); + assert(alloc_); + + while (free < size) { + CacheWriteBuffer* const buf = alloc_->Allocate(); + if (!buf) { + ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers"); + return false; + } + + size_ += static_cast<uint32_t>(buf->Free()); + free += buf->Free(); + bufs_.push_back(buf); + } + + assert(free >= size); + return true; +} + +void WriteableCacheFile::DispatchBuffer() { + rwlock_.AssertHeld(); + + assert(bufs_.size()); + assert(buf_doff_ <= buf_woff_); + assert(buf_woff_ <= bufs_.size()); + + if (pending_ios_) { + return; + } + + if (!eof_ && buf_doff_ == buf_woff_) { + // dispatch buffer is pointing to write buffer and we haven't hit eof + return; + } + + assert(eof_ || buf_doff_ < buf_woff_); + assert(buf_doff_ < bufs_.size()); + assert(file_); + assert(alloc_); + + auto* buf = bufs_[buf_doff_]; + const uint64_t file_off = buf_doff_ * alloc_->BufferSize(); + + assert(!buf->Free() || + (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size())); + // we have reached end of file, and there is space in the last buffer + // pad it with zero for direct IO + buf->FillTrailingZeros(); + + assert(buf->Used() % kFileAlignmentSize == 0); + + writer_->Write(file_.get(), buf, file_off, + std::bind(&WriteableCacheFile::BufferWriteDone, this)); + pending_ios_++; + buf_doff_++; +} + +void WriteableCacheFile::BufferWriteDone() { + WriteLock _(&rwlock_); + + assert(bufs_.size()); + + pending_ios_--; + + if (buf_doff_ < bufs_.size()) { + DispatchBuffer(); + } + + if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) { + // end-of-file reached, move to read mode + CloseAndOpenForReading(); + } +} + +void WriteableCacheFile::CloseAndOpenForReading() { + // Our env abstraction do not allow reading from a file opened for appending + // We need close the file and re-open it for reading + Close(); + RandomAccessCacheFile::OpenImpl(enable_direct_reads_); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block, + char* scratch) { + rwlock_.AssertHeld(); + + if (!ReadBuffer(lba, scratch)) { + Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_, + lba.off_); + return false; + } + + return ParseRec(lba, key, block, scratch); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) { + rwlock_.AssertHeld(); + + assert(lba.off_ < disk_woff_); + assert(alloc_); + + // we read from the buffers like reading from a flat file. 
The list of buffers + // are treated as contiguous stream of data + + char* tmp = data; + size_t pending_nbytes = lba.size_; + // start buffer + size_t start_idx = lba.off_ / alloc_->BufferSize(); + // offset into the start buffer + size_t start_off = lba.off_ % alloc_->BufferSize(); + + assert(start_idx <= buf_woff_); + + for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) { + assert(i <= buf_woff_); + auto* buf = bufs_[i]; + assert(i == buf_woff_ || !buf->Free()); + // bytes to write to the buffer + size_t nbytes = pending_nbytes > (buf->Used() - start_off) + ? (buf->Used() - start_off) + : pending_nbytes; + memcpy(tmp, buf->Data() + start_off, nbytes); + + // left over to be written + pending_nbytes -= nbytes; + start_off = 0; + tmp += nbytes; + } + + assert(!pending_nbytes); + if (pending_nbytes) { + return false; + } + + assert(tmp == data + lba.size_); + return true; +} + +void WriteableCacheFile::Close() { + rwlock_.AssertHeld(); + + assert(size_ >= max_size_); + assert(disk_woff_ >= max_size_); + assert(buf_doff_ == bufs_.size()); + assert(bufs_.size() - buf_woff_ <= 1); + assert(!pending_ios_); + + Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_, + disk_woff_); + + ClearBuffers(); + file_.reset(); + + assert(refs_); + --refs_; +} + +void WriteableCacheFile::ClearBuffers() { + assert(alloc_); + + for (size_t i = 0; i < bufs_.size(); ++i) { + alloc_->Deallocate(bufs_[i]); + } + + bufs_.clear(); +} + +// +// ThreadedFileWriter implementation +// +ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache, + const size_t qdepth, const size_t io_size) + : Writer(cache), io_size_(io_size) { + for (size_t i = 0; i < qdepth; ++i) { + port::Thread th(&ThreadedWriter::ThreadMain, this); + threads_.push_back(std::move(th)); + } +} + +void ThreadedWriter::Stop() { + // notify all threads to exit + for (size_t i = 0; i < threads_.size(); ++i) { + q_.Push(IO(/*signal=*/true)); + } + + // wait for all threads to exit + for (auto& th : threads_) { + th.join(); + assert(!th.joinable()); + } + threads_.clear(); +} + +void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function<void()> callback) { + q_.Push(IO(file, buf, file_off, callback)); +} + +void ThreadedWriter::ThreadMain() { + while (true) { + // Fetch the IO to process + IO io(q_.Pop()); + if (io.signal_) { + // that's secret signal to exit + break; + } + + // Reserve space for writing the buffer + while (!cache_->Reserve(io.buf_->Used())) { + // We can fail to reserve space if every file in the system + // is being currently accessed + /* sleep override */ + SystemClock::Default()->SleepForMicroseconds(1000000); + } + + DispatchIO(io); + + io.callback_(); + } +} + +void ThreadedWriter::DispatchIO(const IO& io) { + size_t written = 0; + while (written < io.buf_->Used()) { + Slice data(io.buf_->Data() + written, io_size_); + Status s = io.file_->Append(data); + assert(s.ok()); + if (!s.ok()) { + // That is definite IO error to device. There is not much we can + // do but ignore the failure. This can lead to corruption of data on + // disk, but the cache will skip while reading + fprintf(stderr, "Error writing data to file. 
%s\n", s.ToString().c_str()); + } + written += io_size_; + } +} + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h new file mode 100644 index 000000000..1d265ab74 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h @@ -0,0 +1,293 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#ifndef ROCKSDB_LITE + +#include <list> +#include <memory> +#include <string> +#include <vector> + +#include "file/random_access_file_reader.h" +#include "port/port.h" +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" +#include "util/crc32c.h" +#include "util/mutexlock.h" +#include "utilities/persistent_cache/block_cache_tier_file_buffer.h" +#include "utilities/persistent_cache/lrulist.h" +#include "utilities/persistent_cache/persistent_cache_tier.h" +#include "utilities/persistent_cache/persistent_cache_util.h" + +// The io code path of persistent cache uses pipelined architecture +// +// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel +// +// This would enable the system to scale for GB/s of throughput which is +// expected with modern devies like NVM. +// +// The file level operations are encapsulated in the following abstractions +// +// BlockCacheFile +// ^ +// | +// | +// RandomAccessCacheFile (For reading) +// ^ +// | +// | +// WriteableCacheFile (For writing) +// +// Write IO code path : +// +namespace ROCKSDB_NAMESPACE { + +class WriteableCacheFile; +struct BlockInfo; + +// Represents a logical record on device +// +// (L)ogical (B)lock (Address = { cache-file-id, offset, size } +struct LogicalBlockAddress { + LogicalBlockAddress() {} + explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off, + const uint16_t size) + : cache_id_(cache_id), off_(off), size_(size) {} + + uint32_t cache_id_ = 0; + uint32_t off_ = 0; + uint32_t size_ = 0; +}; + +using LBA = LogicalBlockAddress; + +// class Writer +// +// Writer is the abstraction used for writing data to file. The component can be +// multithreaded. 
It is the last step of write pipeline +class Writer { + public: + explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {} + virtual ~Writer() {} + + // write buffer to file at the given offset + virtual void Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function<void()> callback) = 0; + // stop the writer + virtual void Stop() = 0; + + PersistentCacheTier* const cache_; +}; + +// class BlockCacheFile +// +// Generic interface to support building file specialized for read/writing +class BlockCacheFile : public LRUElement<BlockCacheFile> { + public: + explicit BlockCacheFile(const uint32_t cache_id) + : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {} + + explicit BlockCacheFile(Env* const env, const std::string& dir, + const uint32_t cache_id) + : LRUElement<BlockCacheFile>(), + env_(env), + dir_(dir), + cache_id_(cache_id) {} + + virtual ~BlockCacheFile() {} + + // append key/value to file and return LBA locator to user + virtual bool Append(const Slice& /*key*/, const Slice& /*val*/, + LBA* const /*lba*/) { + assert(!"not implemented"); + return false; + } + + // read from the record locator (LBA) and return key, value and status + virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/, + char* /*scratch*/) { + assert(!"not implemented"); + return false; + } + + // get file path + std::string Path() const { + return dir_ + "/" + std::to_string(cache_id_) + ".rc"; + } + // get cache ID + uint32_t cacheid() const { return cache_id_; } + // Add block information to file data + // Block information is the list of index reference for this file + virtual void Add(BlockInfo* binfo) { + WriteLock _(&rwlock_); + block_infos_.push_back(binfo); + } + // get block information + std::list<BlockInfo*>& block_infos() { return block_infos_; } + // delete file and return the size of the file + virtual Status Delete(uint64_t* size); + + protected: + port::RWMutex rwlock_; // synchronization mutex + Env* const env_ = nullptr; // Env for OS + const std::string dir_; // Directory name + const uint32_t cache_id_; // Cache id for the file + std::list<BlockInfo*> block_infos_; // List of index entries mapping to the + // file content +}; + +// class RandomAccessFile +// +// Thread safe implementation for reading random data from file +class RandomAccessCacheFile : public BlockCacheFile { + public: + explicit RandomAccessCacheFile(Env* const env, const std::string& dir, + const uint32_t cache_id, + const std::shared_ptr<Logger>& log) + : BlockCacheFile(env, dir, cache_id), log_(log) {} + + virtual ~RandomAccessCacheFile() {} + + // open file for reading + bool Open(const bool enable_direct_reads); + // read data from the disk + bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override; + + private: + std::unique_ptr<RandomAccessFileReader> freader_; + + protected: + bool OpenImpl(const bool enable_direct_reads); + bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch); + + std::shared_ptr<Logger> log_; // log file +}; + +// class WriteableCacheFile +// +// All writes to the files are cached in buffers. The buffers are flushed to +// disk as they get filled up. 
When file size reaches a certain size, a new file +// will be created provided there is free space +class WriteableCacheFile : public RandomAccessCacheFile { + public: + explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc, + Writer* writer, const std::string& dir, + const uint32_t cache_id, const uint32_t max_size, + const std::shared_ptr<Logger>& log) + : RandomAccessCacheFile(env, dir, cache_id, log), + alloc_(alloc), + writer_(writer), + max_size_(max_size) {} + + virtual ~WriteableCacheFile(); + + // create file on disk + bool Create(const bool enable_direct_writes, const bool enable_direct_reads); + + // read data from logical file + bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override { + ReadLock _(&rwlock_); + const bool closed = eof_ && bufs_.empty(); + if (closed) { + // the file is closed, read from disk + return RandomAccessCacheFile::Read(lba, key, block, scratch); + } + // file is still being written, read from buffers + return ReadBuffer(lba, key, block, scratch); + } + + // append data to end of file + bool Append(const Slice&, const Slice&, LBA* const) override; + // End-of-file + bool Eof() const { return eof_; } + + private: + friend class ThreadedWriter; + + static const size_t kFileAlignmentSize = 4 * 1024; // align file size + + bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch); + bool ReadBuffer(const LBA& lba, char* data); + bool ExpandBuffer(const size_t size); + void DispatchBuffer(); + void BufferWriteDone(); + void CloseAndOpenForReading(); + void ClearBuffers(); + void Close(); + + // File layout in memory + // + // +------+------+------+------+------+------+ + // | b0 | b1 | b2 | b3 | b4 | b5 | + // +------+------+------+------+------+------+ + // ^ ^ + // | | + // buf_doff_ buf_woff_ + // (next buffer to (next buffer to fill) + // flush to disk) + // + // The buffers are flushed to disk serially for a given file + + CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider + Writer* const writer_ = nullptr; // File writer thread + std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction + std::vector<CacheWriteBuffer*> bufs_; // Written buffers + uint32_t size_ = 0; // Size of the file + const uint32_t max_size_; // Max size of the file + bool eof_ = false; // End of file + uint32_t disk_woff_ = 0; // Offset to write on disk + size_t buf_woff_ = 0; // off into bufs_ to write + size_t buf_doff_ = 0; // off into bufs_ to dispatch + size_t pending_ios_ = 0; // Number of ios to disk in-progress + bool enable_direct_reads_ = false; // Should we enable direct reads + // when reading from disk +}; + +// +// Abstraction to do writing to device. It is part of pipelined architecture. 
+// +class ThreadedWriter : public Writer { + public: + // Representation of IO to device + struct IO { + explicit IO(const bool signal) : signal_(signal) {} + explicit IO(WritableFile* const file, CacheWriteBuffer* const buf, + const uint64_t file_off, const std::function<void()> callback) + : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {} + + IO(const IO&) = default; + IO& operator=(const IO&) = default; + size_t Size() const { return sizeof(IO); } + + WritableFile* file_ = nullptr; // File to write to + CacheWriteBuffer* buf_ = nullptr; // buffer to write + uint64_t file_off_ = 0; // file offset + bool signal_ = false; // signal to exit thread loop + std::function<void()> callback_; // Callback on completion + }; + + explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, + const size_t io_size); + virtual ~ThreadedWriter() { assert(threads_.empty()); } + + void Stop() override; + void Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function<void()> callback) override; + + private: + void ThreadMain(); + void DispatchIO(const IO& io); + + const size_t io_size_ = 0; + BoundedQueue<IO> q_; + std::vector<port::Thread> threads_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h new file mode 100644 index 000000000..d4f02455a --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_file_buffer.h @@ -0,0 +1,127 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#pragma once + +#include <list> +#include <memory> +#include <string> + +#include "memory/arena.h" +#include "rocksdb/comparator.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +// +// CacheWriteBuffer +// +// Buffer abstraction that can be manipulated via append +// (not thread safe) +class CacheWriteBuffer { + public: + explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) { + buf_.reset(new char[size_]); + assert(!pos_); + assert(size_); + } + + virtual ~CacheWriteBuffer() {} + + void Append(const char* buf, const size_t size) { + assert(pos_ + size <= size_); + memcpy(buf_.get() + pos_, buf, size); + pos_ += size; + assert(pos_ <= size_); + } + + void FillTrailingZeros() { + assert(pos_ <= size_); + memset(buf_.get() + pos_, '0', size_ - pos_); + pos_ = size_; + } + + void Reset() { pos_ = 0; } + size_t Free() const { return size_ - pos_; } + size_t Capacity() const { return size_; } + size_t Used() const { return pos_; } + char* Data() const { return buf_.get(); } + + private: + std::unique_ptr<char[]> buf_; + const size_t size_; + size_t pos_; +}; + +// +// CacheWriteBufferAllocator +// +// Buffer pool abstraction(not thread safe) +// +class CacheWriteBufferAllocator { + public: + explicit CacheWriteBufferAllocator(const size_t buffer_size, + const size_t buffer_count) + : cond_empty_(&lock_), buffer_size_(buffer_size) { + MutexLock _(&lock_); + buffer_size_ = buffer_size; + for (uint32_t i = 0; i < buffer_count; i++) { + auto* buf = new CacheWriteBuffer(buffer_size_); + assert(buf); + if (buf) { + bufs_.push_back(buf); + cond_empty_.Signal(); + } + } + } + + virtual ~CacheWriteBufferAllocator() { + MutexLock _(&lock_); + assert(bufs_.size() * buffer_size_ == Capacity()); + for (auto* buf : bufs_) { + delete buf; + } + bufs_.clear(); + } + + CacheWriteBuffer* Allocate() { + MutexLock _(&lock_); + if (bufs_.empty()) { + return nullptr; + } + + assert(!bufs_.empty()); + CacheWriteBuffer* const buf = bufs_.front(); + bufs_.pop_front(); + return buf; + } + + void Deallocate(CacheWriteBuffer* const buf) { + assert(buf); + MutexLock _(&lock_); + buf->Reset(); + bufs_.push_back(buf); + cond_empty_.Signal(); + } + + void WaitUntilUsable() { + // We are asked to wait till we have buffers available + MutexLock _(&lock_); + while (bufs_.empty()) { + cond_empty_.Wait(); + } + } + + size_t Capacity() const { return bufs_.size() * buffer_size_; } + size_t Free() const { return bufs_.size() * buffer_size_; } + size_t BufferSize() const { return buffer_size_; } + + private: + port::Mutex lock_; // Sync lock + port::CondVar cond_empty_; // Condition var for empty buffers + size_t buffer_size_; // Size of each buffer + std::list<CacheWriteBuffer*> bufs_; // Buffer stash +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc new file mode 100644 index 000000000..d73b5d0b4 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.cc @@ -0,0 +1,86 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/block_cache_tier_metadata.h" + +#include <functional> + +namespace ROCKSDB_NAMESPACE { + +bool BlockCacheTierMetadata::Insert(BlockCacheFile* file) { + return cache_file_index_.Insert(file); +} + +BlockCacheFile* BlockCacheTierMetadata::Lookup(const uint32_t cache_id) { + BlockCacheFile* ret = nullptr; + BlockCacheFile lookup_key(cache_id); + bool ok = cache_file_index_.Find(&lookup_key, &ret); + if (ok) { + assert(ret->refs_); + return ret; + } + return nullptr; +} + +BlockCacheFile* BlockCacheTierMetadata::Evict() { + using std::placeholders::_1; + auto fn = std::bind(&BlockCacheTierMetadata::RemoveAllKeys, this, _1); + return cache_file_index_.Evict(fn); +} + +void BlockCacheTierMetadata::Clear() { + cache_file_index_.Clear([](BlockCacheFile* arg) { delete arg; }); + block_index_.Clear([](BlockInfo* arg) { delete arg; }); +} + +BlockInfo* BlockCacheTierMetadata::Insert(const Slice& key, const LBA& lba) { + std::unique_ptr<BlockInfo> binfo(new BlockInfo(key, lba)); + if (!block_index_.Insert(binfo.get())) { + return nullptr; + } + return binfo.release(); +} + +bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) { + BlockInfo lookup_key(key); + BlockInfo* block; + port::RWMutex* rlock = nullptr; + if (!block_index_.Find(&lookup_key, &block, &rlock)) { + return false; + } + + ReadUnlock _(rlock); + assert(block->key_ == key.ToString()); + if (lba) { + *lba = block->lba_; + } + return true; +} + +BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) { + BlockInfo lookup_key(key); + BlockInfo* binfo = nullptr; + bool ok __attribute__((__unused__)); + ok = block_index_.Erase(&lookup_key, &binfo); + assert(ok); + return binfo; +} + +void BlockCacheTierMetadata::RemoveAllKeys(BlockCacheFile* f) { + for (BlockInfo* binfo : f->block_infos()) { + BlockInfo* tmp = nullptr; + bool status = block_index_.Erase(binfo, &tmp); + (void)status; + assert(status); + assert(tmp == binfo); + delete binfo; + } + f->block_infos().clear(); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h new file mode 100644 index 000000000..2fcd50105 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/block_cache_tier_metadata.h @@ -0,0 +1,124 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#ifndef ROCKSDB_LITE + +#include <functional> +#include <string> +#include <unordered_map> + +#include "rocksdb/slice.h" +#include "utilities/persistent_cache/block_cache_tier_file.h" +#include "utilities/persistent_cache/hash_table.h" +#include "utilities/persistent_cache/hash_table_evictable.h" +#include "utilities/persistent_cache/lrulist.h" + +namespace ROCKSDB_NAMESPACE { + +// +// Block Cache Tier Metadata +// +// The BlockCacheTierMetadata holds all the metadata associated with block +// cache. It +// fundamentally contains 2 indexes and an LRU. +// +// Block Cache Index +// +// This is a forward index that maps a given key to a LBA (Logical Block +// Address). LBA is a disk pointer that points to a record on the cache. +// +// LBA = { cache-id, offset, size } +// +// Cache File Index +// +// This is a forward index that maps a given cache-id to a cache file object. 
+// Typically you would lookup using LBA and use the object to read or write +struct BlockInfo { + explicit BlockInfo(const Slice& key, const LBA& lba = LBA()) + : key_(key.ToString()), lba_(lba) {} + + std::string key_; + LBA lba_; +}; + +class BlockCacheTierMetadata { + public: + explicit BlockCacheTierMetadata(const uint32_t blocks_capacity = 1024 * 1024, + const uint32_t cachefile_capacity = 10 * 1024) + : cache_file_index_(cachefile_capacity), block_index_(blocks_capacity) {} + + virtual ~BlockCacheTierMetadata() {} + + // Insert a given cache file + bool Insert(BlockCacheFile* file); + + // Lookup cache file based on cache_id + BlockCacheFile* Lookup(const uint32_t cache_id); + + // Insert block information to block index + BlockInfo* Insert(const Slice& key, const LBA& lba); + // bool Insert(BlockInfo* binfo); + + // Lookup block information from block index + bool Lookup(const Slice& key, LBA* lba); + + // Remove a given from the block index + BlockInfo* Remove(const Slice& key); + + // Find and evict a cache file using LRU policy + BlockCacheFile* Evict(); + + // Clear the metadata contents + virtual void Clear(); + + protected: + // Remove all block information from a given file + virtual void RemoveAllKeys(BlockCacheFile* file); + + private: + // Cache file index definition + // + // cache-id => BlockCacheFile + struct BlockCacheFileHash { + uint64_t operator()(const BlockCacheFile* rec) { + return std::hash<uint32_t>()(rec->cacheid()); + } + }; + + struct BlockCacheFileEqual { + uint64_t operator()(const BlockCacheFile* lhs, const BlockCacheFile* rhs) { + return lhs->cacheid() == rhs->cacheid(); + } + }; + + using CacheFileIndexType = + EvictableHashTable<BlockCacheFile, BlockCacheFileHash, + BlockCacheFileEqual>; + + // Block Lookup Index + // + // key => LBA + struct Hash { + size_t operator()(BlockInfo* node) const { + return std::hash<std::string>()(node->key_); + } + }; + + struct Equal { + size_t operator()(BlockInfo* lhs, BlockInfo* rhs) const { + return lhs->key_ == rhs->key_; + } + }; + + using BlockIndexType = HashTable<BlockInfo*, Hash, Equal>; + + CacheFileIndexType cache_file_index_; + BlockIndexType block_index_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/hash_table.h b/src/rocksdb/utilities/persistent_cache/hash_table.h new file mode 100644 index 000000000..b00b294ce --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/hash_table.h @@ -0,0 +1,239 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#ifndef ROCKSDB_LITE + +#include <assert.h> + +#include <list> +#include <vector> + +#ifdef OS_LINUX +#include <sys/mman.h> +#endif + +#include "rocksdb/env.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +// HashTable<T, Hash, Equal> +// +// Traditional implementation of hash table with synchronization built on top +// don't perform very well in multi-core scenarios. This is an implementation +// designed for multi-core scenarios with high lock contention. +// +// |<-------- alpha ------------->| +// Buckets Collision list +// ---- +----+ +---+---+--- ...... ---+---+---+ +// / | |--->| | | | | | +// / +----+ +---+---+--- ...... ---+---+---+ +// / | | +// Locks/ +----+ +// +--+/ . . +// | | . . +// +--+ . . +// | | . . +// +--+ . . +// | | . . +// +--+ . . 
+// \ +----+ +// \ | | +// \ +----+ +// \ | | +// \---- +----+ +// +// The lock contention is spread over an array of locks. This helps improve +// concurrent access. The spine is designed for a certain capacity and load +// factor. When the capacity planning is done correctly we can expect +// O(load_factor = 1) insert, access and remove time. +// +// Micro benchmark on debug build gives about .5 Million/sec rate of insert, +// erase and lookup in parallel (total of about 1.5 Million ops/sec). If the +// blocks were of 4K, the hash table can support a virtual throughput of +// 6 GB/s. +// +// T Object type (contains both key and value) +// Hash Function that returns an hash from type T +// Equal Returns if two objects are equal +// (We need explicit equal for pointer type) +// +template <class T, class Hash, class Equal> +class HashTable { + public: + explicit HashTable(const size_t capacity = 1024 * 1024, + const float load_factor = 2.0, const uint32_t nlocks = 256) + : nbuckets_( + static_cast<uint32_t>(load_factor ? capacity / load_factor : 0)), + nlocks_(nlocks) { + // pre-conditions + assert(capacity); + assert(load_factor); + assert(nbuckets_); + assert(nlocks_); + + buckets_.reset(new Bucket[nbuckets_]); +#ifdef OS_LINUX + mlock(buckets_.get(), nbuckets_ * sizeof(Bucket)); +#endif + + // initialize locks + locks_.reset(new port::RWMutex[nlocks_]); +#ifdef OS_LINUX + mlock(locks_.get(), nlocks_ * sizeof(port::RWMutex)); +#endif + + // post-conditions + assert(buckets_); + assert(locks_); + } + + virtual ~HashTable() { AssertEmptyBuckets(); } + + // + // Insert given record to hash table + // + bool Insert(const T& t) { + const uint64_t h = Hash()(t); + const uint32_t bucket_idx = h % nbuckets_; + const uint32_t lock_idx = bucket_idx % nlocks_; + + WriteLock _(&locks_[lock_idx]); + auto& bucket = buckets_[bucket_idx]; + return Insert(&bucket, t); + } + + // Lookup hash table + // + // Please note that read lock should be held by the caller. This is because + // the caller owns the data, and should hold the read lock as long as he + // operates on the data. + bool Find(const T& t, T* ret, port::RWMutex** ret_lock) { + const uint64_t h = Hash()(t); + const uint32_t bucket_idx = h % nbuckets_; + const uint32_t lock_idx = bucket_idx % nlocks_; + + port::RWMutex& lock = locks_[lock_idx]; + lock.ReadLock(); + + auto& bucket = buckets_[bucket_idx]; + if (Find(&bucket, t, ret)) { + *ret_lock = &lock; + return true; + } + + lock.ReadUnlock(); + return false; + } + + // + // Erase a given key from the hash table + // + bool Erase(const T& t, T* ret) { + const uint64_t h = Hash()(t); + const uint32_t bucket_idx = h % nbuckets_; + const uint32_t lock_idx = bucket_idx % nlocks_; + + WriteLock _(&locks_[lock_idx]); + + auto& bucket = buckets_[bucket_idx]; + return Erase(&bucket, t, ret); + } + + // Fetch the mutex associated with a key + // This call is used to hold the lock for a given data for extended period of + // time. 
+ port::RWMutex* GetMutex(const T& t) { + const uint64_t h = Hash()(t); + const uint32_t bucket_idx = h % nbuckets_; + const uint32_t lock_idx = bucket_idx % nlocks_; + + return &locks_[lock_idx]; + } + + void Clear(void (*fn)(T)) { + for (uint32_t i = 0; i < nbuckets_; ++i) { + const uint32_t lock_idx = i % nlocks_; + WriteLock _(&locks_[lock_idx]); + for (auto& t : buckets_[i].list_) { + (*fn)(t); + } + buckets_[i].list_.clear(); + } + } + + protected: + // Models bucket of keys that hash to the same bucket number + struct Bucket { + std::list<T> list_; + }; + + // Substitute for std::find with custom comparator operator + typename std::list<T>::iterator Find(std::list<T>* list, const T& t) { + for (auto it = list->begin(); it != list->end(); ++it) { + if (Equal()(*it, t)) { + return it; + } + } + return list->end(); + } + + bool Insert(Bucket* bucket, const T& t) { + // Check if the key already exists + auto it = Find(&bucket->list_, t); + if (it != bucket->list_.end()) { + return false; + } + + // insert to bucket + bucket->list_.push_back(t); + return true; + } + + bool Find(Bucket* bucket, const T& t, T* ret) { + auto it = Find(&bucket->list_, t); + if (it != bucket->list_.end()) { + if (ret) { + *ret = *it; + } + return true; + } + return false; + } + + bool Erase(Bucket* bucket, const T& t, T* ret) { + auto it = Find(&bucket->list_, t); + if (it != bucket->list_.end()) { + if (ret) { + *ret = *it; + } + + bucket->list_.erase(it); + return true; + } + return false; + } + + // assert that all buckets are empty + void AssertEmptyBuckets() { +#ifndef NDEBUG + for (size_t i = 0; i < nbuckets_; ++i) { + WriteLock _(&locks_[i % nlocks_]); + assert(buckets_[i].list_.empty()); + } +#endif + } + + const uint32_t nbuckets_; // No. of buckets in the spine + std::unique_ptr<Bucket[]> buckets_; // Spine of the hash buckets + const uint32_t nlocks_; // No. of locks + std::unique_ptr<port::RWMutex[]> locks_; // Granular locks +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc b/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc new file mode 100644 index 000000000..74d7e2edf --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/hash_table_bench.cc @@ -0,0 +1,310 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+//
+
+#if !defined(OS_WIN) && !defined(ROCKSDB_LITE)
+
+#ifndef GFLAGS
+#include <cstdio>
+int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
+#else
+
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <functional>
+#include <string>
+#include <unordered_map>
+
+#include "port/port_posix.h"
+#include "port/sys_time.h"
+#include "rocksdb/env.h"
+#include "util/gflags_compat.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "utilities/persistent_cache/hash_table.h"
+
+using std::string;
+
+DEFINE_int32(nsec, 10, "Test duration in seconds");
+DEFINE_int32(nthread_write, 1, "Insert threads");
+DEFINE_int32(nthread_read, 1, "Lookup threads");
+DEFINE_int32(nthread_erase, 1, "Erase threads");
+
+namespace ROCKSDB_NAMESPACE {
+
+//
+// HashTableImpl interface
+//
+// Abstraction of a hash table implementation
+template <class Key, class Value>
+class HashTableImpl {
+ public:
+  virtual ~HashTableImpl() {}
+
+  virtual bool Insert(const Key& key, const Value& val) = 0;
+  virtual bool Erase(const Key& key) = 0;
+  virtual bool Lookup(const Key& key, Value* val) = 0;
+};
+
+// HashTableBenchmark
+//
+// Abstraction to test a given hash table implementation. The test mostly
+// focuses on insert, lookup and erase. The test can operate in test mode and
+// benchmark mode.
+class HashTableBenchmark {
+ public:
+  explicit HashTableBenchmark(HashTableImpl<size_t, std::string>* impl,
+                              const size_t sec = 10,
+                              const size_t nthread_write = 1,
+                              const size_t nthread_read = 1,
+                              const size_t nthread_erase = 1)
+      : impl_(impl),
+        sec_(sec),
+        ninserts_(0),
+        nreads_(0),
+        nerases_(0),
+        nerases_failed_(0),
+        quit_(false) {
+    Prepop();
+
+    StartThreads(nthread_write, WriteMain);
+    StartThreads(nthread_read, ReadMain);
+    StartThreads(nthread_erase, EraseMain);
+
+    uint64_t start = NowInMillSec();
+    while (!quit_) {
+      quit_ = NowInMillSec() - start > sec_ * 1000;
+      /* sleep override */ sleep(1);
+    }
+
+    Env* env = Env::Default();
+    env->WaitForJoin();
+
+    if (sec_) {
+      printf("Result \n");
+      printf("====== \n");
+      printf("insert/sec = %f \n", ninserts_ / static_cast<double>(sec_));
+      printf("read/sec = %f \n", nreads_ / static_cast<double>(sec_));
+      printf("erases/sec = %f \n", nerases_ / static_cast<double>(sec_));
+      const uint64_t ops = ninserts_ + nreads_ + nerases_;
+      printf("ops/sec = %f \n", ops / static_cast<double>(sec_));
+      // compute the percentage in floating point; integer division would
+      // truncate the ratio to zero
+      printf("erase fail = %d (%f%%)\n", static_cast<int>(nerases_failed_),
+             nerases_ ? 100 * static_cast<double>(nerases_failed_) /
+                            static_cast<double>(nerases_)
+                      : 0.0);
+      printf("====== \n");
+    }
+  }
+
+  void RunWrite() {
+    while (!quit_) {
+      size_t k = insert_key_++;
+      std::string tmp(1000, k % 255);
+      bool status = impl_->Insert(k, tmp);
+      assert(status);
+      ninserts_++;
+    }
+  }
+
+  void RunRead() {
+    Random64 rgen(time(nullptr));
+    while (!quit_) {
+      std::string s;
+      size_t k = rgen.Next() % max_prepop_key;
+      bool status = impl_->Lookup(k, &s);
+      assert(status);
+      assert(s == std::string(1000, k % 255));
+      nreads_++;
+    }
+  }
+
+  void RunErase() {
+    while (!quit_) {
+      size_t k = erase_key_++;
+      bool status = impl_->Erase(k);
+      nerases_failed_ += !status;
+      nerases_++;
+    }
+  }
+
+ private:
+  // Start threads for a given function
+  void StartThreads(const size_t n, void (*fn)(void*)) {
+    Env* env = Env::Default();
+    for (size_t i = 0; i < n; ++i) {
+      env->StartThread(fn, this);
+    }
+  }
+
+  // Pre-populate the hash table with 1M keys
+  void Prepop() {
+    for (size_t i = 0; i < max_prepop_key; ++i) {
+      bool status = impl_->Insert(i, std::string(1000, i % 255));
+      assert(status);
+    }
+
+    erase_key_ = insert_key_ = 
max_prepop_key; + + for (size_t i = 0; i < 10 * max_prepop_key; ++i) { + bool status = impl_->Insert(insert_key_++, std::string(1000, 'x')); + assert(status); + } + } + + static uint64_t NowInMillSec() { + port::TimeVal tv; + port::GetTimeOfDay(&tv, /*tz=*/nullptr); + return tv.tv_sec * 1000 + tv.tv_usec / 1000; + } + + // + // Wrapper functions for thread entry + // + static void WriteMain(void* args) { + reinterpret_cast<HashTableBenchmark*>(args)->RunWrite(); + } + + static void ReadMain(void* args) { + reinterpret_cast<HashTableBenchmark*>(args)->RunRead(); + } + + static void EraseMain(void* args) { + reinterpret_cast<HashTableBenchmark*>(args)->RunErase(); + } + + HashTableImpl<size_t, std::string>* impl_; // Implementation to test + const size_t sec_; // Test time + const size_t max_prepop_key = 1ULL * 1024 * 1024; // Max prepop key + std::atomic<size_t> insert_key_; // Last inserted key + std::atomic<size_t> erase_key_; // Erase key + std::atomic<size_t> ninserts_; // Number of inserts + std::atomic<size_t> nreads_; // Number of reads + std::atomic<size_t> nerases_; // Number of erases + std::atomic<size_t> nerases_failed_; // Number of erases failed + bool quit_; // Should the threads quit ? +}; + +// +// SimpleImpl +// Lock safe unordered_map implementation +class SimpleImpl : public HashTableImpl<size_t, string> { + public: + bool Insert(const size_t& key, const string& val) override { + WriteLock _(&rwlock_); + map_.insert(make_pair(key, val)); + return true; + } + + bool Erase(const size_t& key) override { + WriteLock _(&rwlock_); + auto it = map_.find(key); + if (it == map_.end()) { + return false; + } + map_.erase(it); + return true; + } + + bool Lookup(const size_t& key, string* val) override { + ReadLock _(&rwlock_); + auto it = map_.find(key); + if (it != map_.end()) { + *val = it->second; + } + return it != map_.end(); + } + + private: + port::RWMutex rwlock_; + std::unordered_map<size_t, string> map_; +}; + +// +// GranularLockImpl +// Thread safe custom RocksDB implementation of hash table with granular +// locking +class GranularLockImpl : public HashTableImpl<size_t, string> { + public: + bool Insert(const size_t& key, const string& val) override { + Node n(key, val); + return impl_.Insert(n); + } + + bool Erase(const size_t& key) override { + Node n(key, string()); + return impl_.Erase(n, nullptr); + } + + bool Lookup(const size_t& key, string* val) override { + Node n(key, string()); + port::RWMutex* rlock; + bool status = impl_.Find(n, &n, &rlock); + if (status) { + ReadUnlock _(rlock); + *val = n.val_; + } + return status; + } + + private: + struct Node { + explicit Node(const size_t key, const string& val) : key_(key), val_(val) {} + + size_t key_ = 0; + string val_; + }; + + struct Hash { + uint64_t operator()(const Node& node) { + return std::hash<uint64_t>()(node.key_); + } + }; + + struct Equal { + bool operator()(const Node& lhs, const Node& rhs) { + return lhs.key_ == rhs.key_; + } + }; + + HashTable<Node, Hash, Equal> impl_; +}; + +} // namespace ROCKSDB_NAMESPACE + +// +// main +// +int main(int argc, char** argv) { + GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") + + std::string(argv[0]) + " [OPTIONS]..."); + GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false); + + // + // Micro benchmark unordered_map + // + printf("Micro benchmarking std::unordered_map \n"); + { + ROCKSDB_NAMESPACE::SimpleImpl impl; + ROCKSDB_NAMESPACE::HashTableBenchmark _( + &impl, FLAGS_nsec, FLAGS_nthread_write, FLAGS_nthread_read, + FLAGS_nthread_erase); + } 
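For reference, the driver above is configured entirely through the gflags declared at the top of this file; a typical invocation (the binary name is illustrative) looks like:

    ./hash_table_bench --nsec=10 --nthread_write=1 --nthread_read=1 --nthread_erase=1

Both implementations are pre-populated with 1M keys and then exercised by the writer, reader and eraser threads for FLAGS_nsec seconds before the ops/sec summary is printed.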
+  //
+  // Micro benchmark scalable hash table
+  //
+  printf("Micro benchmarking scalable hash map \n");
+  {
+    ROCKSDB_NAMESPACE::GranularLockImpl impl;
+    ROCKSDB_NAMESPACE::HashTableBenchmark _(
+        &impl, FLAGS_nsec, FLAGS_nthread_write, FLAGS_nthread_read,
+        FLAGS_nthread_erase);
+  }
+
+  return 0;
+}
+#endif  // #ifndef GFLAGS
+#else
+int main(int /*argc*/, char** /*argv*/) { return 0; }
+#endif
diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h b/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h
new file mode 100644
index 000000000..e10939b2f
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/hash_table_evictable.h
@@ -0,0 +1,168 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+
+#include "util/random.h"
+#include "utilities/persistent_cache/hash_table.h"
+#include "utilities/persistent_cache/lrulist.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Evictable Hash Table
+//
+// Hash table index where the least-accessed (or one of the least-accessed)
+// elements can be evicted.
+//
+// Please note EvictableHashTable can only be created for pointer type objects
+template <class T, class Hash, class Equal>
+class EvictableHashTable : private HashTable<T*, Hash, Equal> {
+ public:
+  using hash_table = HashTable<T*, Hash, Equal>;
+
+  explicit EvictableHashTable(const size_t capacity = 1024 * 1024,
+                              const float load_factor = 2.0,
+                              const uint32_t nlocks = 256)
+      : HashTable<T*, Hash, Equal>(capacity, load_factor, nlocks),
+        lru_lists_(new LRUList<T>[hash_table::nlocks_]) {
+    assert(lru_lists_);
+  }
+
+  virtual ~EvictableHashTable() { AssertEmptyLRU(); }
+
+  //
+  // Insert given record to hash table (and LRU list)
+  //
+  bool Insert(T* t) {
+    const uint64_t h = Hash()(t);
+    typename hash_table::Bucket& bucket = GetBucket(h);
+    LRUListType& lru = GetLRUList(h);
+    port::RWMutex& lock = GetMutex(h);
+
+    WriteLock _(&lock);
+    if (hash_table::Insert(&bucket, t)) {
+      lru.Push(t);
+      return true;
+    }
+    return false;
+  }
+
+  //
+  // Lookup hash table
+  //
+  // Please note that the read lock should be held by the caller. This is
+  // because the caller owns the data, and should hold the read lock as long
+  // as they operate on the data.
+  bool Find(T* t, T** ret) {
+    const uint64_t h = Hash()(t);
+    typename hash_table::Bucket& bucket = GetBucket(h);
+    LRUListType& lru = GetLRUList(h);
+    port::RWMutex& lock = GetMutex(h);
+
+    ReadLock _(&lock);
+    if (hash_table::Find(&bucket, t, ret)) {
+      ++(*ret)->refs_;
+      lru.Touch(*ret);
+      return true;
+    }
+    return false;
+  }
+
+  //
+  // Evict one of the least recently used objects
+  //
+  T* Evict(const std::function<void(T*)>& fn = nullptr) {
+    uint32_t random = Random::GetTLSInstance()->Next();
+    const size_t start_idx = random % hash_table::nlocks_;
+    T* t = nullptr;
+
+    // iterate from start_idx .. 0 .. 
start_idx + for (size_t i = 0; !t && i < hash_table::nlocks_; ++i) { + const size_t idx = (start_idx + i) % hash_table::nlocks_; + + WriteLock _(&hash_table::locks_[idx]); + LRUListType& lru = lru_lists_[idx]; + if (!lru.IsEmpty() && (t = lru.Pop()) != nullptr) { + assert(!t->refs_); + // We got an item to evict, erase from the bucket + const uint64_t h = Hash()(t); + typename hash_table::Bucket& bucket = GetBucket(h); + T* tmp = nullptr; + bool status = hash_table::Erase(&bucket, t, &tmp); + assert(t == tmp); + (void)status; + assert(status); + if (fn) { + fn(t); + } + break; + } + assert(!t); + } + return t; + } + + void Clear(void (*fn)(T*)) { + for (uint32_t i = 0; i < hash_table::nbuckets_; ++i) { + const uint32_t lock_idx = i % hash_table::nlocks_; + WriteLock _(&hash_table::locks_[lock_idx]); + auto& lru_list = lru_lists_[lock_idx]; + auto& bucket = hash_table::buckets_[i]; + for (auto* t : bucket.list_) { + lru_list.Unlink(t); + (*fn)(t); + } + bucket.list_.clear(); + } + // make sure that all LRU lists are emptied + AssertEmptyLRU(); + } + + void AssertEmptyLRU() { +#ifndef NDEBUG + for (uint32_t i = 0; i < hash_table::nlocks_; ++i) { + WriteLock _(&hash_table::locks_[i]); + auto& lru_list = lru_lists_[i]; + assert(lru_list.IsEmpty()); + } +#endif + } + + // + // Fetch the mutex associated with a key + // This call is used to hold the lock for a given data for extended period of + // time. + port::RWMutex* GetMutex(T* t) { return hash_table::GetMutex(t); } + + private: + using LRUListType = LRUList<T>; + + typename hash_table::Bucket& GetBucket(const uint64_t h) { + const uint32_t bucket_idx = h % hash_table::nbuckets_; + return hash_table::buckets_[bucket_idx]; + } + + LRUListType& GetLRUList(const uint64_t h) { + const uint32_t bucket_idx = h % hash_table::nbuckets_; + const uint32_t lock_idx = bucket_idx % hash_table::nlocks_; + return lru_lists_[lock_idx]; + } + + port::RWMutex& GetMutex(const uint64_t h) { + const uint32_t bucket_idx = h % hash_table::nbuckets_; + const uint32_t lock_idx = bucket_idx % hash_table::nlocks_; + return hash_table::locks_[lock_idx]; + } + + std::unique_ptr<LRUListType[]> lru_lists_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/hash_table_test.cc b/src/rocksdb/utilities/persistent_cache/hash_table_test.cc new file mode 100644 index 000000000..2f6387f5f --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/hash_table_test.cc @@ -0,0 +1,163 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
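The eviction test below drives the EvictableHashTable through raw pointers; the following editorial sketch (not part of the commit; Block, BlockHash and BlockEqual are hypothetical) shows the same pattern in isolation, including the reference count that Find takes and that Evict requires to be zero.

    // Editorial sketch -- not part of the commit.
    struct Block : ROCKSDB_NAMESPACE::LRUElement<Block> {
      explicit Block(uint64_t key) : key_(key) {}
      uint64_t key_;
    };
    struct BlockHash {
      uint64_t operator()(const Block* b) {
        return std::hash<uint64_t>()(b->key_);
      }
    };
    struct BlockEqual {
      bool operator()(const Block* lhs, const Block* rhs) {
        return lhs->key_ == rhs->key_;
      }
    };

    void EvictableSketch() {
      ROCKSDB_NAMESPACE::EvictableHashTable<Block, BlockHash, BlockEqual> index;
      index.Insert(new Block(42));  // linked into both the bucket and the LRU
      Block probe(42);
      Block* found = nullptr;
      if (index.Find(&probe, &found)) {
        --found->refs_;  // Find bumped refs_; drop it so the block is evictable
      }
      delete index.Evict();  // pops a zero-ref block from some stripe's LRU
    }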
+// +#include "utilities/persistent_cache/hash_table.h" + +#include <stdlib.h> + +#include <iostream> +#include <set> +#include <string> + +#include "db/db_test_util.h" +#include "memory/arena.h" +#include "test_util/testharness.h" +#include "util/random.h" +#include "utilities/persistent_cache/hash_table_evictable.h" + +#ifndef ROCKSDB_LITE + +namespace ROCKSDB_NAMESPACE { + +struct HashTableTest : public testing::Test { + ~HashTableTest() override { map_.Clear(&HashTableTest::ClearNode); } + + struct Node { + Node() {} + explicit Node(const uint64_t key, const std::string& val = std::string()) + : key_(key), val_(val) {} + + uint64_t key_ = 0; + std::string val_; + }; + + struct Equal { + bool operator()(const Node& lhs, const Node& rhs) { + return lhs.key_ == rhs.key_; + } + }; + + struct Hash { + uint64_t operator()(const Node& node) { + return std::hash<uint64_t>()(node.key_); + } + }; + + static void ClearNode(Node /*node*/) {} + + HashTable<Node, Hash, Equal> map_; +}; + +struct EvictableHashTableTest : public testing::Test { + ~EvictableHashTableTest() override { + map_.Clear(&EvictableHashTableTest::ClearNode); + } + + struct Node : LRUElement<Node> { + Node() {} + explicit Node(const uint64_t key, const std::string& val = std::string()) + : key_(key), val_(val) {} + + uint64_t key_ = 0; + std::string val_; + std::atomic<uint32_t> refs_{0}; + }; + + struct Equal { + bool operator()(const Node* lhs, const Node* rhs) { + return lhs->key_ == rhs->key_; + } + }; + + struct Hash { + uint64_t operator()(const Node* node) { + return std::hash<uint64_t>()(node->key_); + } + }; + + static void ClearNode(Node* /*node*/) {} + + EvictableHashTable<Node, Hash, Equal> map_; +}; + +TEST_F(HashTableTest, TestInsert) { + const uint64_t max_keys = 1024 * 1024; + + // insert + for (uint64_t k = 0; k < max_keys; ++k) { + map_.Insert(Node(k, std::string(1000, k % 255))); + } + + // verify + for (uint64_t k = 0; k < max_keys; ++k) { + Node val; + port::RWMutex* rlock = nullptr; + assert(map_.Find(Node(k), &val, &rlock)); + rlock->ReadUnlock(); + assert(val.val_ == std::string(1000, k % 255)); + } +} + +TEST_F(HashTableTest, TestErase) { + const uint64_t max_keys = 1024 * 1024; + // insert + for (uint64_t k = 0; k < max_keys; ++k) { + map_.Insert(Node(k, std::string(1000, k % 255))); + } + + auto rand = Random64(time(nullptr)); + // erase a few keys randomly + std::set<uint64_t> erased; + for (int i = 0; i < 1024; ++i) { + uint64_t k = rand.Next() % max_keys; + if (erased.find(k) != erased.end()) { + continue; + } + assert(map_.Erase(Node(k), /*ret=*/nullptr)); + erased.insert(k); + } + + // verify + for (uint64_t k = 0; k < max_keys; ++k) { + Node val; + port::RWMutex* rlock = nullptr; + bool status = map_.Find(Node(k), &val, &rlock); + if (erased.find(k) == erased.end()) { + assert(status); + rlock->ReadUnlock(); + assert(val.val_ == std::string(1000, k % 255)); + } else { + assert(!status); + } + } +} + +TEST_F(EvictableHashTableTest, TestEvict) { + const uint64_t max_keys = 1024 * 1024; + + // insert + for (uint64_t k = 0; k < max_keys; ++k) { + map_.Insert(new Node(k, std::string(1000, k % 255))); + } + + // verify + for (uint64_t k = 0; k < max_keys; ++k) { + Node* val = map_.Evict(); + // unfortunately we can't predict eviction value since it is from any one of + // the lock stripe + assert(val); + assert(val->val_ == std::string(1000, val->key_ % 255)); + delete val; + } +} + +} // namespace ROCKSDB_NAMESPACE +#endif + +int main(int argc, char** argv) { + 
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/persistent_cache/lrulist.h b/src/rocksdb/utilities/persistent_cache/lrulist.h
new file mode 100644
index 000000000..a608890fc
--- /dev/null
+++ b/src/rocksdb/utilities/persistent_cache/lrulist.h
@@ -0,0 +1,174 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// LRU element definition
+//
+// Any object that needs to be part of the LRU algorithm should extend this
+// class
+template <class T>
+struct LRUElement {
+  explicit LRUElement() : next_(nullptr), prev_(nullptr), refs_(0) {}
+
+  virtual ~LRUElement() { assert(!refs_); }
+
+  T* next_;
+  T* prev_;
+  std::atomic<size_t> refs_;
+};
+
+// LRU implementation
+//
+// In-place LRU implementation. There is no copy or allocation involved when
+// inserting or removing an element. This keeps the data structure slim
+template <class T>
+class LRUList {
+ public:
+  virtual ~LRUList() {
+    MutexLock _(&lock_);
+    assert(!head_);
+    assert(!tail_);
+  }
+
+  // Push element into the LRU at the cold end
+  inline void Push(T* const t) {
+    assert(t);
+    assert(!t->next_);
+    assert(!t->prev_);
+
+    MutexLock _(&lock_);
+
+    assert((!head_ && !tail_) || (head_ && tail_));
+    assert(!head_ || !head_->prev_);
+    assert(!tail_ || !tail_->next_);
+
+    t->next_ = head_;
+    if (head_) {
+      head_->prev_ = t;
+    }
+
+    head_ = t;
+    if (!tail_) {
+      tail_ = t;
+    }
+  }
+
+  // Unlink the element from the LRU
+  inline void Unlink(T* const t) {
+    MutexLock _(&lock_);
+    UnlinkImpl(t);
+  }
+
+  // Evict an element from the LRU
+  inline T* Pop() {
+    MutexLock _(&lock_);
+
+    assert(tail_ && head_);
+    assert(!tail_->next_);
+    assert(!head_->prev_);
+
+    // find the first unreferenced element, starting from the cold end
+    T* t = head_;
+    while (t && t->refs_) {
+      t = t->next_;
+    }
+
+    if (!t) {
+      // nothing can be evicted
+      return nullptr;
+    }
+
+    assert(!t->refs_);
+
+    // unlink the element
+    UnlinkImpl(t);
+    return t;
+  }
+
+  // Move the element to the back (hot end) of the list
+  inline void Touch(T* const t) {
+    MutexLock _(&lock_);
+    UnlinkImpl(t);
+    PushBackImpl(t);
+  }
+
+  // Check if the LRU is empty
+  inline bool IsEmpty() const {
+    MutexLock _(&lock_);
+    return !head_ && !tail_;
+  }
+
+ private:
+  // Unlink an element from the LRU
+  void UnlinkImpl(T* const t) {
+    assert(t);
+
+    lock_.AssertHeld();
+
+    assert(head_ && tail_);
+    assert(t->prev_ || head_ == t);
+    assert(t->next_ || tail_ == t);
+
+    if (t->prev_) {
+      t->prev_->next_ = t->next_;
+    }
+    if (t->next_) {
+      t->next_->prev_ = t->prev_;
+    }
+
+    if (tail_ == t) {
+      tail_ = tail_->prev_;
+    }
+    if (head_ == t) {
+      head_ = head_->next_;
+    }
+
+    t->next_ = t->prev_ = nullptr;
+  }
+
+  // Insert an element at the hot end
+  inline void PushBack(T* const t) {
+    MutexLock _(&lock_);
+    PushBackImpl(t);
+  }
+
+  inline void PushBackImpl(T* const t) {
+    assert(t);
+    assert(!t->next_);
+    assert(!t->prev_);
+
+    lock_.AssertHeld();
+
+    assert((!head_ && !tail_) || (head_ && tail_));
+    assert(!head_ || !head_->prev_);
+    assert(!tail_ || !tail_->next_);
+
+    t->prev_ = tail_;
+    if (tail_) {
+      tail_->next_ = t;
+    }
+
+    tail_ = t;
+    if (!head_) {
+      head_ = tail_;
+    }
+  }
+
+  mutable port::Mutex lock_;  // 
synchronization primitive + T* head_ = nullptr; // front (cold) + T* tail_ = nullptr; // back (hot) +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc b/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc new file mode 100644 index 000000000..9d6e15d6b --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_bench.cc @@ -0,0 +1,359 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#ifndef ROCKSDB_LITE + +#ifndef GFLAGS +#include <cstdio> +int main() { fprintf(stderr, "Please install gflags to run tools\n"); } +#else +#include <atomic> +#include <functional> +#include <memory> +#include <sstream> +#include <unordered_map> + +#include "monitoring/histogram.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/system_clock.h" +#include "table/block_based/block_builder.h" +#include "util/gflags_compat.h" +#include "util/mutexlock.h" +#include "util/stop_watch.h" +#include "utilities/persistent_cache/block_cache_tier.h" +#include "utilities/persistent_cache/persistent_cache_tier.h" +#include "utilities/persistent_cache/volatile_tier_impl.h" + +DEFINE_int32(nsec, 10, "nsec"); +DEFINE_int32(nthread_write, 1, "Insert threads"); +DEFINE_int32(nthread_read, 1, "Lookup threads"); +DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile"); +DEFINE_string(log_path, "/tmp/log", "Path for the log file"); +DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size"); +DEFINE_int32(iosize, 4 * 1024, "Read IO size"); +DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size"); +DEFINE_int32(writer_qdepth, 1, "File writer qdepth"); +DEFINE_bool(enable_pipelined_writes, false, "Enable async writes"); +DEFINE_string(cache_type, "block_cache", + "Cache type. 
(block_cache, volatile, tiered)"); +DEFINE_bool(benchmark, false, "Benchmark mode"); +DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier."); + +namespace ROCKSDB_NAMESPACE { + +std::unique_ptr<PersistentCacheTier> NewVolatileCache() { + assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max()); + std::unique_ptr<PersistentCacheTier> pcache( + new VolatileCacheTier(FLAGS_cache_size)); + return pcache; +} + +std::unique_ptr<PersistentCacheTier> NewBlockCache() { + std::shared_ptr<Logger> log; + if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { + fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); + return nullptr; + } + + PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log); + opt.writer_dispatch_size = FLAGS_writer_iosize; + opt.writer_qdepth = FLAGS_writer_qdepth; + opt.pipeline_writes = FLAGS_enable_pipelined_writes; + opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); + std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt)); + Status status = cache->Open(); + return cache; +} + +// create a new cache tier +// construct a tiered RAM+Block cache +std::unique_ptr<PersistentTieredCache> NewTieredCache( + const size_t mem_size, const PersistentCacheConfig& opt) { + std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache()); + // create primary tier + assert(mem_size); + auto pcache = + std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size)); + tcache->AddTier(pcache); + // create secondary tier + auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt)); + tcache->AddTier(scache); + + Status s = tcache->Open(); + assert(s.ok()); + return tcache; +} + +std::unique_ptr<PersistentTieredCache> NewTieredCache() { + std::shared_ptr<Logger> log; + if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { + fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); + abort(); + } + + auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100); + PersistentCacheConfig opt(Env::Default(), FLAGS_path, + (1 - pct) * FLAGS_cache_size, log); + opt.writer_dispatch_size = FLAGS_writer_iosize; + opt.writer_qdepth = FLAGS_writer_qdepth; + opt.pipeline_writes = FLAGS_enable_pipelined_writes; + opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); + return NewTieredCache(FLAGS_cache_size * pct, opt); +} + +// +// Benchmark driver +// +class CacheTierBenchmark { + public: + explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache) + : cache_(cache) { + if (FLAGS_nthread_read) { + fprintf(stdout, "Pre-populating\n"); + Prepop(); + fprintf(stdout, "Pre-population completed\n"); + } + + stats_.Clear(); + + // Start IO threads + std::list<port::Thread> threads; + Spawn(FLAGS_nthread_write, &threads, + std::bind(&CacheTierBenchmark::Write, this)); + Spawn(FLAGS_nthread_read, &threads, + std::bind(&CacheTierBenchmark::Read, this)); + + // Wait till FLAGS_nsec and then signal to quit + StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true); + size_t sec = t.ElapsedNanos() / 1000000000ULL; + while (!quit_) { + sec = t.ElapsedNanos() / 1000000000ULL; + quit_ = sec > size_t(FLAGS_nsec); + /* sleep override */ sleep(1); + } + + // Wait for threads to exit + Join(&threads); + // Print stats + PrintStats(sec); + // Close the cache + cache_->TEST_Flush(); + cache_->Close(); + } + + private: + void PrintStats(const size_t sec) { + std::ostringstream msg; + msg << "Test stats" << std::endl + << "* 
Elapsed: " << sec << " s" << std::endl + << "* Write Latency:" << std::endl + << stats_.write_latency_.ToString() << std::endl + << "* Read Latency:" << std::endl + << stats_.read_latency_.ToString() << std::endl + << "* Bytes written:" << std::endl + << stats_.bytes_written_.ToString() << std::endl + << "* Bytes read:" << std::endl + << stats_.bytes_read_.ToString() << std::endl + << "Cache stats:" << std::endl + << cache_->PrintStats() << std::endl; + fprintf(stderr, "%s\n", msg.str().c_str()); + } + + // + // Insert implementation and corresponding helper functions + // + void Prepop() { + for (uint64_t i = 0; i < 1024 * 1024; ++i) { + InsertKey(i); + insert_key_limit_++; + read_key_limit_++; + } + + // Wait until data is flushed + cache_->TEST_Flush(); + // warmup the cache + for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) { + } + } + + void Write() { + while (!quit_) { + InsertKey(insert_key_limit_++); + } + } + + void InsertKey(const uint64_t key) { + // construct key + uint64_t k[3]; + Slice block_key = FillKey(k, key); + + // construct value + auto block = NewBlock(key); + + // insert + StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true); + while (true) { + Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize); + if (status.ok()) { + break; + } + + // transient error is possible if we run without pipelining + assert(!FLAGS_enable_pipelined_writes); + } + + // adjust stats + const size_t elapsed_micro = timer.ElapsedNanos() / 1000; + stats_.write_latency_.Add(elapsed_micro); + stats_.bytes_written_.Add(FLAGS_iosize); + } + + // + // Read implementation + // + void Read() { + while (!quit_) { + ReadKey(random() % read_key_limit_); + } + } + + void ReadKey(const uint64_t val) { + // construct key + uint64_t k[3]; + Slice key = FillKey(k, val); + + // Lookup in cache + StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true); + std::unique_ptr<char[]> block; + size_t size; + Status status = cache_->Lookup(key, &block, &size); + if (!status.ok()) { + fprintf(stderr, "%s\n", status.ToString().c_str()); + } + assert(status.ok()); + assert(size == (size_t)FLAGS_iosize); + + // adjust stats + const size_t elapsed_micro = timer.ElapsedNanos() / 1000; + stats_.read_latency_.Add(elapsed_micro); + stats_.bytes_read_.Add(FLAGS_iosize); + + // verify content + if (!FLAGS_benchmark) { + auto expected_block = NewBlock(val); + assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0); + } + } + + // create data for a key by filling with a certain pattern + std::unique_ptr<char[]> NewBlock(const uint64_t val) { + std::unique_ptr<char[]> data(new char[FLAGS_iosize]); + memset(data.get(), val % 255, FLAGS_iosize); + return data; + } + + // spawn threads + void Spawn(const size_t n, std::list<port::Thread>* threads, + const std::function<void()>& fn) { + for (size_t i = 0; i < n; ++i) { + threads->emplace_back(fn); + } + } + + // join threads + void Join(std::list<port::Thread>* threads) { + for (auto& th : *threads) { + th.join(); + } + } + + // construct key + Slice FillKey(uint64_t (&k)[3], const uint64_t val) { + k[0] = k[1] = 0; + k[2] = val; + void* p = static_cast<void*>(&k); + return Slice(static_cast<char*>(p), sizeof(k)); + } + + // benchmark stats + struct Stats { + void Clear() { + bytes_written_.Clear(); + bytes_read_.Clear(); + read_latency_.Clear(); + write_latency_.Clear(); + } + + HistogramImpl bytes_written_; + HistogramImpl bytes_read_; + HistogramImpl read_latency_; + HistogramImpl write_latency_; + }; + + 
std::shared_ptr<PersistentCacheTier> cache_; // cache implementation + std::atomic<uint64_t> insert_key_limit_{0}; // data inserted upto + std::atomic<uint64_t> read_key_limit_{0}; // data can be read safely upto + bool quit_ = false; // Quit thread ? + mutable Stats stats_; // Stats +}; + +} // namespace ROCKSDB_NAMESPACE + +// +// main +// +int main(int argc, char** argv) { + GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") + + std::string(argv[0]) + " [OPTIONS]..."); + GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false); + + std::ostringstream msg; + msg << "Config" << std::endl + << "======" << std::endl + << "* nsec=" << FLAGS_nsec << std::endl + << "* nthread_write=" << FLAGS_nthread_write << std::endl + << "* path=" << FLAGS_path << std::endl + << "* cache_size=" << FLAGS_cache_size << std::endl + << "* iosize=" << FLAGS_iosize << std::endl + << "* writer_iosize=" << FLAGS_writer_iosize << std::endl + << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl + << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes + << std::endl + << "* cache_type=" << FLAGS_cache_type << std::endl + << "* benchmark=" << FLAGS_benchmark << std::endl + << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl; + + fprintf(stderr, "%s\n", msg.str().c_str()); + + std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache; + if (FLAGS_cache_type == "block_cache") { + fprintf(stderr, "Using block cache implementation\n"); + cache = ROCKSDB_NAMESPACE::NewBlockCache(); + } else if (FLAGS_cache_type == "volatile") { + fprintf(stderr, "Using volatile cache implementation\n"); + cache = ROCKSDB_NAMESPACE::NewVolatileCache(); + } else if (FLAGS_cache_type == "tiered") { + fprintf(stderr, "Using tiered cache implementation\n"); + cache = ROCKSDB_NAMESPACE::NewTieredCache(); + } else { + fprintf(stderr, "Unknown option for cache\n"); + } + + assert(cache); + if (!cache) { + fprintf(stderr, "Error creating cache\n"); + abort(); + } + + std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark( + new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache))); + + return 0; +} +#endif // #ifndef GFLAGS +#else +int main(int, char**) { return 0; } +#endif diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc new file mode 100644 index 000000000..d1b18b68a --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.cc @@ -0,0 +1,462 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
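The benchmark above already exercises the full life cycle of a block cache tier; condensed into one place, a minimal sketch looks as follows (editorial illustration, not part of the commit; the path, log file and sizes are made up).

    // Editorial sketch -- not part of the commit.
    void BlockCacheTierSketch() {
      using namespace ROCKSDB_NAMESPACE;
      std::shared_ptr<Logger> log;
      if (!Env::Default()->NewLogger("/tmp/pcache.log", &log).ok()) {
        return;
      }
      PersistentCacheConfig opt(Env::Default(), "/tmp/pcache",
                                /*cache_size=*/1024 * 1024 * 1024, log);
      BlockCacheTier cache(opt);
      if (!cache.Open().ok()) {
        return;
      }
      const char data[4096] = {0};
      Slice key("example-key");
      // Insert may return Status::TryAgain() when writes are not pipelined
      Status s = cache.Insert(key, data, sizeof(data));
      std::unique_ptr<char[]> block;
      size_t size = 0;
      if (cache.Lookup(key, &block, &size).ok()) {
        assert(size == sizeof(data));
      }
      cache.Close();
    }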
+#if !defined ROCKSDB_LITE + +#include "utilities/persistent_cache/persistent_cache_test.h" + +#include <functional> +#include <memory> +#include <thread> + +#include "file/file_util.h" +#include "utilities/persistent_cache/block_cache_tier.h" + +namespace ROCKSDB_NAMESPACE { + +static const double kStressFactor = .125; + +#ifdef OS_LINUX +static void OnOpenForRead(void* arg) { + int* val = static_cast<int*>(arg); + *val &= ~O_DIRECT; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", + std::bind(OnOpenForRead, std::placeholders::_1)); +} + +static void OnOpenForWrite(void* arg) { + int* val = static_cast<int*>(arg); + *val &= ~O_DIRECT; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", + std::bind(OnOpenForWrite, std::placeholders::_1)); +} +#endif + +static void OnDeleteDir(void* arg) { + char* dir = static_cast<char*>(arg); + ASSERT_OK(DestroyDir(Env::Default(), std::string(dir))); +} + +// +// Simple logger that prints message on stdout +// +class ConsoleLogger : public Logger { + public: + using Logger::Logv; + ConsoleLogger() : Logger(InfoLogLevel::ERROR_LEVEL) {} + + void Logv(const char* format, va_list ap) override { + MutexLock _(&lock_); + vprintf(format, ap); + printf("\n"); + } + + port::Mutex lock_; +}; + +// construct a tiered RAM+Block cache +std::unique_ptr<PersistentTieredCache> NewTieredCache( + const size_t mem_size, const PersistentCacheConfig& opt) { + std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache()); + // create primary tier + assert(mem_size); + auto pcache = std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier( + /*is_compressed*/ true, mem_size)); + tcache->AddTier(pcache); + // create secondary tier + auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt)); + tcache->AddTier(scache); + + Status s = tcache->Open(); + assert(s.ok()); + return tcache; +} + +// create block cache +std::unique_ptr<PersistentCacheTier> NewBlockCache( + Env* env, const std::string& path, + const uint64_t max_size = std::numeric_limits<uint64_t>::max(), + const bool enable_direct_writes = false) { + const uint32_t max_file_size = + static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor); + auto log = std::make_shared<ConsoleLogger>(); + PersistentCacheConfig opt(env, path, max_size, log); + opt.cache_file_size = max_file_size; + opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); + opt.enable_direct_writes = enable_direct_writes; + std::unique_ptr<PersistentCacheTier> scache(new BlockCacheTier(opt)); + Status s = scache->Open(); + assert(s.ok()); + return scache; +} + +// create a new cache tier +std::unique_ptr<PersistentTieredCache> NewTieredCache( + Env* env, const std::string& path, const uint64_t max_volatile_cache_size, + const uint64_t max_block_cache_size = + std::numeric_limits<uint64_t>::max()) { + const uint32_t max_file_size = + static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor); + auto log = std::make_shared<ConsoleLogger>(); + auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log); + opt.cache_file_size = max_file_size; + opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); + // create tier out of the two caches + auto cache = NewTieredCache(max_volatile_cache_size, opt); + return cache; +} + +PersistentCacheTierTest::PersistentCacheTierTest() + : path_(test::PerThreadDBPath("cache_test")) { +#ifdef OS_LINUX + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); 
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", OnOpenForRead); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", OnOpenForWrite); +#endif +} + +// Block cache tests +TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithFileCreateError) { + cache_ = NewBlockCache(Env::Default(), path_, + /*size=*/std::numeric_limits<uint64_t>::max(), + /*direct_writes=*/false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BlockCacheTier::NewCacheFile:DeleteDir", OnDeleteDir); + + RunNegativeInsertTest(/*nthreads=*/1, + /*max_keys*/ + static_cast<size_t>(10 * 1024 * kStressFactor)); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +// Travis is unable to handle the normal version of the tests running out of +// fds, out of space and timeouts. This is an easier version of the test +// specifically written for Travis +TEST_F(PersistentCacheTierTest, DISABLED_BasicTest) { + cache_ = std::make_shared<VolatileCacheTier>(); + RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024); + + cache_ = NewBlockCache(Env::Default(), path_, + /*size=*/std::numeric_limits<uint64_t>::max(), + /*direct_writes=*/true); + RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024); + + cache_ = NewTieredCache(Env::Default(), path_, + /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024)); + RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024); +} + +// Volatile cache tests +// DISABLED for now (somewhat expensive) +TEST_F(PersistentCacheTierTest, DISABLED_VolatileCacheInsert) { + for (auto nthreads : {1, 5}) { + for (auto max_keys : + {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) { + cache_ = std::make_shared<VolatileCacheTier>(); + RunInsertTest(nthreads, static_cast<size_t>(max_keys)); + } + } +} + +// DISABLED for now (somewhat expensive) +TEST_F(PersistentCacheTierTest, DISABLED_VolatileCacheInsertWithEviction) { + for (auto nthreads : {1, 5}) { + for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) { + cache_ = std::make_shared<VolatileCacheTier>( + /*compressed=*/true, + /*size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor)); + RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys)); + } + } +} + +// Block cache tests +// DISABLED for now (expensive) +TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsert) { + for (auto direct_writes : {true, false}) { + for (auto nthreads : {1, 5}) { + for (auto max_keys : + {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) { + cache_ = NewBlockCache(Env::Default(), path_, + /*size=*/std::numeric_limits<uint64_t>::max(), + direct_writes); + RunInsertTest(nthreads, static_cast<size_t>(max_keys)); + } + } + } +} + +// DISABLED for now (somewhat expensive) +TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithEviction) { + for (auto nthreads : {1, 5}) { + for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) { + cache_ = NewBlockCache( + Env::Default(), path_, + /*max_size=*/static_cast<size_t>(200 * 1024 * 1024 * kStressFactor)); + RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys)); + } + } +} + +// Tiered cache tests +// DISABLED for now (expensive) +TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsert) { + for (auto nthreads : {1, 5}) { + for (auto max_keys : + {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) { + cache_ = NewTieredCache( + Env::Default(), path_, + /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor)); + RunInsertTest(nthreads, 
static_cast<size_t>(max_keys));
+    }
+  }
+}
+
+// this test causes a lot of file deletions, which the limited Travis testing
+// environment cannot handle
+// DISABLED for now (somewhat expensive)
+TEST_F(PersistentCacheTierTest, DISABLED_TieredCacheInsertWithEviction) {
+  for (auto nthreads : {1, 5}) {
+    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+      cache_ = NewTieredCache(
+          Env::Default(), path_,
+          /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor),
+          /*block_cache_size*/
+          static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
+      RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
+    }
+  }
+}
+
+std::shared_ptr<PersistentCacheTier> MakeVolatileCache(
+    Env* /*env*/, const std::string& /*dbname*/) {
+  return std::make_shared<VolatileCacheTier>();
+}
+
+std::shared_ptr<PersistentCacheTier> MakeBlockCache(Env* env,
+                                                    const std::string& dbname) {
+  return NewBlockCache(env, dbname);
+}
+
+std::shared_ptr<PersistentCacheTier> MakeTieredCache(
+    Env* env, const std::string& dbname) {
+  const auto memory_size = 1 * 1024 * 1024 * kStressFactor;
+  return NewTieredCache(env, dbname, static_cast<size_t>(memory_size));
+}
+
+#ifdef OS_LINUX
+static void UniqueIdCallback(void* arg) {
+  int* result = reinterpret_cast<int*>(arg);
+  if (*result == -1) {
+    *result = 0;
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+}
+#endif
+
+TEST_F(PersistentCacheTierTest, FactoryTest) {
+  for (auto nvm_opt : {true, false}) {
+    ASSERT_FALSE(cache_);
+    auto log = std::make_shared<ConsoleLogger>();
+    std::shared_ptr<PersistentCache> cache;
+    ASSERT_OK(NewPersistentCache(Env::Default(), path_,
+                                 /*size=*/1 * 1024 * 1024 * 1024, log, nvm_opt,
+                                 &cache));
+    ASSERT_TRUE(cache);
+    ASSERT_EQ(cache->Stats().size(), 1);
+    ASSERT_TRUE(cache->Stats()[0].size());
+    cache.reset();
+  }
+}
+
+PersistentCacheDBTest::PersistentCacheDBTest()
+    : DBTestBase("cache_test", /*env_do_fsync=*/true) {
+#ifdef OS_LINUX
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "NewRandomAccessFile:O_DIRECT", OnOpenForRead);
+#endif
+}
+
+// test template
+void PersistentCacheDBTest::RunTest(
+    const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& new_pcache,
+    const size_t max_keys = 100 * 1024, const size_t max_usecase = 5) {
+  // number of insertion iterations
+  int num_iter = static_cast<int>(max_keys * kStressFactor);
+
+  for (size_t iter = 0; iter < max_usecase; iter++) {
+    Options options;
+    options.write_buffer_size =
+        static_cast<size_t>(64 * 1024 * kStressFactor);  // small write buffer
+    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+    options = CurrentOptions(options);
+
+    // setup page cache
+    std::shared_ptr<PersistentCacheTier> pcache;
+    BlockBasedTableOptions table_options;
+    table_options.cache_index_and_filter_blocks = true;
+
+    const size_t size_max = std::numeric_limits<size_t>::max();
+
+    switch (iter) {
+      case 0:
+        // page cache, block cache, no-compressed cache
+        pcache = new_pcache(/*is_compressed=*/true);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = NewLRUCache(size_max);
+        table_options.block_cache_compressed = nullptr;
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        
break; + case 1: + // page cache, block cache, compressed cache + pcache = new_pcache(/*is_compressed=*/true); + table_options.persistent_cache = pcache; + table_options.block_cache = NewLRUCache(size_max); + table_options.block_cache_compressed = NewLRUCache(size_max); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + break; + case 2: + // page cache, block cache, compressed cache + KNoCompression + // both block cache and compressed cache, but DB is not compressed + // also, make block cache sizes bigger, to trigger block cache hits + pcache = new_pcache(/*is_compressed=*/true); + table_options.persistent_cache = pcache; + table_options.block_cache = NewLRUCache(size_max); + table_options.block_cache_compressed = NewLRUCache(size_max); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.compression = kNoCompression; + break; + case 3: + // page cache, no block cache, no compressed cache + pcache = new_pcache(/*is_compressed=*/false); + table_options.persistent_cache = pcache; + table_options.block_cache = nullptr; + table_options.block_cache_compressed = nullptr; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + break; + case 4: + // page cache, no block cache, no compressed cache + // Page cache caches compressed blocks + pcache = new_pcache(/*is_compressed=*/true); + table_options.persistent_cache = pcache; + table_options.block_cache = nullptr; + table_options.block_cache_compressed = nullptr; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + break; + default: + FAIL(); + } + + std::vector<std::string> values; + // insert data + Insert(options, table_options, num_iter, &values); + // flush all data in cache to device + pcache->TEST_Flush(); + // verify data + Verify(num_iter, values); + + auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS); + auto compressed_block_hit = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + auto compressed_block_miss = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT); + auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS); + + // check that we triggered the appropriate code paths in the cache + switch (iter) { + case 0: + // page cache, block cache, no-compressed cache + ASSERT_GT(page_miss, 0); + ASSERT_GT(page_hit, 0); + ASSERT_GT(block_miss, 0); + ASSERT_EQ(compressed_block_miss, 0); + ASSERT_EQ(compressed_block_hit, 0); + break; + case 1: + // page cache, block cache, compressed cache + ASSERT_GT(page_miss, 0); + ASSERT_GT(block_miss, 0); + ASSERT_GT(compressed_block_miss, 0); + break; + case 2: + // page cache, block cache, compressed cache + KNoCompression + ASSERT_GT(page_miss, 0); + ASSERT_GT(page_hit, 0); + ASSERT_GT(block_miss, 0); + ASSERT_GT(compressed_block_miss, 0); + // remember kNoCompression + ASSERT_EQ(compressed_block_hit, 0); + break; + case 3: + case 4: + // page cache, no block cache, no compressed cache + ASSERT_GT(page_miss, 0); + ASSERT_GT(page_hit, 0); + ASSERT_EQ(compressed_block_hit, 0); + ASSERT_EQ(compressed_block_miss, 0); + break; + default: + FAIL(); + } + + options.create_if_missing = true; + DestroyAndReopen(options); + + ASSERT_OK(pcache->Close()); + } +} + +// Travis is unable to handle the normal version of the tests running out of +// fds, out of space and timeouts. This is an easier version of the test +// specifically written for Travis. 
+// Now used generally because main tests are too expensive as unit tests. +TEST_F(PersistentCacheDBTest, BasicTest) { + RunTest(std::bind(&MakeBlockCache, env_, dbname_), /*max_keys=*/1024, + /*max_usecase=*/1); +} + +// test table with block page cache +// DISABLED for now (very expensive, especially memory) +TEST_F(PersistentCacheDBTest, DISABLED_BlockCacheTest) { + RunTest(std::bind(&MakeBlockCache, env_, dbname_)); +} + +// test table with volatile page cache +// DISABLED for now (very expensive, especially memory) +TEST_F(PersistentCacheDBTest, DISABLED_VolatileCacheTest) { + RunTest(std::bind(&MakeVolatileCache, env_, dbname_)); +} + +// test table with tiered page cache +// DISABLED for now (very expensive, especially memory) +TEST_F(PersistentCacheDBTest, DISABLED_TieredCacheTest) { + RunTest(std::bind(&MakeTieredCache, env_, dbname_)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#else // !defined ROCKSDB_LITE +int main() { return 0; } +#endif // !defined ROCKSDB_LITE diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h new file mode 100644 index 000000000..f13155ed6 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_test.h @@ -0,0 +1,286 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
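Condensing the switch in RunTest above, wiring a persistent cache tier into a database boils down to the following sketch (editorial illustration, not part of the commit; the LRU cache size is arbitrary).

    // Editorial sketch -- not part of the commit.
    void WireUpPersistentCacheSketch(
        const std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier>& pcache) {
      using namespace ROCKSDB_NAMESPACE;
      BlockBasedTableOptions table_options;
      table_options.cache_index_and_filter_blocks = true;
      table_options.persistent_cache = pcache;  // persistent page cache tier
      table_options.block_cache = NewLRUCache(64 * 1024 * 1024);  // RAM tier
      Options options;
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      // open the DB with `options`; block reads that miss the RAM tier are
      // then served from (and inserted into) the persistent cache
    }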
+#pragma once + +#ifndef ROCKSDB_LITE + +#include <functional> +#include <limits> +#include <list> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include "db/db_test_util.h" +#include "memory/arena.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "table/block_based/block_builder.h" +#include "test_util/testharness.h" +#include "util/random.h" +#include "utilities/persistent_cache/volatile_tier_impl.h" + +namespace ROCKSDB_NAMESPACE { + +// +// Unit tests for testing PersistentCacheTier +// +class PersistentCacheTierTest : public testing::Test { + public: + PersistentCacheTierTest(); + virtual ~PersistentCacheTierTest() { + if (cache_) { + Status s = cache_->Close(); + assert(s.ok()); + } + } + + protected: + // Flush cache + void Flush() { + if (cache_) { + cache_->TEST_Flush(); + } + } + + // create threaded workload + template <class T> + std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) { + std::list<port::Thread> threads; + for (size_t i = 0; i < n; i++) { + port::Thread th(fn); + threads.push_back(std::move(th)); + } + return threads; + } + + // Wait for threads to join + void Join(std::list<port::Thread>&& threads) { + for (auto& th : threads) { + th.join(); + } + threads.clear(); + } + + // Run insert workload in threads + void Insert(const size_t nthreads, const size_t max_keys) { + key_ = 0; + max_keys_ = max_keys; + // spawn threads + auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this); + auto threads = SpawnThreads(nthreads, fn); + // join with threads + Join(std::move(threads)); + // Flush cache + Flush(); + } + + // Run verification on the cache + void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) { + stats_verify_hits_ = 0; + stats_verify_missed_ = 0; + key_ = 0; + // spawn threads + auto fn = + std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled); + auto threads = SpawnThreads(nthreads, fn); + // join with threads + Join(std::move(threads)); + } + + // pad 0 to numbers + std::string PaddedNumber(const size_t data, const size_t pad_size) { + assert(pad_size); + char* ret = new char[pad_size]; + int pos = static_cast<int>(pad_size) - 1; + size_t count = 0; + size_t t = data; + // copy numbers + while (t) { + count++; + ret[pos--] = '0' + t % 10; + t = t / 10; + } + // copy 0s + while (pos >= 0) { + ret[pos--] = '0'; + } + // post condition + assert(count <= pad_size); + assert(pos == -1); + std::string result(ret, pad_size); + delete[] ret; + return result; + } + + // Insert workload implementation + void InsertImpl() { + const std::string prefix = "key_prefix_"; + + while (true) { + size_t i = key_++; + if (i >= max_keys_) { + break; + } + + char data[4 * 1024]; + memset(data, '0' + (i % 10), sizeof(data)); + auto k = prefix + PaddedNumber(i, /*count=*/8); + Slice key(k); + while (true) { + Status status = cache_->Insert(key, data, sizeof(data)); + if (status.ok()) { + break; + } + ASSERT_TRUE(status.IsTryAgain()); + Env::Default()->SleepForMicroseconds(1 * 1000 * 1000); + } + } + } + + // Verification implementation + void VerifyImpl(const bool eviction_enabled = false) { + const std::string prefix = "key_prefix_"; + while (true) { + size_t i = key_++; + if (i >= max_keys_) { + break; + } + + char edata[4 * 1024]; + memset(edata, '0' + (i % 10), sizeof(edata)); + auto k = prefix + PaddedNumber(i, /*count=*/8); + Slice key(k); + std::unique_ptr<char[]> block; + size_t block_size; + + if (eviction_enabled) { + if (!cache_->Lookup(key, &block, 
&block_size).ok()) { + // assume that the key is evicted + stats_verify_missed_++; + continue; + } + } + + ASSERT_OK(cache_->Lookup(key, &block, &block_size)); + ASSERT_EQ(block_size, sizeof(edata)); + ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0); + stats_verify_hits_++; + } + } + + // template for insert test + void RunInsertTest(const size_t nthreads, const size_t max_keys) { + Insert(nthreads, max_keys); + Verify(nthreads); + ASSERT_EQ(stats_verify_hits_, max_keys); + ASSERT_EQ(stats_verify_missed_, 0); + + ASSERT_OK(cache_->Close()); + cache_.reset(); + } + + // template for negative insert test + void RunNegativeInsertTest(const size_t nthreads, const size_t max_keys) { + Insert(nthreads, max_keys); + Verify(nthreads, /*eviction_enabled=*/true); + ASSERT_LT(stats_verify_hits_, max_keys); + ASSERT_GT(stats_verify_missed_, 0); + + ASSERT_OK(cache_->Close()); + cache_.reset(); + } + + // template for insert with eviction test + void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) { + Insert(nthreads, max_keys); + Verify(nthreads, /*eviction_enabled=*/true); + ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys); + ASSERT_GT(stats_verify_hits_, 0); + ASSERT_GT(stats_verify_missed_, 0); + + ASSERT_OK(cache_->Close()); + cache_.reset(); + } + + const std::string path_; + std::shared_ptr<Logger> log_; + std::shared_ptr<PersistentCacheTier> cache_; + std::atomic<size_t> key_{0}; + size_t max_keys_ = 0; + std::atomic<size_t> stats_verify_hits_{0}; + std::atomic<size_t> stats_verify_missed_{0}; +}; + +// +// RocksDB tests +// +class PersistentCacheDBTest : public DBTestBase { + public: + PersistentCacheDBTest(); + + static uint64_t TestGetTickerCount(const Options& options, + Tickers ticker_type) { + return static_cast<uint32_t>( + options.statistics->getTickerCount(ticker_type)); + } + + // insert data to table + void Insert(const Options& options, + const BlockBasedTableOptions& /*table_options*/, + const int num_iter, std::vector<std::string>* values) { + CreateAndReopenWithCF({"pikachu"}, options); + // default column family doesn't have block cache + Options no_block_cache_opts; + no_block_cache_opts.statistics = options.statistics; + no_block_cache_opts = CurrentOptions(no_block_cache_opts); + BlockBasedTableOptions table_options_no_bc; + table_options_no_bc.no_block_cache = true; + no_block_cache_opts.table_factory.reset( + NewBlockBasedTableFactory(table_options_no_bc)); + ReopenWithColumnFamilies( + {"default", "pikachu"}, + std::vector<Options>({no_block_cache_opts, options})); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + std::string str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = rnd.RandomString(1000); + } + values->push_back(str); + ASSERT_OK(Put(1, Key(i), (*values)[i])); + } + + // flush all data from memtable so that reads are from block cache + ASSERT_OK(Flush(1)); + } + + // verify data + void Verify(const int num_iter, const std::vector<std::string>& values) { + for (int j = 0; j < 2; ++j) { + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(1, Key(i)), values[i]); + } + } + } + + // test template + void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& + new_pcache, + const size_t max_keys, const size_t max_usecase); +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc 
b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc new file mode 100644 index 000000000..54cbce8f7 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.cc @@ -0,0 +1,167 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/persistent_cache_tier.h" + +#include <cinttypes> +#include <sstream> +#include <string> + +namespace ROCKSDB_NAMESPACE { + +std::string PersistentCacheConfig::ToString() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + + snprintf(buffer, kBufferSize, " path: %s\n", path.c_str()); + ret.append(buffer); + snprintf(buffer, kBufferSize, " enable_direct_reads: %d\n", + enable_direct_reads); + ret.append(buffer); + snprintf(buffer, kBufferSize, " enable_direct_writes: %d\n", + enable_direct_writes); + ret.append(buffer); + snprintf(buffer, kBufferSize, " cache_size: %" PRIu64 "\n", cache_size); + ret.append(buffer); + snprintf(buffer, kBufferSize, " cache_file_size: %" PRIu32 "\n", + cache_file_size); + ret.append(buffer); + snprintf(buffer, kBufferSize, " writer_qdepth: %" PRIu32 "\n", + writer_qdepth); + ret.append(buffer); + snprintf(buffer, kBufferSize, " pipeline_writes: %d\n", pipeline_writes); + ret.append(buffer); + snprintf(buffer, kBufferSize, + " max_write_pipeline_backlog_size: %" PRIu64 "\n", + max_write_pipeline_backlog_size); + ret.append(buffer); + snprintf(buffer, kBufferSize, " write_buffer_size: %" PRIu32 "\n", + write_buffer_size); + ret.append(buffer); + snprintf(buffer, kBufferSize, " writer_dispatch_size: %" PRIu64 "\n", + writer_dispatch_size); + ret.append(buffer); + snprintf(buffer, kBufferSize, " is_compressed: %d\n", is_compressed); + ret.append(buffer); + + return ret; +} + +// +// PersistentCacheTier implementation +// +Status PersistentCacheTier::Open() { + if (next_tier_) { + return next_tier_->Open(); + } + return Status::OK(); +} + +Status PersistentCacheTier::Close() { + if (next_tier_) { + return next_tier_->Close(); + } + return Status::OK(); +} + +bool PersistentCacheTier::Reserve(const size_t /*size*/) { + // default implementation is a pass through + return true; +} + +bool PersistentCacheTier::Erase(const Slice& /*key*/) { + // default implementation is a pass through since not all cache tiers might + // support erase + return true; +} + +std::string PersistentCacheTier::PrintStats() { + std::ostringstream os; + for (auto tier_stats : Stats()) { + os << "---- next tier -----" << std::endl; + for (auto stat : tier_stats) { + os << stat.first << ": " << stat.second << std::endl; + } + } + return os.str(); +} + +PersistentCache::StatsType PersistentCacheTier::Stats() { + if (next_tier_) { + return next_tier_->Stats(); + } + return PersistentCache::StatsType{}; +} + +uint64_t PersistentCacheTier::NewId() { + return last_id_.fetch_add(1, std::memory_order_relaxed); +} + +// +// PersistentTieredCache implementation +// +PersistentTieredCache::~PersistentTieredCache() { assert(tiers_.empty()); } + +Status PersistentTieredCache::Open() { + assert(!tiers_.empty()); + return tiers_.front()->Open(); +} + +Status PersistentTieredCache::Close() { + assert(!tiers_.empty()); + Status status = tiers_.front()->Close(); + if (status.ok()) { + tiers_.clear(); + } + return status; +} + 
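The tiered implementation below delegates every call to the front tier; AddTier is what stitches the chain together via set_next_tier. A minimal composition sketch (editorial illustration, not part of the commit; the volatile tier size is arbitrary and `opt` is a PersistentCacheConfig as defined in the header further down):

    // Editorial sketch -- not part of the commit.
    PersistentTieredCache tcache;
    tcache.AddTier(std::make_shared<VolatileCacheTier>(64 * 1024 * 1024));
    tcache.AddTier(std::make_shared<BlockCacheTier>(opt));
    Status s = tcache.Open();  // opens the chain starting at the front tier
    assert(s.ok());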
+bool PersistentTieredCache::Erase(const Slice& key) { + assert(!tiers_.empty()); + return tiers_.front()->Erase(key); +} + +PersistentCache::StatsType PersistentTieredCache::Stats() { + assert(!tiers_.empty()); + return tiers_.front()->Stats(); +} + +std::string PersistentTieredCache::PrintStats() { + assert(!tiers_.empty()); + return tiers_.front()->PrintStats(); +} + +Status PersistentTieredCache::Insert(const Slice& page_key, const char* data, + const size_t size) { + assert(!tiers_.empty()); + return tiers_.front()->Insert(page_key, data, size); +} + +Status PersistentTieredCache::Lookup(const Slice& page_key, + std::unique_ptr<char[]>* data, + size_t* size) { + assert(!tiers_.empty()); + return tiers_.front()->Lookup(page_key, data, size); +} + +void PersistentTieredCache::AddTier(const Tier& tier) { + if (!tiers_.empty()) { + tiers_.back()->set_next_tier(tier); + } + tiers_.push_back(tier); +} + +bool PersistentTieredCache::IsCompressed() { + assert(!tiers_.empty()); + return tiers_.front()->IsCompressed(); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h new file mode 100644 index 000000000..65aadcd3f --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_tier.h @@ -0,0 +1,342 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#ifndef ROCKSDB_LITE + +#include <limits> +#include <list> +#include <map> +#include <string> +#include <vector> + +#include "monitoring/histogram.h" +#include "rocksdb/env.h" +#include "rocksdb/persistent_cache.h" +#include "rocksdb/status.h" +#include "rocksdb/system_clock.h" + +// Persistent Cache +// +// Persistent cache is a tiered key-value cache that can use a persistent +// medium. It is a generic design and can leverage any storage medium -- +// disk/SSD/NVM/RAM. The code has been kept generic, but significant +// benchmark/design/development time has been spent to make sure the cache +// performs appropriately for the respective storage medium. +// The file defines +// PersistentCacheTier : Implementation that handles an individual cache tier +// PersistentTieredCache : Implementation that handles all tiers as a logical +// unit +// +// PersistentTieredCache architecture: +// +--------------------------+ PersistentCacheTier that handles multiple tiers +// | +----------------+ | +// | | RAM | PersistentCacheTier that handles RAM (VolatileCacheImpl) +// | +----------------+ | +// | | next | +// | v | +// | +----------------+ | +// | | NVM | PersistentCacheTier implementation that handles NVM +// | +----------------+ (BlockCacheImpl) +// | | next | +// | V | +// | +----------------+ | +// | | LE-SSD | PersistentCacheTier implementation that handles LE-SSD +// | +----------------+ (BlockCacheImpl) +// | | | +// | V | +// | null | +// +--------------------------+ +// | +// V +// null +namespace ROCKSDB_NAMESPACE { + +// Persistent Cache Config +// +// This struct captures all the options that are used to configure persistent +// cache. Some of the terminology used in naming the options is explained below +// +// dispatch size : +// This is the size in which IO is dispatched to the device +// +// write buffer size : +// This is the size of an individual write buffer.
Write buffers are +// grouped to form a buffered file. +// +// cache size : +// This is the logical maximum for the cache size +// +// qdepth : +// This is the max number of IOs that can be issued to the device in parallel +// +// pipelining : +// The writer code path follows a pipelined architecture, which means the +// operations are handed off from one stage to another +// +// pipelining backlog size : +// With the pipelined architecture, ops can back up in the pipeline queues. +// This is the maximum backlog size, after which ops are dropped from the queue +struct PersistentCacheConfig { + explicit PersistentCacheConfig( + Env* const _env, const std::string& _path, const uint64_t _cache_size, + const std::shared_ptr<Logger>& _log, + const uint32_t _write_buffer_size = 1 * 1024 * 1024 /*1MB*/) { + env = _env; + clock = (env != nullptr) ? env->GetSystemClock().get() + : SystemClock::Default().get(); + path = _path; + log = _log; + cache_size = _cache_size; + writer_dispatch_size = write_buffer_size = _write_buffer_size; + } + + // + // Validate the settings. The intention is to catch erroneous settings ahead + // of time instead of violating invariants or deadlocking later. + // + Status ValidateSettings() const { + // (1) check pre-conditions for variables + if (!env || path.empty()) { + return Status::InvalidArgument("empty or null args"); + } + + // (2) assert size related invariants + // - cache size cannot be less than cache file size + // - individual write buffer size cannot be greater than or equal to the + // cache file size + // - total write buffer size cannot be less than 2X cache file size + if (cache_size < cache_file_size || write_buffer_size >= cache_file_size || + write_buffer_size * write_buffer_count() < 2 * cache_file_size) { + return Status::InvalidArgument("invalid cache size"); + } + + // (3) check writer settings + // - Queue depth cannot be 0 + // - writer_dispatch_size cannot be greater than write_buffer_size + // - dispatch size and buffer size need to be aligned + if (!writer_qdepth || writer_dispatch_size > write_buffer_size || + write_buffer_size % writer_dispatch_size) { + return Status::InvalidArgument("invalid writer settings"); + } + + return Status::OK(); + } + + // + // Env abstraction to use for system level operations + // + Env* env; + SystemClock* clock; + // + // Path for the block cache where blocks are persisted + // + std::string path; + + // + // Log handle for logging messages + // + std::shared_ptr<Logger> log; + + // + // Enable direct IO for reading + // + bool enable_direct_reads = true; + + // + // Enable direct IO for writing + // + bool enable_direct_writes = false; + + // + // Logical cache size + // + uint64_t cache_size = std::numeric_limits<uint64_t>::max(); + + // cache-file-size + // + // Cache consists of multiple small files. This parameter defines the + // size of an individual cache file + // + // default: 100MB + uint32_t cache_file_size = 100ULL * 1024 * 1024; + + // writer-qdepth + // + // The writers can issue IO to the device in parallel. This parameter + // controls the max number of IOs that can be issued in parallel to the + // block device + // + // default: 1 + uint32_t writer_qdepth = 1; + + // pipeline-writes + // + // Writes optionally follow a pipelined architecture. This helps + // avoid regressions in the eviction code path of the primary tier.
This + // parameter defines if pipelining is enabled or disabled + // + // default: true + bool pipeline_writes = true; + + // max-write-pipeline-backlog-size + // + // Max pipeline buffer size. This is the maximum backlog we can accumulate + // while waiting for writes. After the limit, new ops will be dropped. + // + // Default: 1GiB + uint64_t max_write_pipeline_backlog_size = 1ULL * 1024 * 1024 * 1024; + + // write-buffer-size + // + // This is the size in which buffer slabs are allocated. + // + // Default: 1M + uint32_t write_buffer_size = 1ULL * 1024 * 1024; + + // write-buffer-count + // + // This is the total number of buffer slabs. This is calculated as a factor + // of file size in order to avoid deadlock. + size_t write_buffer_count() const { + assert(write_buffer_size); + return static_cast<size_t>((writer_qdepth + 1.2) * cache_file_size / + write_buffer_size); + } + + // writer-dispatch-size + // + // The writer thread will dispatch the IO at the specified IO size + // + // default: 1M + uint64_t writer_dispatch_size = 1ULL * 1024 * 1024; + + // is_compressed + // + // This option determines if the cache will run in compressed mode or + // uncompressed mode + bool is_compressed = true; + + PersistentCacheConfig MakePersistentCacheConfig( + const std::string& path, const uint64_t size, + const std::shared_ptr<Logger>& log); + + std::string ToString() const; +}; +
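+// Editorial worked example (not in the upstream header): how the sizing +// invariants in ValidateSettings() play out with the defaults above. With +// cache_file_size = 100MB, write_buffer_size = 1MB and writer_qdepth = 1, +// write_buffer_count() = (1 + 1.2) * 100MB / 1MB = 220 slabs, so the total +// write buffer memory is 220MB, which satisfies the requirement of being at +// least 2 * cache_file_size = 200MB; and write_buffer_size (1MB) is an exact +// multiple of writer_dispatch_size (1MB), so the alignment check passes. +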
+// Persistent Cache Tier +// +// This is a logical abstraction that defines a tier of the persistent cache. +// Tiers can be stacked over one another. PersistentCache provides the basic +// definition for accessing/storing in the cache. PersistentCacheTier extends +// the interface to enable management and stacking of tiers. +class PersistentCacheTier : public PersistentCache { + public: + using Tier = std::shared_ptr<PersistentCacheTier>; + + virtual ~PersistentCacheTier() {} + + // Open the persistent cache tier + virtual Status Open(); + + // Close the persistent cache tier + virtual Status Close(); + + // Reserve space up to 'size' bytes + virtual bool Reserve(const size_t size); + + // Erase a key from the cache + virtual bool Erase(const Slice& key); + + // Print stats to string recursively + virtual std::string PrintStats(); + + virtual PersistentCache::StatsType Stats() override; + + // Insert to page cache + virtual Status Insert(const Slice& page_key, const char* data, + const size_t size) override = 0; + + // Lookup page cache by page identifier + virtual Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data, + size_t* size) override = 0; + + // Does it store compressed data? + virtual bool IsCompressed() override = 0; + + virtual std::string GetPrintableOptions() const override = 0; + + virtual uint64_t NewId() override; + + // Return a reference to next tier + virtual Tier& next_tier() { return next_tier_; } + + // Set the value for next tier + virtual void set_next_tier(const Tier& tier) { + assert(!next_tier_); + next_tier_ = tier; + } + + virtual void TEST_Flush() { + if (next_tier_) { + next_tier_->TEST_Flush(); + } + } + + private: + Tier next_tier_; // next tier + std::atomic<uint64_t> last_id_{1}; +}; + +// PersistentTieredCache +// +// Abstraction that helps you construct tiers of persistent caches as a +// unified cache. The tiers act as a single tier for ease of management and +// support the PersistentCache methods for accessing data. +class PersistentTieredCache : public PersistentCacheTier { + public: + virtual ~PersistentTieredCache(); + + Status Open() override; + Status Close() override; + bool Erase(const Slice& key) override; + std::string PrintStats() override; + PersistentCache::StatsType Stats() override; + Status Insert(const Slice& page_key, const char* data, + const size_t size) override; + Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data, + size_t* size) override; + bool IsCompressed() override; + + std::string GetPrintableOptions() const override { + return "PersistentTieredCache"; + } + + void AddTier(const Tier& tier); + + Tier& next_tier() override { + assert(!tiers_.empty()); + return tiers_.back()->next_tier(); + } + + void set_next_tier(const Tier& tier) override { + assert(!tiers_.empty()); + tiers_.back()->set_next_tier(tier); + } + + void TEST_Flush() override { + assert(!tiers_.empty()); + tiers_.front()->TEST_Flush(); + PersistentCacheTier::TEST_Flush(); + } + + protected: + std::list<Tier> tiers_; // list of tiers top-down +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h b/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h new file mode 100644 index 000000000..2a769652d --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/persistent_cache_util.h @@ -0,0 +1,67 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include <limits> +#include <list> + +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +// +// Simple synchronized queue implementation with the option of +// bounding the queue +// +// On overflow, incoming elements are discarded +// +template <class T> +class BoundedQueue { + public: + explicit BoundedQueue( + const size_t max_size = std::numeric_limits<size_t>::max()) + : cond_empty_(&lock_), max_size_(max_size) {} + + virtual ~BoundedQueue() {} + + void Push(T&& t) { + MutexLock _(&lock_); + if (max_size_ != std::numeric_limits<size_t>::max() && + size_ + t.Size() >= max_size_) { + // overflow + return; + } + + size_ += t.Size(); + q_.push_back(std::move(t)); + cond_empty_.SignalAll(); + } + + T Pop() { + MutexLock _(&lock_); + while (q_.empty()) { + cond_empty_.Wait(); + } + + T t = std::move(q_.front()); + size_ -= t.Size(); + q_.pop_front(); + return t; + } + + size_t Size() const { + MutexLock _(&lock_); + return size_; + } + + private: + mutable port::Mutex lock_; + port::CondVar cond_empty_; + std::list<T> q_; + size_t size_ = 0; + const size_t max_size_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc new file mode 100644 index 000000000..45d2830aa --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+// +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/volatile_tier_impl.h" + +#include <string> + +namespace ROCKSDB_NAMESPACE { + +void VolatileCacheTier::DeleteCacheData(VolatileCacheTier::CacheData* data) { + assert(data); + delete data; +} + +VolatileCacheTier::~VolatileCacheTier() { index_.Clear(&DeleteCacheData); } + +PersistentCache::StatsType VolatileCacheTier::Stats() { + std::map<std::string, double> stat; + stat.insert({"persistent_cache.volatile_cache.hits", + static_cast<double>(stats_.cache_hits_)}); + stat.insert({"persistent_cache.volatile_cache.misses", + static_cast<double>(stats_.cache_misses_)}); + stat.insert({"persistent_cache.volatile_cache.inserts", + static_cast<double>(stats_.cache_inserts_)}); + stat.insert({"persistent_cache.volatile_cache.evicts", + static_cast<double>(stats_.cache_evicts_)}); + stat.insert({"persistent_cache.volatile_cache.hit_pct", + static_cast<double>(stats_.CacheHitPct())}); + stat.insert({"persistent_cache.volatile_cache.miss_pct", + static_cast<double>(stats_.CacheMissPct())}); + + auto out = PersistentCacheTier::Stats(); + out.push_back(stat); + return out; +} + +Status VolatileCacheTier::Insert(const Slice& page_key, const char* data, + const size_t size) { + // precondition + assert(data); + assert(size); + + // increment the size + size_ += size; + + // check if we have overshot the limit; if so, evict some space + while (size_ > max_size_) { + if (!Evict()) { + // unable to evict data, we give up so we don't spike read + // latency + assert(size_ >= size); + size_ -= size; + return Status::TryAgain("Unable to evict any data"); + } + } + + assert(size_ >= size); + + // insert order: LRU, followed by index + std::string key(page_key.data(), page_key.size()); + std::string value(data, size); + std::unique_ptr<CacheData> cache_data( + new CacheData(std::move(key), std::move(value))); + bool ok = index_.Insert(cache_data.get()); + if (!ok) { + // decrement the size that we incremented ahead of time + assert(size_ >= size); + size_ -= size; + // failed to insert into the cache: the key already exists + return Status::TryAgain("key already exists in volatile cache"); + } + + cache_data.release(); + stats_.cache_inserts_++; + return Status::OK(); +} + +Status VolatileCacheTier::Lookup(const Slice& page_key, + std::unique_ptr<char[]>* result, + size_t* size) { + CacheData key(page_key.ToString()); + CacheData* kv; + bool ok = index_.Find(&key, &kv); + if (ok) { + // set return data + result->reset(new char[kv->value.size()]); + memcpy(result->get(), kv->value.c_str(), kv->value.size()); + *size = kv->value.size(); + // drop the reference on cache data + kv->refs_--; + // update stats + stats_.cache_hits_++; + return Status::OK(); + } + + stats_.cache_misses_++; + + if (next_tier()) { + return next_tier()->Lookup(page_key, result, size); + } + + return Status::NotFound("key not found in volatile cache"); +} + +bool VolatileCacheTier::Erase(const Slice& /*key*/) { + assert(!"not supported"); + return true; +} +
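+// Editorial note (not in the upstream file): Insert() above and Evict() below +// cooperate. Insert() charges size_ optimistically and, while size_ exceeds +// max_size_, calls Evict(); Evict() pops the LRU victim from the index, +// demotes it to the next tier when one is attached, and gives the victim's +// bytes back. For example, with max_size_ = 100 and size_ = 95, inserting a +// 10-byte value drives size_ to 105 and triggers evictions until size_ <= 100, +// or the insert is rolled back with Status::TryAgain if nothing can be evicted. +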
+bool VolatileCacheTier::Evict() { + CacheData* edata = index_.Evict(); + if (!edata) { + // not able to evict any object + return false; + } + + stats_.cache_evicts_++; + + // push the evicted object to the next level + if (next_tier()) { + // TODO: Should the insert error be ignored? + Status s = next_tier()->Insert(Slice(edata->key), edata->value.c_str(), + edata->value.size()); + s.PermitUncheckedError(); + } + + // adjust size and destroy data + size_ -= edata->value.size(); + delete edata; + + return true; +} + +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h new file mode 100644 index 000000000..09265e457 --- /dev/null +++ b/src/rocksdb/utilities/persistent_cache/volatile_tier_impl.h @@ -0,0 +1,141 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#ifndef ROCKSDB_LITE + +#include <atomic> +#include <limits> +#include <sstream> +#include <string> +#include <vector> + +#include "rocksdb/cache.h" +#include "utilities/persistent_cache/hash_table.h" +#include "utilities/persistent_cache/hash_table_evictable.h" +#include "utilities/persistent_cache/persistent_cache_tier.h" + +// VolatileCacheTier +// +// This file provides a persistent cache tier implementation for caching +// key/values in RAM. +// +// key/values +// | +// V +// +-------------------+ +// | VolatileCacheTier | Store in an evictable hash table +// +-------------------+ +// | +// V +// on eviction +// pushed to next tier +// +// The implementation is designed to be concurrent. The evictable hash table +// implementation is not concurrent at this point though. +// +// The eviction algorithm is LRU +namespace ROCKSDB_NAMESPACE { +
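+// Illustrative sketch (editorial, not part of the upstream header): building +// a RAM tier capped at 1 GiB that demotes LRU victims to a lower tier instead +// of dropping them. disk_tier stands for any concrete PersistentCacheTier. +// +// auto ram_tier = std::make_shared<VolatileCacheTier>( +// /*is_compressed=*/true, /*max_size=*/1024 * 1024 * 1024); +// ram_tier->set_next_tier(disk_tier); // victims flow here on eviction +// Status s = ram_tier->Open(); +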
+class VolatileCacheTier : public PersistentCacheTier { + public: + explicit VolatileCacheTier( + const bool is_compressed = true, + const size_t max_size = std::numeric_limits<size_t>::max()) + : is_compressed_(is_compressed), max_size_(max_size) {} + + virtual ~VolatileCacheTier(); + + // insert to cache + Status Insert(const Slice& page_key, const char* data, + const size_t size) override; + // lookup key in cache + Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data, + size_t* size) override; + + // is the cache compressed? + bool IsCompressed() override { return is_compressed_; } + + // erase key from cache + bool Erase(const Slice& key) override; + + std::string GetPrintableOptions() const override { + return "VolatileCacheTier"; + } + + // Expose stats as map + PersistentCache::StatsType Stats() override; + + private: + // + // Cache data abstraction + // + struct CacheData : LRUElement<CacheData> { + explicit CacheData(CacheData&& rhs) noexcept + : key(std::move(rhs.key)), value(std::move(rhs.value)) {} + + explicit CacheData(const std::string& _key, const std::string& _value = "") + : key(_key), value(_value) {} + + virtual ~CacheData() {} + + const std::string key; + const std::string value; + }; + + static void DeleteCacheData(CacheData* data); + + // + // Index and LRU definition + // + struct CacheDataHash { + uint64_t operator()(const CacheData* obj) const { + assert(obj); + return std::hash<std::string>()(obj->key); + } + }; + + struct CacheDataEqual { + bool operator()(const CacheData* lhs, const CacheData* rhs) const { + assert(lhs); + assert(rhs); + return lhs->key == rhs->key; + } + }; + + struct Statistics { + std::atomic<uint64_t> cache_misses_{0}; + std::atomic<uint64_t> cache_hits_{0}; + std::atomic<uint64_t> cache_inserts_{0}; + std::atomic<uint64_t> cache_evicts_{0}; + + double CacheHitPct() const { + auto lookups = cache_hits_ + cache_misses_; + return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0; + } + + double CacheMissPct() const { + auto lookups = cache_hits_ + cache_misses_; + return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0; + } + }; + + using IndexType = + EvictableHashTable<CacheData, CacheDataHash, CacheDataEqual>; + + // Evict LRU tail + bool Evict(); + + const bool is_compressed_ = true; // does it store compressed data + IndexType index_; // in-memory cache + std::atomic<uint64_t> max_size_{0}; // Maximum size of the cache + std::atomic<uint64_t> size_{0}; // Size of the cache + Statistics stats_; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif