Diffstat (limited to 'src/kv')
-rw-r--r-- | src/kv/CMakeLists.txt                  |   17
-rw-r--r-- | src/kv/KeyValueDB.cc                   |   48
-rw-r--r-- | src/kv/KeyValueDB.h                    |  440
-rw-r--r-- | src/kv/LevelDBStore.cc                 |  455
-rw-r--r-- | src/kv/LevelDBStore.h                  |  412
-rw-r--r-- | src/kv/MemDB.cc                        |  661
-rw-r--r-- | src/kv/MemDB.h                         |  222
-rw-r--r-- | src/kv/RocksDBStore.cc                 | 3453
-rw-r--r-- | src/kv/RocksDBStore.h                  |  547
-rw-r--r-- | src/kv/rocksdb_cache/BinnedLRUCache.cc |  624
-rw-r--r-- | src/kv/rocksdb_cache/BinnedLRUCache.h  |  336
-rw-r--r-- | src/kv/rocksdb_cache/ShardedCache.cc   |  159
-rw-r--r-- | src/kv/rocksdb_cache/ShardedCache.h    |  141
13 files changed, 7515 insertions(+), 0 deletions(-)
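The KeyValueDB.cc factory below selects a backend by type string ("leveldb", "rocksdb", or the experimental "memdb"). For orientation, a minimal consumer sketch against the interface added by this commit — not part of the commit itself; the directory, prefix, and key names are placeholders:

#include <cerrno>
#include <memory>
#include <sstream>

#include "kv/KeyValueDB.h"

// Open (creating if missing) a RocksDB-backed store and write one key.
int open_kv_example(CephContext *cct)
{
  std::unique_ptr<KeyValueDB> db(
    KeyValueDB::create(cct, "rocksdb", "/tmp/kv-example"));
  if (!db)
    return -EINVAL;                 // unknown type, or experimental memdb disabled
  if (int r = db->init(); r < 0)    // backend option string left empty here
    return r;
  std::ostringstream err;
  if (int r = db->create_and_open(err, ""); r < 0)
    return r;                       // err holds the backend's error text

  // All writes go through a Transaction; "P" plays the role of the
  // key prefix / column family name.
  KeyValueDB::Transaction t = db->get_transaction();
  ceph::buffer::list v;
  v.append("value", 5);
  t->set("P", "key", v);
  return db->submit_transaction_sync(t);
}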
diff --git a/src/kv/CMakeLists.txt b/src/kv/CMakeLists.txt new file mode 100644 index 000000000..057eb8260 --- /dev/null +++ b/src/kv/CMakeLists.txt @@ -0,0 +1,17 @@ +set(kv_srcs + KeyValueDB.cc + MemDB.cc + RocksDBStore.cc + rocksdb_cache/ShardedCache.cc + rocksdb_cache/BinnedLRUCache.cc) + +if (WITH_LEVELDB) + list(APPEND kv_srcs LevelDBStore.cc) +endif (WITH_LEVELDB) + +add_library(kv STATIC ${kv_srcs} + $<TARGET_OBJECTS:common_prioritycache_obj>) + +target_link_libraries(kv ${LEVELDB_LIBRARIES} + RocksDB::RocksDB + heap_profiler) diff --git a/src/kv/KeyValueDB.cc b/src/kv/KeyValueDB.cc new file mode 100644 index 000000000..f050040c1 --- /dev/null +++ b/src/kv/KeyValueDB.cc @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "KeyValueDB.h" +#ifdef WITH_LEVELDB +#include "LevelDBStore.h" +#endif +#include "MemDB.h" +#include "RocksDBStore.h" + +using std::map; +using std::string; + +KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type, + const string& dir, + map<string,string> options, + void *p) +{ +#ifdef WITH_LEVELDB + if (type == "leveldb") { + return new LevelDBStore(cct, dir); + } +#endif + if (type == "rocksdb") { + return new RocksDBStore(cct, dir, options, p); + } + if ((type == "memdb") && + cct->check_experimental_feature_enabled("memdb")) { + return new MemDB(cct, dir, p); + } + return NULL; +} + +int KeyValueDB::test_init(const string& type, const string& dir) +{ +#ifdef WITH_LEVELDB + if (type == "leveldb") { + return LevelDBStore::_test_init(dir); + } +#endif + if (type == "rocksdb") { + return RocksDBStore::_test_init(dir); + } + if (type == "memdb") { + return MemDB::_test_init(dir); + } + return -EINVAL; +} diff --git a/src/kv/KeyValueDB.h b/src/kv/KeyValueDB.h new file mode 100644 index 000000000..9e3cb3c3d --- /dev/null +++ b/src/kv/KeyValueDB.h @@ -0,0 +1,440 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KEY_VALUE_DB_H +#define KEY_VALUE_DB_H + +#include "include/buffer.h" +#include <ostream> +#include <set> +#include <map> +#include <optional> +#include <string> +#include <boost/scoped_ptr.hpp> +#include "include/encoding.h" +#include "common/Formatter.h" +#include "common/perf_counters.h" +#include "common/PriorityCache.h" + +/** + * Defines virtual interface to be implemented by key value store + * + * Kyoto Cabinet or LevelDB should implement this + */ +class KeyValueDB { +public: + class TransactionImpl { + public: + /// Set Keys + void set( + const std::string &prefix, ///< [in] Prefix for keys, or CF name + const std::map<std::string, ceph::buffer::list> &to_set ///< [in] keys/values to set + ) { + for (auto it = to_set.cbegin(); it != to_set.cend(); ++it) + set(prefix, it->first, it->second); + } + + /// Set Keys (via encoded ceph::buffer::list) + void set( + const std::string &prefix, ///< [in] prefix, or CF name + ceph::buffer::list& to_set_bl ///< [in] encoded key/values to set + ) { + using ceph::decode; + auto p = std::cbegin(to_set_bl); + uint32_t num; + decode(num, p); + while (num--) { + std::string key; + ceph::buffer::list value; + decode(key, p); + decode(value, p); + set(prefix, key, value); + } + } + + /// Set Key + virtual void set( + const std::string &prefix, ///< [in] Prefix or CF for the key + const std::string &k, ///< [in] Key to set + const ceph::buffer::list &bl ///< [in] Value to set + ) = 0; + virtual void set( + const std::string &prefix, + const char *k, + size_t keylen, + const 
ceph::buffer::list& bl) { + set(prefix, std::string(k, keylen), bl); + } + + /// Removes Keys (via encoded ceph::buffer::list) + void rmkeys( + const std::string &prefix, ///< [in] Prefix or CF to search for + ceph::buffer::list &keys_bl ///< [in] Keys to remove + ) { + using ceph::decode; + auto p = std::cbegin(keys_bl); + uint32_t num; + decode(num, p); + while (num--) { + std::string key; + decode(key, p); + rmkey(prefix, key); + } + } + + /// Removes Keys + void rmkeys( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::set<std::string> &keys ///< [in] Keys to remove + ) { + for (auto it = keys.cbegin(); it != keys.cend(); ++it) + rmkey(prefix, *it); + } + + /// Remove Key + virtual void rmkey( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::string &k ///< [in] Key to remove + ) = 0; + virtual void rmkey( + const std::string &prefix, ///< [in] Prefix to search for + const char *k, ///< [in] Key to remove + size_t keylen + ) { + rmkey(prefix, std::string(k, keylen)); + } + + /// Remove Single Key which exists and was not overwritten. + /// This API is only related to performance optimization, and should only be + /// re-implemented by log-insert-merge tree based keyvalue stores(such as RocksDB). + /// If a key is overwritten (by calling set multiple times), then the result + /// of calling rm_single_key on this key is undefined. + virtual void rm_single_key( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::string &k ///< [in] Key to remove + ) { return rmkey(prefix, k);} + + /// Removes keys beginning with prefix + virtual void rmkeys_by_prefix( + const std::string &prefix ///< [in] Prefix/CF by which to remove keys + ) = 0; + + virtual void rm_range_keys( + const std::string &prefix, ///< [in] Prefix by which to remove keys + const std::string &start, ///< [in] The start bound of remove keys + const std::string &end ///< [in] The start bound of remove keys + ) = 0; + + /// Merge value into key + virtual void merge( + const std::string &prefix, ///< [in] Prefix/CF ==> MUST match some established merge operator + const std::string &key, ///< [in] Key to be merged + const ceph::buffer::list &value ///< [in] value to be merged into key + ) { ceph_abort_msg("Not implemented"); } + + virtual ~TransactionImpl() {} + }; + typedef std::shared_ptr< TransactionImpl > Transaction; + + /// create a new instance + static KeyValueDB *create(CephContext *cct, const std::string& type, + const std::string& dir, + std::map<std::string,std::string> options = {}, + void *p = NULL); + + /// test whether we can successfully initialize; may have side effects (e.g., create) + static int test_init(const std::string& type, const std::string& dir); + virtual int init(std::string option_str="") = 0; + virtual int open(std::ostream &out, const std::string& cfs="") = 0; + // std::vector cfs contains column families to be created when db is created. + virtual int create_and_open(std::ostream &out, const std::string& cfs="") = 0; + + virtual int open_read_only(std::ostream &out, const std::string& cfs="") { + return -ENOTSUP; + } + + virtual void close() { } + + /// Try to repair K/V database. leveldb and rocksdb require that database must be not opened. 
+ virtual int repair(std::ostream &out) { return 0; } + + virtual Transaction get_transaction() = 0; + virtual int submit_transaction(Transaction) = 0; + virtual int submit_transaction_sync(Transaction t) { + return submit_transaction(t); + } + + /// Retrieve Keys + virtual int get( + const std::string &prefix, ///< [in] Prefix/CF for key + const std::set<std::string> &key, ///< [in] Key to retrieve + std::map<std::string, ceph::buffer::list> *out ///< [out] Key value retrieved + ) = 0; + virtual int get(const std::string &prefix, ///< [in] prefix or CF name + const std::string &key, ///< [in] key + ceph::buffer::list *value) { ///< [out] value + std::set<std::string> ks; + ks.insert(key); + std::map<std::string,ceph::buffer::list> om; + int r = get(prefix, ks, &om); + if (om.find(key) != om.end()) { + *value = std::move(om[key]); + } else { + *value = ceph::buffer::list(); + r = -ENOENT; + } + return r; + } + virtual int get(const std::string &prefix, + const char *key, size_t keylen, + ceph::buffer::list *value) { + return get(prefix, std::string(key, keylen), value); + } + + // This superclass is used both by kv iterators *and* by the ObjectMap + // omap iterator. The class hierarchies are unfortunately tied together + // by the legacy DBOjectMap implementation :(. + class SimplestIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int upper_bound(const std::string &after) = 0; + virtual int lower_bound(const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual std::string key() = 0; + virtual std::string tail_key() { + return ""; + } + virtual ceph::buffer::list value() = 0; + virtual int status() = 0; + virtual ~SimplestIteratorImpl() {} + }; + + class IteratorImpl : public SimplestIteratorImpl { + public: + virtual ~IteratorImpl() {} + virtual int seek_to_last() = 0; + virtual int prev() = 0; + virtual std::pair<std::string, std::string> raw_key() = 0; + virtual ceph::buffer::ptr value_as_ptr() { + ceph::buffer::list bl = value(); + if (bl.length() == 1) { + return *bl.buffers().begin(); + } else if (bl.length() == 0) { + return ceph::buffer::ptr(); + } else { + ceph_abort(); + } + } + }; + typedef std::shared_ptr< IteratorImpl > Iterator; + + // This is the low-level iterator implemented by the underlying KV store. + class WholeSpaceIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int seek_to_first(const std::string &prefix) = 0; + virtual int seek_to_last() = 0; + virtual int seek_to_last(const std::string &prefix) = 0; + virtual int upper_bound(const std::string &prefix, const std::string &after) = 0; + virtual int lower_bound(const std::string &prefix, const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual int prev() = 0; + virtual std::string key() = 0; + virtual std::pair<std::string,std::string> raw_key() = 0; + virtual bool raw_key_is_prefixed(const std::string &prefix) = 0; + virtual ceph::buffer::list value() = 0; + virtual ceph::buffer::ptr value_as_ptr() { + ceph::buffer::list bl = value(); + if (bl.length()) { + return *bl.buffers().begin(); + } else { + return ceph::buffer::ptr(); + } + } + virtual int status() = 0; + virtual size_t key_size() { + return 0; + } + virtual size_t value_size() { + return 0; + } + virtual ~WholeSpaceIteratorImpl() { } + }; + typedef std::shared_ptr< WholeSpaceIteratorImpl > WholeSpaceIterator; + +private: + // This class filters a WholeSpaceIterator by a prefix. 
+ class PrefixIteratorImpl : public IteratorImpl { + const std::string prefix; + WholeSpaceIterator generic_iter; + public: + PrefixIteratorImpl(const std::string &prefix, WholeSpaceIterator iter) : + prefix(prefix), generic_iter(iter) { } + ~PrefixIteratorImpl() override { } + + int seek_to_first() override { + return generic_iter->seek_to_first(prefix); + } + int seek_to_last() override { + return generic_iter->seek_to_last(prefix); + } + int upper_bound(const std::string &after) override { + return generic_iter->upper_bound(prefix, after); + } + int lower_bound(const std::string &to) override { + return generic_iter->lower_bound(prefix, to); + } + bool valid() override { + if (!generic_iter->valid()) + return false; + return generic_iter->raw_key_is_prefixed(prefix); + } + int next() override { + return generic_iter->next(); + } + int prev() override { + return generic_iter->prev(); + } + std::string key() override { + return generic_iter->key(); + } + std::pair<std::string, std::string> raw_key() override { + return generic_iter->raw_key(); + } + ceph::buffer::list value() override { + return generic_iter->value(); + } + ceph::buffer::ptr value_as_ptr() override { + return generic_iter->value_as_ptr(); + } + int status() override { + return generic_iter->status(); + } + }; +public: + typedef uint32_t IteratorOpts; + static const uint32_t ITERATOR_NOCACHE = 1; + + struct IteratorBounds { + std::optional<std::string> lower_bound; + std::optional<std::string> upper_bound; + }; + + virtual WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) = 0; + virtual Iterator get_iterator(const std::string &prefix, IteratorOpts opts = 0, IteratorBounds bounds = IteratorBounds()) { + return std::make_shared<PrefixIteratorImpl>( + prefix, + get_wholespace_iterator(opts)); + } + + virtual uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) = 0; + virtual int get_statfs(struct store_statfs_t *buf) { + return -EOPNOTSUPP; + } + + virtual int set_cache_size(uint64_t) { + return -EOPNOTSUPP; + } + + virtual int set_cache_high_pri_pool_ratio(double ratio) { + return -EOPNOTSUPP; + } + + virtual int64_t get_cache_usage() const { + return -EOPNOTSUPP; + } + + virtual int64_t get_cache_usage(std::string prefix) const { + return -EOPNOTSUPP; + } + + virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const { + return nullptr; + } + + virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache(std::string prefix) const { + return nullptr; + } + + + + virtual ~KeyValueDB() {} + + /// estimate space utilization for a prefix (in bytes) + virtual int64_t estimate_prefix_size(const std::string& prefix, + const std::string& key_prefix) { + return 0; + } + + /// compact the underlying store + virtual void compact() {} + + /// compact the underlying store in async mode + virtual void compact_async() {} + + /// compact db for all keys with a given prefix + virtual void compact_prefix(const std::string& prefix) {} + /// compact db for all keys with a given prefix, async + virtual void compact_prefix_async(const std::string& prefix) {} + virtual void compact_range(const std::string& prefix, + const std::string& start, const std::string& end) {} + virtual void compact_range_async(const std::string& prefix, + const std::string& start, const std::string& end) {} + + // See RocksDB merge operator definition, we support the basic + // associative merge only right now. 
+ class MergeOperator { + public: + /// Merge into a key that doesn't exist + virtual void merge_nonexistent( + const char *rdata, size_t rlen, + std::string *new_value) = 0; + /// Merge into a key that does exist + virtual void merge( + const char *ldata, size_t llen, + const char *rdata, size_t rlen, + std::string *new_value) = 0; + /// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time. + virtual const char *name() const = 0; + + virtual ~MergeOperator() {} + }; + + /// Setup one or more operators, this needs to be done BEFORE the DB is opened. + virtual int set_merge_operator(const std::string& prefix, + std::shared_ptr<MergeOperator> mop) { + return -EOPNOTSUPP; + } + + virtual void get_statistics(ceph::Formatter *f) { + return; + } + + /** + * Return your perf counters if you have any. Subclasses are not + * required to implement this, and callers must respect a null return + * value. + */ + virtual PerfCounters *get_perf_counters() { + return nullptr; + } + + /** + * Access implementation specific integral property corresponding + * to passed property and prefic. + * Return value is true if property is valid for prefix, populates out. + */ + virtual bool get_property( + const std::string &property, + uint64_t *out) { + return false; + } +protected: + /// List of matching prefixes/ColumnFamilies and merge operators + std::vector<std::pair<std::string, + std::shared_ptr<MergeOperator> > > merge_ops; + +}; + +#endif diff --git a/src/kv/LevelDBStore.cc b/src/kv/LevelDBStore.cc new file mode 100644 index 000000000..6142e7505 --- /dev/null +++ b/src/kv/LevelDBStore.cc @@ -0,0 +1,455 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "LevelDBStore.h" + +#include <set> +#include <map> +#include <string> +#include <cerrno> + +#include "common/debug.h" +#include "common/perf_counters.h" + +// re-include our assert to clobber the system one; fix dout: +#include "include/ceph_assert.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_leveldb +#undef dout_prefix +#define dout_prefix *_dout << "leveldb: " + +using std::list; +using std::string; +using std::ostream; +using std::pair; +using std::vector; + +using ceph::bufferlist; +using ceph::bufferptr; + +class CephLevelDBLogger : public leveldb::Logger { + CephContext *cct; +public: + explicit CephLevelDBLogger(CephContext *c) : cct(c) { + cct->get(); + } + ~CephLevelDBLogger() override { + cct->put(); + } + + // Write an entry to the log file with the specified format. + void Logv(const char* format, va_list ap) override { + dout(1); + char buf[65536]; + vsnprintf(buf, sizeof(buf), format, ap); + *_dout << buf << dendl; + } +}; + +leveldb::Logger *create_leveldb_ceph_logger() +{ + return new CephLevelDBLogger(g_ceph_context); +} + +int LevelDBStore::init(string option_str) +{ + // init defaults. caller can override these if they want + // prior to calling open. 
+ options.write_buffer_size = g_conf()->leveldb_write_buffer_size; + options.cache_size = g_conf()->leveldb_cache_size; + options.block_size = g_conf()->leveldb_block_size; + options.bloom_size = g_conf()->leveldb_bloom_size; + options.compression_enabled = g_conf()->leveldb_compression; + options.paranoid_checks = g_conf()->leveldb_paranoid; + options.max_open_files = g_conf()->leveldb_max_open_files; + options.log_file = g_conf()->leveldb_log; + return 0; +} + +int LevelDBStore::open(ostream &out, const std::string& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, false); +} + +int LevelDBStore::create_and_open(ostream &out, const std::string& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, true); +} + +int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions) +{ + if (options.write_buffer_size) + ldoptions.write_buffer_size = options.write_buffer_size; + if (options.max_open_files) + ldoptions.max_open_files = options.max_open_files; + if (options.cache_size) { + leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size); + db_cache.reset(_db_cache); + ldoptions.block_cache = db_cache.get(); + } + if (options.block_size) + ldoptions.block_size = options.block_size; + if (options.bloom_size) { +#ifdef HAVE_LEVELDB_FILTER_POLICY + const leveldb::FilterPolicy *_filterpolicy = + leveldb::NewBloomFilterPolicy(options.bloom_size); + filterpolicy.reset(_filterpolicy); + ldoptions.filter_policy = filterpolicy.get(); +#else + ceph_abort_msg("bloom size set but installed leveldb doesn't support bloom filters"); +#endif + } + if (options.compression_enabled) + ldoptions.compression = leveldb::kSnappyCompression; + else + ldoptions.compression = leveldb::kNoCompression; + if (options.block_restart_interval) + ldoptions.block_restart_interval = options.block_restart_interval; + + ldoptions.error_if_exists = options.error_if_exists; + ldoptions.paranoid_checks = options.paranoid_checks; + ldoptions.create_if_missing = create_if_missing; + + if (g_conf()->leveldb_log_to_ceph_log) { + ceph_logger = new CephLevelDBLogger(g_ceph_context); + ldoptions.info_log = ceph_logger; + } + + if (options.log_file.length()) { + leveldb::Env *env = leveldb::Env::Default(); + env->NewLogger(options.log_file, &ldoptions.info_log); + } + return 0; +} + +int LevelDBStore::do_open(ostream &out, bool create_if_missing) +{ + leveldb::Options ldoptions; + int r = load_leveldb_options(create_if_missing, ldoptions); + if (r) { + dout(1) << "load leveldb options failed" << dendl; + return r; + } + + leveldb::DB *_db; + leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db); + db.reset(_db); + if (!status.ok()) { + out << status.ToString() << std::endl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last); + plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets"); + plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions"); + plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency"); + plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency"); + plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions"); + plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range"); + 
plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (g_conf()->leveldb_compact_on_mount) { + derr << "Compacting leveldb store..." << dendl; + compact(); + derr << "Finished compacting leveldb store" << dendl; + } + return 0; +} + +int LevelDBStore::_test_init(const string& dir) +{ + leveldb::Options options; + options.create_if_missing = true; + leveldb::DB *db; + leveldb::Status status = leveldb::DB::Open(options, dir, &db); + delete db; + return status.ok() ? 0 : -EIO; +} + +LevelDBStore::~LevelDBStore() +{ + close(); +} + +void LevelDBStore::close() +{ + // stop compaction thread + compact_queue_lock.lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.notify_all(); + compact_queue_lock.unlock(); + compact_thread.join(); + } else { + compact_queue_lock.unlock(); + } + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + logger = nullptr; + } + + // Ensure db is destroyed before dependent db_cache and filterpolicy + db.reset(); + delete ceph_logger; +} + +int LevelDBStore::repair(std::ostream &out) +{ + leveldb::Options ldoptions; + int r = load_leveldb_options(false, ldoptions); + if (r) { + dout(1) << "load leveldb options failed" << dendl; + out << "load leveldb options failed" << std::endl; + return r; + } + leveldb::Status status = leveldb::RepairDB(path, ldoptions); + if (status.ok()) { + return 0; + } else { + out << "repair leveldb failed : " << status.ToString() << std::endl; + return 1; + } +} + +int LevelDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + LevelDBTransactionImpl * _t = + static_cast<LevelDBTransactionImpl *>(t.get()); + leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_latency, lat); + return s.ok() ? 0 : -1; +} + +int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + LevelDBTransactionImpl * _t = + static_cast<LevelDBTransactionImpl *>(t.get()); + leveldb::WriteOptions options; + options.sync = true; + leveldb::Status s = db->Write(options, &(_t->bat)); + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_sync_latency, lat); + return s.ok() ? 0 : -1; +} + +void LevelDBStore::LevelDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + size_t bllen = to_set_bl.length(); + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && bllen > 0) { + // bufferlist contains just one ptr or they're contiguous + bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen)); + } else if ((bllen <= 32 * 1024) && (bllen > 0)) { + // 2+ bufferptrs that are not contiguopus + // allocate buffer on stack and copy bl contents to that buffer + // make sure the buffer isn't too large or we might crash here... 
+ char* slicebuf = (char*) alloca(bllen); + leveldb::Slice newslice(slicebuf, bllen); + for (const auto& node : to_set_bl.buffers()) { + const size_t ptrlen = node.length(); + memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen); + slicebuf += ptrlen; + } + bat.Put(leveldb::Slice(key), newslice); + } else { + // 2+ bufferptrs that are not contiguous, and enormous in size + bufferlist val = to_set_bl; + bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length())); + } +} + +void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + bat.Delete(leveldb::Slice(key)); +} + +void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(leveldb::Slice(combine_strings(prefix, it->key()))); + } +} + +void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(combine_strings(prefix, it->key())); + it->next(); + } +} + +int LevelDBStore::get( + const string &prefix, + const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + utime_t start = ceph_clock_now(); + for (std::set<string>::const_iterator i = keys.begin(); + i != keys.end(); ++i) { + std::string value; + std::string bound = combine_strings(prefix, *i); + auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value); + if (status.ok()) + (*out)[*i].append(value); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_gets); + logger->tinc(l_leveldb_get_latency, lat); + return 0; +} + +int LevelDBStore::get(const string &prefix, + const string &key, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + string value, k; + leveldb::Status s; + k = combine_strings(prefix, key); + s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value); + if (s.ok()) { + out->append(value); + } else { + r = -ENOENT; + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_gets); + logger->tinc(l_leveldb_get_latency, lat); + return r; +} + +string LevelDBStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(0); + out.append(value); + return out; +} + +bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in) +{ + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) +{ + size_t prefix_len = 0; + + // Find separator inside Slice + char* separator = (char*) memchr(in.data(), 0, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in.data()); + if (prefix_len >= in.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in.data(), prefix_len); + if (key) + *key = string(separator+1, in.size() - prefix_len - 1); + return 0; +} + +void LevelDBStore::compact() +{ + logger->inc(l_leveldb_compact); + db->CompactRange(NULL, NULL); +} + + +void LevelDBStore::compact_thread_entry() +{ + std::unique_lock l{compact_queue_lock}; + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + pair<string,string> range = compact_queue.front(); + compact_queue.pop_front(); + 
logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + l.unlock(); + logger->inc(l_leveldb_compact_range); + if (range.first.empty() && range.second.empty()) { + compact(); + } else { + compact_range(range.first, range.second); + } + l.lock(); + continue; + } + if (compact_queue_stop) + break; + compact_queue_cond.wait(l); + } +} + +void LevelDBStore::compact_range_async(const string& start, const string& end) +{ + std::lock_guard l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair<string,string> >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (p->first <= end && p->first > start) { + // merge with existing range to the right + compact_queue.push_back(make_pair(start, p->second)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + if (p->second >= start && p->second < end) { + // merge with existing range to the left + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.notify_all(); + if (!compact_thread.is_started()) { + compact_thread.create("levdbst_compact"); + } +} diff --git a/src/kv/LevelDBStore.h b/src/kv/LevelDBStore.h new file mode 100644 index 000000000..085193ee0 --- /dev/null +++ b/src/kv/LevelDBStore.h @@ -0,0 +1,412 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef LEVEL_DB_STORE_H +#define LEVEL_DB_STORE_H + +#include "include/types.h" +#include "include/buffer_fwd.h" +#include "KeyValueDB.h" +#include <set> +#include <map> +#include <string> +#include <boost/scoped_ptr.hpp> +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/write_batch.h" +#include "leveldb/slice.h" +#include "leveldb/cache.h" +#ifdef HAVE_LEVELDB_FILTER_POLICY +#include "leveldb/filter_policy.h" +#endif + +#include <errno.h> +#include "common/errno.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/Formatter.h" +#include "common/Cond.h" + +#include "common/ceph_context.h" +#include "include/common_fwd.h" + +// reinclude our assert to clobber the system one +# include "include/ceph_assert.h" + +enum { + l_leveldb_first = 34300, + l_leveldb_gets, + l_leveldb_txns, + l_leveldb_get_latency, + l_leveldb_submit_latency, + l_leveldb_submit_sync_latency, + l_leveldb_compact, + l_leveldb_compact_range, + l_leveldb_compact_queue_merge, + l_leveldb_compact_queue_len, + l_leveldb_last, +}; + +extern leveldb::Logger *create_leveldb_ceph_logger(); + +class CephLevelDBLogger; + +/** + * Uses LevelDB to implement the KeyValueDB interface + */ +class LevelDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + CephLevelDBLogger *ceph_logger; + std::string path; + boost::scoped_ptr<leveldb::Cache> db_cache; +#ifdef HAVE_LEVELDB_FILTER_POLICY + boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy; +#endif + boost::scoped_ptr<leveldb::DB> db; + + int load_leveldb_options(bool create_if_missing, leveldb::Options &opts); + int do_open(std::ostream &out, 
bool create_if_missing); + + // manage async compactions + ceph::mutex compact_queue_lock = + ceph::make_mutex("LevelDBStore::compact_thread_lock"); + ceph::condition_variable compact_queue_cond; + std::list<std::pair<std::string, std::string>> compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + LevelDBStore *db; + public: + explicit CompactThread(LevelDBStore *d) : db(d) {} + void *entry() override { + db->compact_thread_entry(); + return NULL; + } + friend class LevelDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const std::string& start, const std::string& end) { + leveldb::Slice cstart(start); + leveldb::Slice cend(end); + db->CompactRange(&cstart, &cend); + } + void compact_range_async(const std::string& start, const std::string& end); + +public: + /// compact the underlying leveldb store + void compact() override; + + void compact_async() override { + compact_range_async({}, {}); + } + + /// compact db for all keys with a given prefix + void compact_prefix(const std::string& prefix) override { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const std::string& prefix) override { + compact_range_async(prefix, past_prefix(prefix)); + } + void compact_range(const std::string& prefix, + const std::string& start, const std::string& end) override { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const std::string& prefix, + const std::string& start, const std::string& end) override { + compact_range_async(combine_strings(prefix, start), + combine_strings(prefix, end)); + } + + + /** + * options_t: Holds options which are minimally interpreted + * on initialization and then passed through to LevelDB. + * We transform a couple of these into actual LevelDB + * structures, but the rest are simply passed through unchanged. See + * leveldb/options.h for more precise details on each. + * + * Set them after constructing the LevelDBStore, but before calling + * open() or create_and_open(). + */ + struct options_t { + uint64_t write_buffer_size; /// in-memory write buffer size + int max_open_files; /// maximum number of files LevelDB can open at once + uint64_t cache_size; /// size of extra decompressed cache to use + uint64_t block_size; /// user data per block + int bloom_size; /// number of bits per entry to put in a bloom filter + bool compression_enabled; /// whether to use libsnappy compression or not + + // don't change these ones. 
No, seriously + int block_restart_interval; + bool error_if_exists; + bool paranoid_checks; + + std::string log_file; + + options_t() : + write_buffer_size(0), //< 0 means default + max_open_files(0), //< 0 means default + cache_size(0), //< 0 means no cache (default) + block_size(0), //< 0 means default + bloom_size(0), //< 0 means no bloom filter (default) + compression_enabled(true), //< set to false for no compression + block_restart_interval(0), //< 0 means default + error_if_exists(false), //< set to true if you want to check nonexistence + paranoid_checks(false) //< set to true if you want paranoid checks + {} + } options; + + LevelDBStore(CephContext *c, const std::string &path) : + cct(c), + logger(NULL), + ceph_logger(NULL), + path(path), + db_cache(NULL), +#ifdef HAVE_LEVELDB_FILTER_POLICY + filterpolicy(NULL), +#endif + compact_queue_stop(false), + compact_thread(this), + options() + {} + + ~LevelDBStore() override; + + static int _test_init(const std::string& dir); + int init(std::string option_str="") override; + + /// Opens underlying db + int open(std::ostream &out, const std::string& cfs="") override; + /// Creates underlying db if missing and opens it + int create_and_open(std::ostream &out, const std::string& cfs="") override; + + void close() override; + + PerfCounters *get_perf_counters() override + { + return logger; + } + int repair(std::ostream &out) override; + + class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + leveldb::WriteBatch bat; + LevelDBStore *db; + explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {} + void set( + const std::string &prefix, + const std::string &k, + const ceph::buffer::list &bl) override; + using KeyValueDB::TransactionImpl::set; + void rmkey( + const std::string &prefix, + const std::string &k) override; + void rmkeys_by_prefix( + const std::string &prefix + ) override; + virtual void rm_range_keys( + const std::string &prefix, + const std::string &start, + const std::string &end) override; + + using KeyValueDB::TransactionImpl::rmkey; + }; + + KeyValueDB::Transaction get_transaction() override { + return std::make_shared<LevelDBTransactionImpl>(this); + } + + int submit_transaction(KeyValueDB::Transaction t) override; + int submit_transaction_sync(KeyValueDB::Transaction t) override; + int get( + const std::string &prefix, + const std::set<std::string> &key, + std::map<std::string, ceph::buffer::list> *out + ) override; + + int get(const std::string &prefix, + const std::string &key, + ceph::buffer::list *value) override; + + using KeyValueDB::get; + + class LevelDBWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + protected: + boost::scoped_ptr<leveldb::Iterator> dbiter; + public: + explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) : + dbiter(iter) { } + ~LevelDBWholeSpaceIteratorImpl() override { } + + int seek_to_first() override { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_first(const std::string &prefix) override { + leveldb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() override { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last(const std::string &prefix) override { + std::string limit = past_prefix(prefix); + leveldb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 
0 : -1; + } + int upper_bound(const std::string &prefix, const std::string &after) override { + lower_bound(prefix, after); + if (valid()) { + std::pair<std::string,std::string> key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const std::string &prefix, const std::string &to) override { + std::string bound = combine_strings(prefix, to); + leveldb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + bool valid() override { + return dbiter->Valid(); + } + int next() override { + if (valid()) + dbiter->Next(); + return dbiter->status().ok() ? 0 : -1; + } + int prev() override { + if (valid()) + dbiter->Prev(); + return dbiter->status().ok() ? 0 : -1; + } + std::string key() override { + std::string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; + } + std::pair<std::string,std::string> raw_key() override { + std::string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return std::make_pair(prefix, key); + } + bool raw_key_is_prefixed(const std::string &prefix) override { + leveldb::Slice key = dbiter->key(); + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { + return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } + } + ceph::buffer::list value() override { + return to_bufferlist(dbiter->value()); + } + + ceph::bufferptr value_as_ptr() override { + leveldb::Slice data = dbiter->value(); + return ceph::bufferptr(data.data(), data.size()); + } + + int status() override { + return dbiter->status().ok() ? 0 : -1; + } + }; + + /// Utility + static std::string combine_strings(const std::string &prefix, const std::string &value); + static int split_key(leveldb::Slice in, std::string *prefix, std::string *key); + static ceph::buffer::list to_bufferlist(leveldb::Slice in); + static std::string past_prefix(const std::string &prefix) { + std::string limit = prefix; + limit.push_back(1); + return limit; + } + + uint64_t get_estimated_size(std::map<std::string,std::uint64_t> &extra) override { + DIR *store_dir = opendir(path.c_str()); + if (!store_dir) { + lderr(cct) << __func__ << " something happened opening the store: " + << cpp_strerror(errno) << dendl; + return 0; + } + + uint64_t total_size = 0; + uint64_t sst_size = 0; + uint64_t log_size = 0; + uint64_t misc_size = 0; + + struct dirent *entry = NULL; + while ((entry = readdir(store_dir)) != NULL) { + std::string n(entry->d_name); + + if (n == "." || n == "..") + continue; + + std::string fpath = path + '/' + n; + struct stat s; + int err = stat(fpath.c_str(), &s); + if (err < 0) + err = -errno; + // we may race against leveldb while reading files; this should only + // happen when those files are being updated, data is being shuffled + // and files get removed, in which case there's not much of a problem + // as we'll get to them next time around. 
+ if (err == -ENOENT) { + continue; + } + if (err < 0) { + lderr(cct) << __func__ << " error obtaining stats for " << fpath + << ": " << cpp_strerror(err) << dendl; + goto err; + } + + size_t pos = n.find_last_of('.'); + if (pos == std::string::npos) { + misc_size += s.st_size; + continue; + } + + std::string ext = n.substr(pos+1); + if (ext == "sst") { + sst_size += s.st_size; + } else if (ext == "log") { + log_size += s.st_size; + } else { + misc_size += s.st_size; + } + } + + total_size = sst_size + log_size + misc_size; + + extra["sst"] = sst_size; + extra["log"] = log_size; + extra["misc"] = misc_size; + extra["total"] = total_size; + +err: + closedir(store_dir); + return total_size; + } + + + WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override { + return std::make_shared<LevelDBWholeSpaceIteratorImpl>( + db->NewIterator(leveldb::ReadOptions())); + } + +}; + +#endif diff --git a/src/kv/MemDB.cc b/src/kv/MemDB.cc new file mode 100644 index 000000000..465aab9e9 --- /dev/null +++ b/src/kv/MemDB.cc @@ -0,0 +1,661 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * In-memory crash non-safe keyvalue db + * Author: Ramesh Chander, Ramesh.Chander@sandisk.com + */ + +#include "include/compat.h" +#include <set> +#include <map> +#include <string> +#include <memory> +#if __has_include(<filesystem>) +#include <filesystem> +namespace fs = std::filesystem; +#elif __has_include(<experimental/filesystem>) +#include <experimental/filesystem> +namespace fs = std::experimental::filesystem; +#endif +#include <errno.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "common/perf_counters.h" +#include "common/debug.h" +#include "include/str_list.h" +#include "include/str_map.h" +#include "KeyValueDB.h" +#include "MemDB.h" + +#include "include/ceph_assert.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/buffer.h" +#include "include/buffer_raw.h" +#include "include/compat.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_memdb +#undef dout_prefix +#define dout_prefix *_dout << "memdb: " +#define dtrace dout(30) +#define dwarn dout(0) +#define dinfo dout(0) + +using std::cerr; +using std::ostream; +using std::string; +using std::vector; + +using ceph::bufferlist; +using ceph::bufferptr; +using ceph::decode; +using ceph::encode; + +static void split_key(const string& raw_key, string *prefix, string *key) +{ + size_t pos = raw_key.find(KEY_DELIM, 0); + ceph_assert(pos != std::string::npos); + *prefix = raw_key.substr(0, pos); + *key = raw_key.substr(pos + 1, raw_key.length()); +} + +static string make_key(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(KEY_DELIM); + out.append(value); + return out; +} + +void MemDB::_encode(mdb_iter_t iter, bufferlist &bl) +{ + encode(iter->first, bl); + encode(iter->second, bl); +} + +std::string MemDB::_get_data_fn() +{ + string fn = m_db_path + "/" + "MemDB.db"; + return fn; +} + +void MemDB::_save() +{ + std::lock_guard<std::mutex> l(m_lock); + dout(10) << __func__ << " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl; + int mode = 0644; + int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), + O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode)); + if (fd < 0) { + int err = errno; + cerr << "write_file(" << _get_data_fn().c_str() << "): failed to open file: " + << cpp_strerror(err) << std::endl; + return; + } + bufferlist bl; + mdb_iter_t iter = m_map.begin(); + while 
(iter != m_map.end()) { + dout(10) << __func__ << " Key:"<< iter->first << dendl; + _encode(iter, bl); + ++iter; + } + bl.write_fd(fd); + + VOID_TEMP_FAILURE_RETRY(::close(fd)); +} + +int MemDB::_load() +{ + std::lock_guard<std::mutex> l(m_lock); + dout(10) << __func__ << " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl; + /* + * Open file and read it in single shot. + */ + int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY|O_CLOEXEC)); + if (fd < 0) { + int err = errno; + cerr << "can't open " << _get_data_fn().c_str() << ": " + << cpp_strerror(err) << std::endl; + return -err; + } + + struct stat st; + memset(&st, 0, sizeof(st)); + if (::fstat(fd, &st) < 0) { + int err = errno; + cerr << "can't stat file " << _get_data_fn().c_str() << ": " + << cpp_strerror(err) << std::endl; + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return -err; + } + + ssize_t file_size = st.st_size; + ssize_t bytes_done = 0; + while (bytes_done < file_size) { + string key; + bufferptr datap; + + bytes_done += ceph::decode_file(fd, key); + bytes_done += ceph::decode_file(fd, datap); + + dout(10) << __func__ << " Key:"<< key << dendl; + m_map[key] = datap; + m_total_bytes += datap.length(); + } + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return 0; +} + +int MemDB::_init(bool create) +{ + int r = 0; + dout(1) << __func__ << dendl; + if (create) { + if (fs::exists(m_db_path)) { + r = 0; // ignore EEXIST + } else { + std::error_code ec; + if (!fs::create_directory(m_db_path, ec)) { + derr << __func__ << " mkdir failed: " << ec.message() << dendl; + return -ec.value(); + } + fs::permissions(m_db_path, fs::perms::owner_all); + } + } else { + r = _load(); + } + + PerfCountersBuilder plb(g_ceph_context, "memdb", l_memdb_first, l_memdb_last); + plb.add_u64_counter(l_memdb_gets, "get", "Gets"); + plb.add_u64_counter(l_memdb_txns, "submit_transaction", "Submit transactions"); + plb.add_time_avg(l_memdb_get_latency, "get_latency", "Get latency"); + plb.add_time_avg(l_memdb_submit_latency, "submit_latency", "Submit Latency"); + logger = plb.create_perf_counters(); + m_cct->get_perfcounters_collection()->add(logger); + + return r; +} + +int MemDB::set_merge_operator( + const string& prefix, + std::shared_ptr<KeyValueDB::MergeOperator> mop) +{ + merge_ops.push_back(std::make_pair(prefix, mop)); + return 0; +} + +int MemDB::do_open(ostream &out, bool create) +{ + m_total_bytes = 0; + m_allocated_bytes = 1; + + return _init(create); +} + +int MemDB::open(ostream &out, const std::string& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, false); +} + +int MemDB::create_and_open(ostream &out, const std::string& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, true); +} + +MemDB::~MemDB() +{ + close(); + dout(10) << __func__ << " Destroying MemDB instance: "<< dendl; +} + +void MemDB::close() +{ + /* + * Save whatever in memory btree. 
+ */ + _save(); + if (logger) + m_cct->get_perfcounters_collection()->remove(logger); +} + +int MemDB::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + + MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get()); + + dtrace << __func__ << " " << mt->get_ops().size() << dendl; + for(auto& op : mt->get_ops()) { + if(op.first == MDBTransactionImpl::WRITE) { + ms_op_t set_op = op.second; + _setkey(set_op); + } else if (op.first == MDBTransactionImpl::MERGE) { + ms_op_t merge_op = op.second; + _merge(merge_op); + } else { + ms_op_t rm_op = op.second; + ceph_assert(op.first == MDBTransactionImpl::DELETE); + _rmkey(rm_op); + } + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_txns); + logger->tinc(l_memdb_submit_latency, lat); + + return 0; +} + +int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync) +{ + dtrace << __func__ << " " << dendl; + submit_transaction(tsync); + return 0; +} + +int MemDB::transaction_rollback(KeyValueDB::Transaction t) +{ + MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get()); + mt->clear(); + return 0; +} + +void MemDB::MDBTransactionImpl::set( + const string &prefix, const string &k, const bufferlist &to_set_bl) +{ + dtrace << __func__ << " " << prefix << " " << k << dendl; + ops.push_back(make_pair(WRITE, std::make_pair(std::make_pair(prefix, k), + to_set_bl))); +} + +void MemDB::MDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + dtrace << __func__ << " " << prefix << " " << k << dendl; + ops.push_back(make_pair(DELETE, + std::make_pair(std::make_pair(prefix, k), + bufferlist()))); +} + +void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = m_db->get_iterator(prefix); + for (it->seek_to_first(); it->valid(); it->next()) { + rmkey(prefix, it->key()); + } +} + +void MemDB::MDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = m_db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + rmkey(prefix, it->key()); + it->next(); + } +} + +void MemDB::MDBTransactionImpl::merge( + const std::string &prefix, const std::string &key, const bufferlist &value) +{ + + dtrace << __func__ << " " << prefix << " " << key << dendl; + ops.push_back(make_pair(MERGE, make_pair(std::make_pair(prefix, key), value))); + return; +} + +int MemDB::_setkey(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string key = make_key(op.first.first, op.first.second); + bufferlist bl = op.second; + + m_total_bytes += bl.length(); + + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old)) { + /* + * delete and free existing key. + */ + ceph_assert(m_total_bytes >= bl_old.length()); + m_total_bytes -= bl_old.length(); + m_map.erase(key); + } + + m_map[key] = bufferptr((char *) bl.c_str(), bl.length()); + iterator_seq_no++; + return 0; +} + +int MemDB::_rmkey(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string key = make_key(op.first.first, op.first.second); + + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old)) { + ceph_assert(m_total_bytes >= bl_old.length()); + m_total_bytes -= bl_old.length(); + } + iterator_seq_no++; + /* + * Erase will call the destructor for bufferptr. 
+ */ + return m_map.erase(key); +} + +std::shared_ptr<KeyValueDB::MergeOperator> MemDB::_find_merge_op(const std::string &prefix) +{ + for (const auto& i : merge_ops) { + if (i.first == prefix) { + return i.second; + } + } + + dtrace << __func__ << " No merge op for " << prefix << dendl; + return NULL; +} + + +int MemDB::_merge(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string prefix = op.first.first; + std::string key = make_key(op.first.first, op.first.second); + bufferlist bl = op.second; + int64_t bytes_adjusted = bl.length(); + + /* + * find the operator for this prefix + */ + std::shared_ptr<MergeOperator> mop = _find_merge_op(prefix); + ceph_assert(mop); + + /* + * call the merge operator with value and non value + */ + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old) == false) { + std::string new_val; + /* + * Merge non existent. + */ + mop->merge_nonexistent(bl.c_str(), bl.length(), &new_val); + m_map[key] = bufferptr(new_val.c_str(), new_val.length()); + } else { + /* + * Merge existing. + */ + std::string new_val; + mop->merge(bl_old.c_str(), bl_old.length(), bl.c_str(), bl.length(), &new_val); + m_map[key] = bufferptr(new_val.c_str(), new_val.length()); + bytes_adjusted -= bl_old.length(); + bl_old.clear(); + } + + ceph_assert((int64_t)m_total_bytes + bytes_adjusted >= 0); + m_total_bytes += bytes_adjusted; + iterator_seq_no++; + return 0; +} + +/* + * Caller take btree lock. + */ +bool MemDB::_get(const string &prefix, const string &k, bufferlist *out) +{ + string key = make_key(prefix, k); + + mdb_iter_t iter = m_map.find(key); + if (iter == m_map.end()) { + return false; + } + + out->push_back((m_map[key].clone())); + return true; +} + +bool MemDB::_get_locked(const string &prefix, const string &k, bufferlist *out) +{ + std::lock_guard<std::mutex> l(m_lock); + return _get(prefix, k, out); +} + + +int MemDB::get(const string &prefix, const std::string& key, + bufferlist *out) +{ + utime_t start = ceph_clock_now(); + int ret; + + if (_get_locked(prefix, key, out)) { + ret = 0; + } else { + ret = -ENOENT; + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_gets); + logger->tinc(l_memdb_get_latency, lat); + + return ret; +} + +int MemDB::get(const string &prefix, const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + utime_t start = ceph_clock_now(); + + for (const auto& i : keys) { + bufferlist bl; + if (_get_locked(prefix, i, &bl)) + out->insert(make_pair(i, bl)); + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_gets); + logger->tinc(l_memdb_get_latency, lat); + + return 0; +} + +void MemDB::MDBWholeSpaceIteratorImpl::fill_current() +{ + bufferlist bl; + bl.push_back(m_iter->second.clone()); + m_key_value = std::make_pair(m_iter->first, bl); +} + +bool MemDB::MDBWholeSpaceIteratorImpl::valid() +{ + if (m_key_value.first.empty()) { + return false; + } + return true; +} + +bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() { + + if (this_seq_no != *global_seq_no) { + auto key = m_key_value.first; + ceph_assert(!key.empty()); + + bool restart_iter = false; + if (!m_using_btree) { + /* + * Map is modified and marker key does not exists, + * restart the iterator from next key. + */ + if (m_map_p->find(key) == m_map_p->end()) { + restart_iter = true; + } + } else { + restart_iter = true; + } + + if (restart_iter) { + m_iter = m_map_p->lower_bound(key); + if (m_iter == m_map_p->end()) { + return false; + } + } + + /* + * This iter is valid now. 
+ */ + this_seq_no = *global_seq_no; + } + + return true; +} + +void +MemDB::MDBWholeSpaceIteratorImpl::free_last() +{ + m_key_value.first.clear(); + m_key_value.second.clear(); +} + +string MemDB::MDBWholeSpaceIteratorImpl::key() +{ + dtrace << __func__ << " " << m_key_value.first << dendl; + string prefix, key; + split_key(m_key_value.first, &prefix, &key); + return key; +} + +std::pair<string,string> MemDB::MDBWholeSpaceIteratorImpl::raw_key() +{ + string prefix, key; + split_key(m_key_value.first, &prefix, &key); + return { prefix, key }; +} + +bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed( + const string &prefix) +{ + string p, k; + split_key(m_key_value.first, &p, &k); + return (p == prefix); +} + +bufferlist MemDB::MDBWholeSpaceIteratorImpl::value() +{ + dtrace << __func__ << " " << m_key_value << dendl; + return m_key_value.second; +} + +int MemDB::MDBWholeSpaceIteratorImpl::next() +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + if (!iterator_validate()) { + free_last(); + return -1; + } + free_last(); + ++m_iter; + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } else { + return -1; + } +} + +int MemDB::MDBWholeSpaceIteratorImpl:: prev() +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + if (!iterator_validate()) { + free_last(); + return -1; + } + free_last(); + if (m_iter != m_map_p->begin()) { + --m_iter; + fill_current(); + return 0; + } else { + return -1; + } +} + +/* + * First key >= to given key, if key is null then first key in btree. + */ +int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string &k) +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + free_last(); + if (k.empty()) { + m_iter = m_map_p->begin(); + } else { + m_iter = m_map_p->lower_bound(k); + } + + if (m_iter == m_map_p->end()) { + return -1; + } + fill_current(); + return 0; +} + +int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string &k) +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + free_last(); + if (k.empty()) { + m_iter = m_map_p->end(); + --m_iter; + } else { + m_iter = m_map_p->lower_bound(k); + } + + if (m_iter == m_map_p->end()) { + return -1; + } + fill_current(); + return 0; +} + +MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl() +{ + free_last(); +} + +int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string &prefix, + const std::string &after) { + + std::lock_guard<std::mutex> l(*m_map_lock_p); + + dtrace << "upper_bound " << prefix.c_str() << after.c_str() << dendl; + string k = make_key(prefix, after); + m_iter = m_map_p->upper_bound(k); + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } + return -1; +} + +int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string &prefix, + const std::string &to) { + std::lock_guard<std::mutex> l(*m_map_lock_p); + dtrace << "lower_bound " << prefix.c_str() << to.c_str() << dendl; + string k = make_key(prefix, to); + m_iter = m_map_p->lower_bound(k); + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } + return -1; +} diff --git a/src/kv/MemDB.h b/src/kv/MemDB.h new file mode 100644 index 000000000..32d81db22 --- /dev/null +++ b/src/kv/MemDB.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * In-memory crash non-safe keyvalue db + * Author: Ramesh Chander, Ramesh.Chander@sandisk.com + */ + +#ifndef CEPH_OS_BLUESTORE_MEMDB_H +#define CEPH_OS_BLUESTORE_MEMDB_H + +#include "include/buffer.h" +#include <ostream> +#include <set> +#include <map> 
+#include <string> +#include <memory> +#include <boost/scoped_ptr.hpp> +#include "include/common_fwd.h" +#include "include/encoding.h" +#include "include/btree_map.h" +#include "KeyValueDB.h" +#include "osd/osd_types.h" + +#define KEY_DELIM '\0' + + +enum { + l_memdb_first = 34440, + l_memdb_gets, + l_memdb_txns, + l_memdb_get_latency, + l_memdb_submit_latency, + l_memdb_last, +}; + +class MemDB : public KeyValueDB +{ + typedef std::pair<std::pair<std::string, std::string>, ceph::bufferlist> ms_op_t; + std::mutex m_lock; + uint64_t m_total_bytes; + uint64_t m_allocated_bytes; + + typedef std::map<std::string, ceph::bufferptr> mdb_map_t; + typedef mdb_map_t::iterator mdb_iter_t; + bool m_using_btree; + + mdb_map_t m_map; + + CephContext *m_cct; + PerfCounters *logger; + void* m_priv; + std::string m_options; + std::string m_db_path; + + int transaction_rollback(KeyValueDB::Transaction t); + int _open(std::ostream &out); + void close() override; + bool _get(const std::string &prefix, const std::string &k, ceph::bufferlist *out); + bool _get_locked(const std::string &prefix, const std::string &k, ceph::bufferlist *out); + std::string _get_data_fn(); + void _encode(mdb_iter_t iter, ceph::bufferlist &bl); + void _save(); + int _load(); + uint64_t iterator_seq_no; + +public: + MemDB(CephContext *c, const std::string &path, void *p) : + m_total_bytes(0), m_allocated_bytes(0), m_using_btree(false), + m_cct(c), logger(NULL), m_priv(p), m_db_path(path), iterator_seq_no(1) + { + //Nothing as of now + } + + ~MemDB() override; + int set_merge_operator(const std::string& prefix, + std::shared_ptr<MergeOperator> mop) override; + + std::shared_ptr<MergeOperator> _find_merge_op(const std::string &prefix); + + static + int _test_init(const std::string& dir) { return 0; }; + + class MDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + enum op_type { WRITE = 1, MERGE = 2, DELETE = 3}; + private: + + std::vector<std::pair<op_type, ms_op_t>> ops; + MemDB *m_db; + + bool key_is_prefixed(const std::string &prefix, const std::string& full_key); + public: + const std::vector<std::pair<op_type, ms_op_t>>& + get_ops() { return ops; }; + + void set(const std::string &prefix, const std::string &key, + const ceph::bufferlist &val) override; + using KeyValueDB::TransactionImpl::set; + void rmkey(const std::string &prefix, const std::string &k) override; + using KeyValueDB::TransactionImpl::rmkey; + void rmkeys_by_prefix(const std::string &prefix) override; + void rm_range_keys( + const std::string &prefix, + const std::string &start, + const std::string &end) override; + + void merge(const std::string &prefix, const std::string &key, + const ceph::bufferlist &value) override; + void clear() { + ops.clear(); + } + explicit MDBTransactionImpl(MemDB* _db) :m_db(_db) + { + ops.clear(); + } + ~MDBTransactionImpl() override {}; + }; + +private: + + /* + * Transaction states. 
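+   * Ops queued by MDBTransactionImpl are applied in order at submit
+   * time: WRITE -> _setkey, MERGE -> _merge, DELETE -> _rmkey.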
+ */ + int _merge(const std::string &k, ceph::bufferptr &bl); + int _merge(ms_op_t &op); + int _setkey(ms_op_t &op); + int _rmkey(ms_op_t &op); + +public: + + int init(std::string option_str="") override { m_options = option_str; return 0; } + int _init(bool format); + + int do_open(std::ostream &out, bool create); + int open(std::ostream &out, const std::string& cfs="") override; + int create_and_open(std::ostream &out, const std::string& cfs="") override; + using KeyValueDB::create_and_open; + + KeyValueDB::Transaction get_transaction() override { + return std::shared_ptr<MDBTransactionImpl>(new MDBTransactionImpl(this)); + } + + int submit_transaction(Transaction) override; + int submit_transaction_sync(Transaction) override; + + int get(const std::string &prefix, const std::set<std::string> &key, + std::map<std::string, ceph::bufferlist> *out) override; + + int get(const std::string &prefix, const std::string &key, + ceph::bufferlist *out) override; + + using KeyValueDB::get; + + class MDBWholeSpaceIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl { + + mdb_iter_t m_iter; + std::pair<std::string, ceph::bufferlist> m_key_value; + mdb_map_t *m_map_p; + std::mutex *m_map_lock_p; + uint64_t *global_seq_no; + uint64_t this_seq_no; + bool m_using_btree; + + public: + MDBWholeSpaceIteratorImpl(mdb_map_t *btree_p, std::mutex *btree_lock_p, + uint64_t *iterator_seq_no, bool using_btree) { + m_map_p = btree_p; + m_map_lock_p = btree_lock_p; + std::lock_guard<std::mutex> l(*m_map_lock_p); + global_seq_no = iterator_seq_no; + this_seq_no = *iterator_seq_no; + m_using_btree = using_btree; + } + + void fill_current(); + void free_last(); + + + int seek_to_first(const std::string &k) override; + int seek_to_last(const std::string &k) override; + + int seek_to_first() override { return seek_to_first(std::string()); }; + int seek_to_last() override { return seek_to_last(std::string()); }; + + int upper_bound(const std::string &prefix, const std::string &after) override; + int lower_bound(const std::string &prefix, const std::string &to) override; + bool valid() override; + bool iterator_validate(); + + int next() override; + int prev() override; + int status() override { return 0; }; + + std::string key() override; + std::pair<std::string,std::string> raw_key() override; + bool raw_key_is_prefixed(const std::string &prefix) override; + ceph::bufferlist value() override; + ~MDBWholeSpaceIteratorImpl() override; + }; + + uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override { + std::lock_guard<std::mutex> l(m_lock); + return m_allocated_bytes; + }; + + int get_statfs(struct store_statfs_t *buf) override { + std::lock_guard<std::mutex> l(m_lock); + buf->reset(); + buf->total = m_total_bytes; + buf->allocated = m_allocated_bytes; + buf->data_stored = m_total_bytes; + return 0; + } + + WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override { + return std::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>( + new MDBWholeSpaceIteratorImpl(&m_map, &m_lock, &iterator_seq_no, m_using_btree)); + } +}; + +#endif + diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc new file mode 100644 index 000000000..8e8983c18 --- /dev/null +++ b/src/kv/RocksDBStore.cc @@ -0,0 +1,3453 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <set> +#include <map> +#include <string> +#include <memory> +#if __has_include(<filesystem>) +#include <filesystem> +namespace fs = std::filesystem; +#elif 
__has_include(<experimental/filesystem>) +#include <experimental/filesystem> +namespace fs = std::experimental::filesystem; +#endif +#include <errno.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "rocksdb/db.h" +#include "rocksdb/table.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/cache.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/utilities/convenience.h" +#include "rocksdb/merge_operator.h" + +#include "common/perf_counters.h" +#include "common/PriorityCache.h" +#include "include/common_fwd.h" +#include "include/scope_guard.h" +#include "include/str_list.h" +#include "include/stringify.h" +#include "include/str_map.h" +#include "KeyValueDB.h" +#include "RocksDBStore.h" + +#include "common/debug.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_rocksdb +#undef dout_prefix +#define dout_prefix *_dout << "rocksdb: " + +using std::function; +using std::list; +using std::map; +using std::ostream; +using std::pair; +using std::set; +using std::string; +using std::unique_ptr; +using std::vector; + +using ceph::bufferlist; +using ceph::bufferptr; +using ceph::Formatter; + +static const char* sharding_def_dir = "sharding"; +static const char* sharding_def_file = "sharding/def"; +static const char* sharding_recreate = "sharding/recreate_columns"; +static const char* resharding_column_lock = "reshardingXcommencingXlocked"; + +static bufferlist to_bufferlist(rocksdb::Slice in) { + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, + vector<rocksdb::Slice> *slices) +{ + unsigned n = 0; + for (auto& buf : bl.buffers()) { + (*slices)[n].data_ = buf.c_str(); + (*slices)[n].size_ = buf.length(); + n++; + } + return rocksdb::SliceParts(slices->data(), slices->size()); +} + + +// +// One of these for the default rocksdb column family, routing each prefix +// to the appropriate MergeOperator. +// +class RocksDBStore::MergeOperatorRouter + : public rocksdb::AssociativeMergeOperator +{ + RocksDBStore& store; +public: + const char *Name() const override { + // Construct a name that rocksDB will validate against. We want to + // do this in a way that doesn't constrain the ordering of calls + // to set_merge_operator, so sort the merge operators and then + // construct a name from all of those parts. + store.assoc_name.clear(); + map<std::string,std::string> names; + + for (auto& p : store.merge_ops) { + names[p.first] = p.second->name(); + } + for (auto& p : names) { + store.assoc_name += '.'; + store.assoc_name += p.first; + store.assoc_name += ':'; + store.assoc_name += p.second; + } + return store.assoc_name.c_str(); + } + + explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} + + bool Merge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + // for default column family + // extract prefix from key and compare against each registered merge op; + // even though merge operator for explicit CF is included in merge_ops, + // it won't be picked up, since it won't match. 
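+    // Keys in the default CF are stored as prefix + '\0' + key (see
+    // combine_strings), which is exactly what the check below tests:
+    // the prefix bytes match and the byte right after them is the
+    // 0 separator.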
+ for (auto& p : store.merge_ops) { + if (p.first.compare(0, p.first.length(), + key.data(), p.first.length()) == 0 && + key.data()[p.first.length()] == 0) { + if (existing_value) { + p.second->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + p.second->merge_nonexistent(value.data(), value.size(), new_value); + } + break; + } + } + return true; // OK :) + } +}; + +// +// One of these per non-default column family, linked directly to the +// merge operator for that CF/prefix (if any). +// +class RocksDBStore::MergeOperatorLinker + : public rocksdb::AssociativeMergeOperator +{ +private: + std::shared_ptr<KeyValueDB::MergeOperator> mop; +public: + explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {} + + const char *Name() const override { + return mop->name(); + } + + bool Merge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + if (existing_value) { + mop->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + mop->merge_nonexistent(value.data(), value.size(), new_value); + } + return true; + } +}; + +int RocksDBStore::set_merge_operator( + const string& prefix, + std::shared_ptr<KeyValueDB::MergeOperator> mop) +{ + // If you fail here, it's because you can't do this on an open database + ceph_assert(db == nullptr); + merge_ops.push_back(std::make_pair(prefix,mop)); + return 0; +} + +class CephRocksdbLogger : public rocksdb::Logger { + CephContext *cct; +public: + explicit CephRocksdbLogger(CephContext *c) : cct(c) { + cct->get(); + } + ~CephRocksdbLogger() override { + cct->put(); + } + + // Write an entry to the log file with the specified format. + void Logv(const char* format, va_list ap) override { + Logv(rocksdb::INFO_LEVEL, format, ap); + } + + // Write an entry to the log file with the specified log level + // and format. Any log with level under the internal log level + // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be + // printed. 
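+  // (RocksDB log levels grow with severity while dout levels shrink,
+  // so the conversion below inverts the scale: a more severe RocksDB
+  // message gets a lower, i.e. more visible, dout level.)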
+ void Logv(const rocksdb::InfoLogLevel log_level, const char* format, + va_list ap) override { + int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1; + dout(ceph::dout::need_dynamic(v)); + char buf[65536]; + vsnprintf(buf, sizeof(buf), format, ap); + *_dout << buf << dendl; + } +}; + +rocksdb::Logger *create_rocksdb_ceph_logger() +{ + return new CephRocksdbLogger(g_ceph_context); +} + +static int string2bool(const string &val, bool &b_val) +{ + if (strcasecmp(val.c_str(), "false") == 0) { + b_val = false; + return 0; + } else if (strcasecmp(val.c_str(), "true") == 0) { + b_val = true; + return 0; + } else { + std::string err; + int b = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) + return -EINVAL; + b_val = !!b; + return 0; + } +} + +namespace rocksdb { +extern std::string trim(const std::string& str); +} + +// this function is a modification of rocksdb's StringToMap: +// 1) accepts ' \n ; as separators +// 2) leaves compound options with enclosing { and } +rocksdb::Status StringToMap(const std::string& opts_str, + std::unordered_map<std::string, std::string>* opts_map) +{ + using rocksdb::Status; + using rocksdb::trim; + assert(opts_map); + // Example: + // opts_str = "write_buffer_size=1024;max_write_buffer_number=2;" + // "nested_opt={opt1=1;opt2=2};max_bytes_for_level_base=100" + size_t pos = 0; + std::string opts = trim(opts_str); + while (pos < opts.size()) { + size_t eq_pos = opts.find('=', pos); + if (eq_pos == std::string::npos) { + return Status::InvalidArgument("Mismatched key value pair, '=' expected"); + } + std::string key = trim(opts.substr(pos, eq_pos - pos)); + if (key.empty()) { + return Status::InvalidArgument("Empty key found"); + } + + // skip space after '=' and look for '{' for possible nested options + pos = eq_pos + 1; + while (pos < opts.size() && isspace(opts[pos])) { + ++pos; + } + // Empty value at the end + if (pos >= opts.size()) { + (*opts_map)[key] = ""; + break; + } + if (opts[pos] == '{') { + int count = 1; + size_t brace_pos = pos + 1; + while (brace_pos < opts.size()) { + if (opts[brace_pos] == '{') { + ++count; + } else if (opts[brace_pos] == '}') { + --count; + if (count == 0) { + break; + } + } + ++brace_pos; + } + // found the matching closing brace + if (count == 0) { + //include both '{' and '}' + (*opts_map)[key] = trim(opts.substr(pos, brace_pos - pos + 1)); + // skip all whitespace and move to the next ';,' + // brace_pos points to the matching '}' + pos = brace_pos + 1; + while (pos < opts.size() && isspace(opts[pos])) { + ++pos; + } + if (pos < opts.size() && opts[pos] != ';' && opts[pos] != ',') { + return Status::InvalidArgument( + "Unexpected chars after nested options"); + } + ++pos; + } else { + return Status::InvalidArgument( + "Mismatched curly braces for nested options"); + } + } else { + size_t sc_pos = opts.find_first_of(",;", pos); + if (sc_pos == std::string::npos) { + (*opts_map)[key] = trim(opts.substr(pos)); + // It either ends with a trailing , ; or the last key-value pair + break; + } else { + (*opts_map)[key] = trim(opts.substr(pos, sc_pos - pos)); + } + pos = sc_pos + 1; + } + } + return Status::OK(); +} + +int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt) +{ + if (key == "compaction_threads") { + std::string err; + int f = strict_iecstrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //Low priority threadpool is used for compaction + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); + } else if (key == "flusher_threads") { + std::string 
err; + int f = strict_iecstrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //High priority threadpool is used for flusher + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); + } else if (key == "compact_on_mount") { + int ret = string2bool(val, compact_on_mount); + if (ret != 0) + return ret; + } else if (key == "disableWAL") { + int ret = string2bool(val, disableWAL); + if (ret != 0) + return ret; + } else { + //unrecognize config options. + return -EINVAL; + } + return 0; +} + +int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt) +{ + return ParseOptionsFromStringStatic(cct, opt_str, opt, + [&](const string& k, const string& v, rocksdb::Options& o) { + return tryInterpret(k, v, o); + } + ); +} + +int RocksDBStore::ParseOptionsFromStringStatic( + CephContext *cct, + const string& opt_str, + rocksdb::Options& opt, + function<int(const string&, const string&, rocksdb::Options&)> interp) +{ + // keep aligned with func tryInterpret + const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"}; + int r; + rocksdb::Status status; + std::unordered_map<std::string, std::string> str_map; + status = StringToMap(opt_str, &str_map); + if (!status.ok()) { + dout(5) << __func__ << " error '" << status.getState() << + "' while parsing options '" << opt_str << "'" << dendl; + return -EINVAL; + } + + for (auto it = str_map.begin(); it != str_map.end(); ++it) { + string this_opt = it->first + "=" + it->second; + rocksdb::Status status = + rocksdb::GetOptionsFromString(opt, this_opt, &opt); + if (!status.ok()) { + if (interp != nullptr) { + r = interp(it->first, it->second, opt); + } else if (!need_interp_keys.count(it->first)) { + r = -1; + } + if (r < 0) { + derr << status.ToString() << dendl; + return -EINVAL; + } + } + lgeneric_dout(cct, 1) << " set rocksdb option " << it->first + << " = " << it->second << dendl; + } + return 0; +} + +int RocksDBStore::init(string _options_str) +{ + options_str = _options_str; + rocksdb::Options opt; + //try parse options + if (options_str.length()) { + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + } + return 0; +} + +int RocksDBStore::create_db_dir() +{ + if (env) { + unique_ptr<rocksdb::Directory> dir; + env->NewDirectory(path, &dir); + } else { + if (!fs::exists(path)) { + std::error_code ec; + if (!fs::create_directory(path, ec)) { + derr << __func__ << " failed to create " << path + << ": " << ec.message() << dendl; + return -ec.value(); + } + fs::permissions(path, + fs::perms::owner_all | + fs::perms::group_read | fs::perms::group_exec | + fs::perms::others_read | fs::perms::others_exec); + } + } + return 0; +} + +int RocksDBStore::install_cf_mergeop( + const string &key_prefix, + rocksdb::ColumnFamilyOptions *cf_opt) +{ + ceph_assert(cf_opt != nullptr); + cf_opt->merge_operator.reset(); + for (auto& i : merge_ops) { + if (i.first == key_prefix) { + cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second)); + } + } + return 0; +} + +int RocksDBStore::create_and_open(ostream &out, + const std::string& cfs) +{ + int r = create_db_dir(); + if (r < 0) + return r; + return do_open(out, true, false, cfs); +} + +std::shared_ptr<rocksdb::Cache> RocksDBStore::create_block_cache( + const std::string& cache_type, size_t cache_size, double cache_prio_high) { + std::shared_ptr<rocksdb::Cache> cache; + auto shard_bits = cct->_conf->rocksdb_cache_shard_bits; + if (cache_type == "binned_lru") { + cache = 
rocksdb_cache::NewBinnedLRUCache(cct, cache_size, shard_bits, false, cache_prio_high); + } else if (cache_type == "lru") { + cache = rocksdb::NewLRUCache(cache_size, shard_bits); + } else if (cache_type == "clock") { + cache = rocksdb::NewClockCache(cache_size, shard_bits); + if (!cache) { + derr << "rocksdb_cache_type '" << cache + << "' chosen, but RocksDB not compiled with LibTBB. " + << dendl; + } + } else { + derr << "unrecognized rocksdb_cache_type '" << cache_type << "'" << dendl; + } + return cache; +} + +int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt) +{ + rocksdb::Status status; + + if (options_str.length()) { + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + } + + if (cct->_conf->rocksdb_perf) { + dbstats = rocksdb::CreateDBStatistics(); + opt.statistics = dbstats; + } + + opt.create_if_missing = create_if_missing; + if (kv_options.count("separate_wal_dir")) { + opt.wal_dir = path + ".wal"; + } + + // Since ceph::for_each_substr doesn't return a value and + // std::stoull does throw, we may as well just catch everything here. + try { + if (kv_options.count("db_paths")) { + list<string> paths; + get_str_list(kv_options["db_paths"], "; \t", paths); + for (auto& p : paths) { + size_t pos = p.find(','); + if (pos == std::string::npos) { + derr << __func__ << " invalid db path item " << p << " in " + << kv_options["db_paths"] << dendl; + return -EINVAL; + } + string path = p.substr(0, pos); + string size_str = p.substr(pos + 1); + uint64_t size = atoll(size_str.c_str()); + if (!size) { + derr << __func__ << " invalid db path item " << p << " in " + << kv_options["db_paths"] << dendl; + return -EINVAL; + } + opt.db_paths.push_back(rocksdb::DbPath(path, size)); + dout(10) << __func__ << " db_path " << path << " size " << size << dendl; + } + } + } catch (const std::system_error& e) { + return -e.code().value(); + } + + if (cct->_conf->rocksdb_log_to_ceph_log) { + opt.info_log.reset(new CephRocksdbLogger(cct)); + } + + if (priv) { + dout(10) << __func__ << " using custom Env " << priv << dendl; + opt.env = static_cast<rocksdb::Env*>(priv); + } else { + env = opt.env; + } + + opt.env->SetAllowNonOwnerAccess(false); + + // caches + if (!set_cache_flag) { + cache_size = cct->_conf->rocksdb_cache_size; + } + uint64_t row_cache_size = cache_size * cct->_conf->rocksdb_cache_row_ratio; + uint64_t block_cache_size = cache_size - row_cache_size; + + bbt_opts.block_cache = create_block_cache(cct->_conf->rocksdb_cache_type, block_cache_size); + if (!bbt_opts.block_cache) { + return -EINVAL; + } + bbt_opts.block_size = cct->_conf->rocksdb_block_size; + + if (row_cache_size > 0) + opt.row_cache = rocksdb::NewLRUCache(row_cache_size, + cct->_conf->rocksdb_cache_shard_bits); + uint64_t bloom_bits = cct->_conf.get_val<uint64_t>("rocksdb_bloom_bits_per_key"); + if (bloom_bits > 0) { + dout(10) << __func__ << " set bloom filter bits per key to " + << bloom_bits << dendl; + bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits)); + } + using std::placeholders::_1; + if (cct->_conf.with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "binary_search"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch; + if (cct->_conf.with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "hash_search"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch; + if 
(cct->_conf.with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "two_level"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + if (!bbt_opts.no_block_cache) { + bbt_opts.cache_index_and_filter_blocks = + cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks"); + bbt_opts.cache_index_and_filter_blocks_with_high_priority = + cct->_conf.get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority"); + bbt_opts.pin_l0_filter_and_index_blocks_in_cache = + cct->_conf.get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache"); + } + bbt_opts.partition_filters = cct->_conf.get_val<bool>("rocksdb_partition_filters"); + if (cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size") > 0) + bbt_opts.metadata_block_size = cct->_conf.get_val<Option::size_t>("rocksdb_metadata_block_size"); + + opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts)); + dout(10) << __func__ << " block size " << cct->_conf->rocksdb_block_size + << ", block_cache size " << byte_u_t(block_cache_size) + << ", row_cache size " << byte_u_t(row_cache_size) + << "; shards " + << (1 << cct->_conf->rocksdb_cache_shard_bits) + << ", type " << cct->_conf->rocksdb_cache_type + << dendl; + + opt.merge_operator.reset(new MergeOperatorRouter(*this)); + comparator = opt.comparator; + return 0; +} + +void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h, + size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) { + dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx << + " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl; + bool exists = cf_handles.count(cf_name) > 0; + auto& column = cf_handles[cf_name]; + if (exists) { + ceph_assert(hash_l == column.hash_l); + ceph_assert(hash_h == column.hash_h); + } else { + ceph_assert(hash_l < hash_h); + column.hash_l = hash_l; + column.hash_h = hash_h; + } + if (column.handles.size() <= shard_idx) + column.handles.resize(shard_idx + 1); + column.handles[shard_idx] = handle; + cf_ids_to_prefix.emplace(handle->GetID(), cf_name); +} + +bool RocksDBStore::is_column_family(const std::string& prefix) { + return cf_handles.count(prefix); +} + +std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) { + uint32_t hash_l = std::min<uint32_t>(shards.hash_l, keylen); + uint32_t hash_h = std::min<uint32_t>(shards.hash_h, keylen); + return { key + hash_l, hash_h - hash_l }; +} + +rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) { + auto sv = get_key_hash_view(shards, key, keylen); + uint32_t hash = ceph_str_hash_rjenkins(sv.data(), sv.size()); + return shards.handles[hash % shards.handles.size()]; +} + +rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) { + auto iter = cf_handles.find(prefix); + if (iter == cf_handles.end()) { + return nullptr; + } else { + if (iter->second.handles.size() == 1) { + return iter->second.handles[0]; + } else { + return get_key_cf(iter->second, key.data(), key.size()); + } + } +} + +rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) { + auto iter = cf_handles.find(prefix); + if (iter == cf_handles.end()) { + return nullptr; + } else { + if (iter->second.handles.size() == 1) { + return 
iter->second.handles[0]; + } else { + return get_key_cf(iter->second, key, keylen); + } + } +} + +/** + * If the specified IteratorBounds arg has both an upper and a lower bound defined, and they have equal placement hash + * strings, we can be sure that the entire iteration range exists in a single CF. In that case, we return the relevant + * CF handle. In all other cases, we return a nullptr to indicate that the specified bounds cannot necessarily be mapped + * to a single CF. + */ +rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const IteratorBounds& bounds) { + if (!bounds.lower_bound || !bounds.upper_bound) { + return nullptr; + } + auto iter = cf_handles.find(prefix); + ceph_assert(iter != cf_handles.end()); + ceph_assert(iter->second.handles.size() != 1); + if (iter->second.hash_l != 0) { + return nullptr; + } + auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size()); + auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size()); + if (lower_bound_hash_str == upper_bound_hash_str) { + auto key = *bounds.lower_bound; + return get_key_cf(iter->second, key.data(), key.size()); + } else { + return nullptr; + } +} + +/** + * Definition of sharding: + * space-separated list of: column_def [ '=' options ] + * column_def := column_name '(' shard_count ')' + * column_def := column_name '(' shard_count ',' hash_begin '-' ')' + * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')' + * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576 + */ +bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in, + std::vector<ColumnFamily>& sharding_def, + char const* *error_position, + std::string *error_msg) +{ + std::string_view text_def = text_def_in; + char const* error_position_local = nullptr; + std::string error_msg_local; + if (error_position == nullptr) { + error_position = &error_position_local; + } + *error_position = nullptr; + if (error_msg == nullptr) { + error_msg = &error_msg_local; + error_msg->clear(); + } + + sharding_def.clear(); + while (!text_def.empty()) { + std::string_view options; + std::string_view name; + size_t shard_cnt = 1; + uint32_t l_bound = 0; + uint32_t h_bound = std::numeric_limits<uint32_t>::max(); + + std::string_view column_def; + size_t spos = text_def.find(' '); + if (spos == std::string_view::npos) { + column_def = text_def; + text_def = std::string_view(text_def.end(), 0); + } else { + column_def = text_def.substr(0, spos); + text_def = text_def.substr(spos + 1); + } + size_t eqpos = column_def.find('='); + if (eqpos != std::string_view::npos) { + options = column_def.substr(eqpos + 1); + column_def = column_def.substr(0, eqpos); + } + + size_t bpos = column_def.find('('); + if (bpos != std::string_view::npos) { + name = column_def.substr(0, bpos); + const char* nptr = &column_def[bpos + 1]; + char* endptr; + shard_cnt = strtol(nptr, &endptr, 10); + if (nptr == endptr) { + *error_position = nptr; + *error_msg = "expecting integer"; + break; + } + nptr = endptr; + if (*nptr == ',') { + nptr++; + l_bound = strtol(nptr, &endptr, 10); + if (nptr == endptr) { + *error_position = nptr; + *error_msg = "expecting integer"; + break; + } + nptr = endptr; + if (*nptr != '-') { + *error_position = nptr; + *error_msg = "expecting '-'"; + break; + } + nptr++; + h_bound = strtol(nptr, &endptr, 10); + if (nptr == endptr) { + 
h_bound = std::numeric_limits<uint32_t>::max(); + } + nptr = endptr; + } + if (*nptr != ')') { + *error_position = nptr; + *error_msg = "expecting ')'"; + break; + } + } else { + name = column_def; + } + sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound); + } + return *error_position == nullptr; +} + +void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def, + std::vector<std::string>& columns) +{ + columns.clear(); + for (size_t i = 0; i < sharding_def.size(); i++) { + if (sharding_def[i].shard_cnt == 1) { + columns.push_back(sharding_def[i].name); + } else { + for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) { + columns.push_back(sharding_def[i].name + "-" + to_string(j)); + } + } + } +} + +int RocksDBStore::create_shards(const rocksdb::Options& opt, + const std::vector<ColumnFamily>& sharding_def) +{ + for (auto& p : sharding_def) { + // copy default CF settings, block cache, merge operators as + // the base for new CF + rocksdb::ColumnFamilyOptions cf_opt(opt); + rocksdb::Status status; + // apply options to column family + int r = update_column_family_options(p.name, p.options, &cf_opt); + if (r != 0) { + return r; + } + for (size_t idx = 0; idx < p.shard_cnt; idx++) { + std::string cf_name; + if (p.shard_cnt == 1) + cf_name = p.name; + else + cf_name = p.name + "-" + to_string(idx); + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, cf_name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << cf_name << dendl; + return -EINVAL; + } + // store the new CF handle + add_column_family(p.name, p.hash_l, p.hash_h, idx, cf); + } + } + return 0; +} + +int RocksDBStore::apply_sharding(const rocksdb::Options& opt, + const std::string& sharding_text) +{ + // create and open column families + if (!sharding_text.empty()) { + bool b; + int r; + rocksdb::Status status; + std::vector<ColumnFamily> sharding_def; + char const* error_position; + std::string error_msg; + b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg); + if (!b) { + dout(1) << __func__ << " bad sharding: " << dendl; + dout(1) << __func__ << sharding_text << dendl; + dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl; + return -EINVAL; + } + r = create_shards(opt, sharding_def); + if (r != 0 ) { + derr << __func__ << " create_shards failed error=" << r << dendl; + return r; + } + opt.env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(opt.env, sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -EIO; + } + } else { + opt.env->DeleteFile(sharding_def_file); + } + return 0; +} + +// linking to rocksdb function defined in options_helper.cc +// it can parse nested params like "nested_opt={opt1=1;opt2=2}" +extern rocksdb::Status rocksdb::StringToMap(const std::string& opts_str, + std::unordered_map<std::string, std::string>* opts_map); + +// Splits column family options from single string into name->value column_opts_map. +// The split is done using RocksDB parser that understands "{" and "}", so it +// properly extracts compound options. +// If non-RocksDB option "block_cache" is defined it is extracted to block_cache_opt. 
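+// For example (hypothetical option string), splitting
+//   "write_buffer_size=16777216;block_cache={type=binned_lru;size=1G}"
+// leaves write_buffer_size in *opt_map and moves the compound
+// block_cache value into *block_cache_opt.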
+int RocksDBStore::split_column_family_options(const std::string& options,
+                                              std::unordered_map<std::string, std::string>* opt_map,
+                                              std::string* block_cache_opt)
+{
+  dout(20) << __func__ << " options=" << options << dendl;
+  rocksdb::Status status = rocksdb::StringToMap(options, opt_map);
+  if (!status.ok()) {
+    dout(5) << __func__ << " error '" << status.getState()
+            << "' while parsing options '" << options << "'" << dendl;
+    return -EINVAL;
+  }
+  // if a "block_cache" option exists, move it to a separate string
+  if (auto it = opt_map->find("block_cache"); it != opt_map->end()) {
+    *block_cache_opt = it->second;
+    opt_map->erase(it);
+  } else {
+    block_cache_opt->clear();
+  }
+  return 0;
+}
+
+// Updates column family options.
+// Takes options from more_options and applies them to cf_opt.
+// Allowed options are exactly the same as those allowed for column families in RocksDB.
+// The Ceph addition is the "block_cache" option, which is translated to a block cache
+// and makes it possible to give a column family (e.g. O) its own dedicated block cache.
+//
+// base_name - name of column without shard suffix: "-"+number
+// options - additional options to apply
+// cf_opt - column family options to update
+int RocksDBStore::update_column_family_options(const std::string& base_name,
+                                               const std::string& more_options,
+                                               rocksdb::ColumnFamilyOptions* cf_opt)
+{
+  std::unordered_map<std::string, std::string> options_map;
+  std::string block_cache_opt;
+  rocksdb::Status status;
+  int r = split_column_family_options(more_options, &options_map, &block_cache_opt);
+  if (r != 0) {
+    dout(5) << __func__ << " failed to parse options; column family=" << base_name
+            << " options=" << more_options << dendl;
+    return r;
+  }
+  status = rocksdb::GetColumnFamilyOptionsFromMap(*cf_opt, options_map, cf_opt);
+  if (!status.ok()) {
+    dout(5) << __func__ << " invalid column family options; column family="
+            << base_name << " options=" << more_options << dendl;
+    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
+    return -EINVAL;
+  }
+  if (base_name != rocksdb::kDefaultColumnFamilyName) {
+    // the default cf has its merge operator defined in load_rocksdb_options; do not override it
+    install_cf_mergeop(base_name, cf_opt);
+  }
+  if (!block_cache_opt.empty()) {
+    r = apply_block_cache_options(base_name, block_cache_opt, cf_opt);
+    if (r != 0) {
+      // apply_block_cache_options already does all necessary douts
+      return r;
+    }
+  }
+  return 0;
+}
+
+int RocksDBStore::apply_block_cache_options(const std::string& column_name,
+                                            const std::string& block_cache_opt,
+                                            rocksdb::ColumnFamilyOptions* cf_opt)
+{
+  rocksdb::Status status;
+  std::unordered_map<std::string, std::string> cache_options_map;
+  status = rocksdb::StringToMap(block_cache_opt, &cache_options_map);
+  if (!status.ok()) {
+    dout(5) << __func__ << " invalid block cache options; column=" << column_name
+            << " options=" << block_cache_opt << dendl;
+    dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl;
+    return -EINVAL;
+  }
+  bool require_new_block_cache = false;
+  std::string cache_type = cct->_conf->rocksdb_cache_type;
+  if (const auto it = cache_options_map.find("type"); it != cache_options_map.end()) {
+    cache_type = it->second;
+    cache_options_map.erase(it);
+    require_new_block_cache = true;
+  }
+  size_t cache_size = cct->_conf->rocksdb_cache_size;
+  if (auto it = cache_options_map.find("size"); it != cache_options_map.end()) {
+    std::string error;
+    cache_size = strict_iecstrtoll(it->second.c_str(), &error);
+    if (!error.empty()) {
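+      // strict_iecstrtoll accepts a plain integer or an IEC-suffixed
+      // size such as "1G" or "512M"; anything else ends up here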
+ dout(10) << __func__ << " invalid size: '" << it->second << "'" << dendl; + return -EINVAL; + } + cache_options_map.erase(it); + require_new_block_cache = true; + } + double high_pri_pool_ratio = 0.0; + if (auto it = cache_options_map.find("high_ratio"); it != cache_options_map.end()) { + std::string error; + high_pri_pool_ratio = strict_strtod(it->second.c_str(), &error); + if (!error.empty()) { + dout(10) << __func__ << " invalid high_pri (float): '" << it->second << "'" << dendl; + return -EINVAL; + } + cache_options_map.erase(it); + require_new_block_cache = true; + } + + rocksdb::BlockBasedTableOptions column_bbt_opts; + status = GetBlockBasedTableOptionsFromMap(bbt_opts, cache_options_map, &column_bbt_opts); + if (!status.ok()) { + dout(5) << __func__ << " invalid block cache options; column=" << column_name + << " options=" << block_cache_opt << dendl; + dout(5) << __func__ << " RocksDB error='" << status.getState() << "'" << dendl; + return -EINVAL; + } + std::shared_ptr<rocksdb::Cache> block_cache; + if (column_bbt_opts.no_block_cache) { + // clear all settings except no_block_cache + // rocksdb does not like then + column_bbt_opts = rocksdb::BlockBasedTableOptions(); + column_bbt_opts.no_block_cache = true; + } else { + if (require_new_block_cache) { + block_cache = create_block_cache(cache_type, cache_size, high_pri_pool_ratio); + if (!block_cache) { + dout(5) << __func__ << " failed to create block cache for params: " << block_cache_opt << dendl; + return -EINVAL; + } + } else { + block_cache = bbt_opts.block_cache; + } + } + column_bbt_opts.block_cache = block_cache; + cf_bbt_opts[column_name] = column_bbt_opts; + cf_opt->table_factory.reset(NewBlockBasedTableFactory(cf_bbt_opts[column_name])); + return 0; +} + +int RocksDBStore::verify_sharding(const rocksdb::Options& opt, + std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs, + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard, + std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs, + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard) +{ + rocksdb::Status status; + std::string stored_sharding_text; + status = opt.env->FileExists(sharding_def_file); + if (status.ok()) { + status = rocksdb::ReadFileToString(opt.env, + sharding_def_file, + &stored_sharding_text); + if(!status.ok()) { + derr << __func__ << " cannot read from " << sharding_def_file << dendl; + return -EIO; + } + dout(20) << __func__ << " sharding=" << stored_sharding_text << dendl; + } else { + dout(30) << __func__ << " no sharding" << dendl; + //no "sharding_def" present + } + //check if sharding_def matches stored_sharding_def + std::vector<ColumnFamily> stored_sharding_def; + parse_sharding_def(stored_sharding_text, stored_sharding_def); + + std::sort(stored_sharding_def.begin(), stored_sharding_def.end(), + [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } ); + + std::vector<string> rocksdb_cfs; + status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), + path, &rocksdb_cfs); + if (!status.ok()) { + derr << __func__ << " unable to list column families: " << status.ToString() << dendl; + return -EIO; + } + dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl; + + auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column, + int32_t shard_id, + const std::string& shard_name, + const rocksdb::ColumnFamilyOptions& opt) { + if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) { + 
existing_cfs.emplace_back(shard_name, opt); + existing_cfs_shard.emplace_back(shard_id, column); + } else { + missing_cfs.emplace_back(shard_name, opt); + missing_cfs_shard.emplace_back(shard_id, column); + } + }; + + for (auto& column : stored_sharding_def) { + rocksdb::ColumnFamilyOptions cf_opt(opt); + int r = update_column_family_options(column.name, column.options, &cf_opt); + if (r != 0) { + return r; + } + if (column.shard_cnt == 1) { + emplace_cf(column, 0, column.name, cf_opt); + } else { + for (size_t i = 0; i < column.shard_cnt; i++) { + std::string cf_name = column.name + "-" + to_string(i); + emplace_cf(column, i, cf_name, cf_opt); + } + } + } + existing_cfs.emplace_back("default", opt); + + if (existing_cfs.size() != rocksdb_cfs.size()) { + std::vector<std::string> columns_from_stored; + sharding_def_to_columns(stored_sharding_def, columns_from_stored); + derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs + << " target columns = " << columns_from_stored << dendl; + return -EIO; + } + return 0; +} + +std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf) +{ + out << "("; + out << cf.name; + out << ","; + out << cf.shard_cnt; + out << ","; + out << cf.hash_l; + out << "-"; + if (cf.hash_h != std::numeric_limits<uint32_t>::max()) { + out << cf.hash_h; + } + out << ","; + out << cf.options; + out << ")"; + return out; +} + +int RocksDBStore::do_open(ostream &out, + bool create_if_missing, + bool open_readonly, + const std::string& sharding_text) +{ + ceph_assert(!(create_if_missing && open_readonly)); + rocksdb::Options opt; + int r = load_rocksdb_options(create_if_missing, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + return r; + } + rocksdb::Status status; + if (create_if_missing) { + status = rocksdb::DB::Open(opt, path, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + r = apply_sharding(opt, sharding_text); + if (r < 0) { + return r; + } + default_cf = db->DefaultColumnFamily(); + } else { + std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs; + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard; + std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs; + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard; + + r = verify_sharding(opt, + existing_cfs, existing_cfs_shard, + missing_cfs, missing_cfs_shard); + if (r < 0) { + return r; + } + std::string sharding_recreate_text; + status = rocksdb::ReadFileToString(opt.env, + sharding_recreate, + &sharding_recreate_text); + bool recreate_mode = status.ok() && sharding_recreate_text == "1"; + + ceph_assert(!recreate_mode || !open_readonly); + if (recreate_mode == false && missing_cfs.size() != 0) { + // We do not accept when there are missing column families, except case that we are during resharding. + // We can get into this case if resharding was interrupted. It gives a chance to continue. + // Opening DB is only allowed in read-only mode. 
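+      // (i.e. a read-write open is refused while the resharding lock
+      // column is among the missing ones)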
+ if (open_readonly == false && + std::find_if(missing_cfs.begin(), missing_cfs.end(), + [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; } + ) != missing_cfs.end()) { + derr << __func__ << " missing column families: " << missing_cfs_shard << dendl; + return -EIO; + } + } + + if (existing_cfs.empty()) { + // no column families + if (open_readonly) { + status = rocksdb::DB::OpenForReadOnly(opt, path, &db); + } else { + status = rocksdb::DB::Open(opt, path, &db); + } + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + default_cf = db->DefaultColumnFamily(); + } else { + std::vector<rocksdb::ColumnFamilyHandle*> handles; + if (open_readonly) { + status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt), + path, existing_cfs, + &handles, &db); + } else { + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, existing_cfs, &handles, &db); + } + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1); + ceph_assert(handles.size() == existing_cfs.size()); + dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl; + for (size_t i = 0; i < existing_cfs_shard.size(); i++) { + add_column_family(existing_cfs_shard[i].second.name, + existing_cfs_shard[i].second.hash_l, + existing_cfs_shard[i].second.hash_h, + existing_cfs_shard[i].first, + handles[i]); + } + default_cf = handles[handles.size() - 1]; + must_close_default_cf = true; + + if (missing_cfs.size() > 0 && + std::find_if(missing_cfs.begin(), missing_cfs.end(), + [](const rocksdb::ColumnFamilyDescriptor& c) { return c.name == resharding_column_lock; } + ) == missing_cfs.end()) + { + dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl; + ceph_assert(recreate_mode); + ceph_assert(missing_cfs.size() == missing_cfs_shard.size()); + for (size_t i = 0; i < missing_cfs.size(); i++) { + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << missing_cfs[i].name << dendl; + return -EINVAL; + } + add_column_family(missing_cfs_shard[i].second.name, + missing_cfs_shard[i].second.hash_l, + missing_cfs_shard[i].second.hash_h, + missing_cfs_shard[i].first, + cf); + } + opt.env->DeleteFile(sharding_recreate); + } + } + } + ceph_assert(default_cf != nullptr); + + PerfCountersBuilder plb(cct, "rocksdb", l_rocksdb_first, l_rocksdb_last); + plb.add_u64_counter(l_rocksdb_gets, "get", "Gets"); + plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency"); + plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency"); + plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions"); + plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range"); + plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue"); + plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time"); + plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time"); + plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay 
time"); + plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time, + "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (compact_on_mount) { + derr << "Compacting rocksdb store..." << dendl; + compact(); + derr << "Finished compacting rocksdb store" << dendl; + } + return 0; +} + +int RocksDBStore::_test_init(const string& dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); + delete db; + db = nullptr; + return status.ok() ? 0 : -EIO; +} + +RocksDBStore::~RocksDBStore() +{ + close(); + if (priv) { + delete static_cast<rocksdb::Env*>(priv); + } +} + +void RocksDBStore::close() +{ + // stop compaction thread + compact_queue_lock.lock(); + if (compact_thread.is_started()) { + dout(1) << __func__ << " waiting for compaction thread to stop" << dendl; + compact_queue_stop = true; + compact_queue_cond.notify_all(); + compact_queue_lock.unlock(); + compact_thread.join(); + dout(1) << __func__ << " compaction thread to stopped" << dendl; + } else { + compact_queue_lock.unlock(); + } + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + logger = nullptr; + } + + // Ensure db is destroyed before dependent db_cache and filterpolicy + for (auto& p : cf_handles) { + for (size_t i = 0; i < p.second.handles.size(); i++) { + db->DestroyColumnFamilyHandle(p.second.handles[i]); + } + } + cf_handles.clear(); + if (must_close_default_cf) { + db->DestroyColumnFamilyHandle(default_cf); + must_close_default_cf = false; + } + default_cf = nullptr; + delete db; + db = nullptr; +} + +int RocksDBStore::repair(std::ostream &out) +{ + rocksdb::Status status; + rocksdb::Options opt; + int r = load_rocksdb_options(false, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + out << "load rocksdb options failed" << std::endl; + return r; + } + //need to save sharding definition, repairDB will delete files it does not know + std::string stored_sharding_text; + status = opt.env->FileExists(sharding_def_file); + if (status.ok()) { + status = rocksdb::ReadFileToString(opt.env, + sharding_def_file, + &stored_sharding_text); + if (!status.ok()) { + stored_sharding_text.clear(); + } + } + dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl; + status = rocksdb::RepairDB(path, opt); + bool repaired = status.ok(); + if (!stored_sharding_text.empty()) { + //recreate markers even if repair failed + opt.env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -1; + } + status = rocksdb::WriteStringToFile(opt.env, "1", + sharding_recreate, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_recreate << dendl; + return -1; + } + // fiinalize sharding recreate + if (do_open(out, false, false)) { + derr << __func__ << " cannot finalize repair" << dendl; + return -1; + } + close(); + } + + if (repaired && status.ok()) { + return 0; + } else { + out << "repair rocksdb failed : " << status.ToString() << std::endl; + return -1; + } +} + +void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { + std::stringstream ss; + ss.str(s); + std::string item; + 
while (std::getline(ss, item, delim)) { + elems.push_back(item); + } +} + +bool RocksDBStore::get_property( + const std::string &property, + uint64_t *out) +{ + return db->GetIntProperty(property, out); +} + +int64_t RocksDBStore::estimate_prefix_size(const string& prefix, + const string& key_prefix) +{ + uint64_t size = 0; + uint8_t flags = + //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables... + rocksdb::DB::INCLUDE_FILES; + auto p_iter = cf_handles.find(prefix); + if (p_iter != cf_handles.end()) { + for (auto cf : p_iter->second.handles) { + uint64_t s = 0; + string start = key_prefix + string(1, '\x00'); + string limit = key_prefix + string("\xff\xff\xff\xff"); + rocksdb::Range r(start, limit); + db->GetApproximateSizes(cf, &r, 1, &s, flags); + size += s; + } + } else { + string start = combine_strings(prefix , key_prefix); + string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff"); + rocksdb::Range r(start, limit); + db->GetApproximateSizes(default_cf, &r, 1, &size, flags); + } + return size; +} + +void RocksDBStore::get_statistics(Formatter *f) +{ + if (!cct->_conf->rocksdb_perf) { + dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats" + << dendl; + return; + } + + if (cct->_conf->rocksdb_collect_compaction_stats) { + std::string stat_str; + bool status = db->GetProperty("rocksdb.stats", &stat_str); + if (status) { + f->open_object_section("rocksdb_statistics"); + f->dump_string("rocksdb_compaction_statistics", ""); + vector<string> stats; + split_stats(stat_str, '\n', stats); + for (auto st :stats) { + f->dump_string("", st); + } + f->close_section(); + } + } + if (cct->_conf->rocksdb_collect_extended_stats) { + if (dbstats) { + f->open_object_section("rocksdb_extended_statistics"); + string stat_str = dbstats->ToString(); + vector<string> stats; + split_stats(stat_str, '\n', stats); + f->dump_string("rocksdb_extended_statistics", ""); + for (auto st :stats) { + f->dump_string(".", st); + } + f->close_section(); + } + f->open_object_section("rocksdbstore_perf_counters"); + logger->dump_formatted(f,0); + f->close_section(); + } + if (cct->_conf->rocksdb_collect_memory_stats) { + f->open_object_section("rocksdb_memtable_statistics"); + std::string str; + if (!bbt_opts.no_block_cache) { + str.append(stringify(bbt_opts.block_cache->GetUsage())); + f->dump_string("block_cache_usage", str.data()); + str.clear(); + str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); + f->dump_string("block_cache_pinned_blocks_usage", str); + str.clear(); + } + db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); + f->dump_string("rocksdb_memtable_usage", str); + str.clear(); + db->GetProperty("rocksdb.estimate-table-readers-mem", &str); + f->dump_string("rocksdb_index_filter_blocks_usage", str); + f->close_section(); + } +} + +struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler { + RocksWBHandler(const RocksDBStore& db) : db(db) {} + const RocksDBStore& db; + std::stringstream seen; + int num_seen = 0; + + void dump(const char* op_name, + uint32_t column_family_id, + const rocksdb::Slice& key_in, + const rocksdb::Slice* value = nullptr) { + string prefix; + string key; + ssize_t size = value ? 
value->size() : -1; + seen << std::endl << op_name << "("; + + if (column_family_id == 0) { + db.split_key(key_in, &prefix, &key); + } else { + auto it = db.cf_ids_to_prefix.find(column_family_id); + ceph_assert(it != db.cf_ids_to_prefix.end()); + prefix = it->second; + key = key_in.ToString(); + } + seen << " prefix = " << prefix; + seen << " key = " << pretty_binary_string(key); + if (size != -1) + seen << " value size = " << std::to_string(size); + seen << ")"; + num_seen++; + } + void Put(const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + dump("Put", 0, key, &value); + } + rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + dump("PutCF", column_family_id, key, &value); + return rocksdb::Status::OK(); + } + void SingleDelete(const rocksdb::Slice& key) override { + dump("SingleDelete", 0, key); + } + rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + dump("SingleDeleteCF", column_family_id, key); + return rocksdb::Status::OK(); + } + void Delete(const rocksdb::Slice& key) override { + dump("Delete", 0, key); + } + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + dump("DeleteCF", column_family_id, key); + return rocksdb::Status::OK(); + } + void Merge(const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + dump("Merge", 0, key, &value); + } + rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + dump("MergeCF", column_family_id, key, &value); + return rocksdb::Status::OK(); + } + bool Continue() override { return num_seen < 50; } +}; + +int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t) +{ + // enable rocksdb breakdown + // considering performance overhead, default is disabled + if (cct->_conf->rocksdb_perf) { + rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex); + rocksdb::get_perf_context()->Reset(); + } + + RocksDBTransactionImpl * _t = + static_cast<RocksDBTransactionImpl *>(t.get()); + woptions.disableWAL = disableWAL; + lgeneric_subdout(cct, rocksdb, 30) << __func__; + RocksWBHandler bat_txc(*this); + _t->bat.Iterate(&bat_txc); + *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl; + + rocksdb::Status s = db->Write(woptions, &_t->bat); + if (!s.ok()) { + RocksWBHandler rocks_txc(*this); + _t->bat.Iterate(&rocks_txc); + derr << __func__ << " error: " << s.ToString() << " code = " << s.code() + << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl; + } + + if (cct->_conf->rocksdb_perf) { + utime_t write_memtable_time; + utime_t write_delay_time; + utime_t write_wal_time; + utime_t write_pre_and_post_process_time; + write_wal_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000); + write_memtable_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000); + write_delay_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000); + write_pre_and_post_process_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000); + logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time); + logger->tinc(l_rocksdb_write_delay_time, write_delay_time); + logger->tinc(l_rocksdb_write_wal_time, write_wal_time); + logger->tinc(l_rocksdb_write_pre_and_post_process_time, 
write_pre_and_post_process_time); + } + + return s.ok() ? 0 : -1; +} + +int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + rocksdb::WriteOptions woptions; + woptions.sync = false; + + int result = submit_common(woptions, t); + + utime_t lat = ceph_clock_now() - start; + logger->tinc(l_rocksdb_submit_latency, lat); + + return result; +} + +int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + rocksdb::WriteOptions woptions; + // if disableWAL, sync can't set + woptions.sync = !disableWAL; + + int result = submit_common(woptions, t); + + utime_t lat = ceph_clock_now() - start; + logger->tinc(l_rocksdb_submit_sync_latency, lat); + + return result; +} + +RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) +{ + db = _db; +} + +void RocksDBStore::RocksDBTransactionImpl::put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const string &key, + const bufferlist &to_set_bl) +{ + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Put(cf, + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), + to_set_bl.length())); + } else { + rocksdb::Slice key_slice(key); + vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers()); + bat.Put(cf, + rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix, k); + if (cf) { + put_bat(bat, cf, k, to_set_bl); + } else { + string key = combine_strings(prefix, k); + put_bat(bat, db->default_cf, key, to_set_bl); + } +} + +void RocksDBStore::RocksDBTransactionImpl::set( + const string &prefix, + const char *k, size_t keylen, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix, k, keylen); + if (cf) { + string key(k, keylen); // fixme? 
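put_bat above avoids flattening a fragmented bufferlist by handing RocksDB a gather list: SliceParts wraps an array of discontiguous Slices that the batch concatenates on write. The same gather-write shown without bufferlist (function and variable names are illustrative):

#include <rocksdb/write_batch.h>
#include <string>
#include <vector>

// Store key -> concatenation of `fragments` without building one big copy.
void gather_put(rocksdb::WriteBatch& bat, rocksdb::ColumnFamilyHandle* cf,
                const std::string& key,
                const std::vector<std::string>& fragments) {
  rocksdb::Slice key_slice(key);
  std::vector<rocksdb::Slice> parts;
  parts.reserve(fragments.size());
  for (const auto& f : fragments) {
    parts.emplace_back(f);   // each fragment becomes one gather element
  }
  bat.Put(cf, rocksdb::SliceParts(&key_slice, 1),
          rocksdb::SliceParts(parts.data(), static_cast<int>(parts.size())));
}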
+ put_bat(bat, cf, key, to_set_bl); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + put_bat(bat, cf, key, to_set_bl); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + auto cf = db->get_cf_handle(prefix, k); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k)); + } else { + bat.Delete(db->default_cf, combine_strings(prefix, k)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, + const char *k, + size_t keylen) +{ + auto cf = db->get_cf_handle(prefix, k, keylen); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k, keylen)); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + bat.Delete(db->default_cf, rocksdb::Slice(key)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, + const string &k) +{ + auto cf = db->get_cf_handle(prefix, k); + if (cf) { + bat.SingleDelete(cf, k); + } else { + bat.SingleDelete(db->default_cf, combine_strings(prefix, k)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + auto p_iter = db->cf_handles.find(prefix); + if (p_iter == db->cf_handles.end()) { + uint64_t cnt = db->delete_range_threshold; + bat.SetSavePoint(); + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) { + bat.Delete(db->default_cf, combine_strings(prefix, it->key())); + } + if (cnt == 0) { + bat.RollbackToSavePoint(); + string endprefix = prefix; + endprefix.push_back('\x01'); + bat.DeleteRange(db->default_cf, + combine_strings(prefix, string()), + combine_strings(endprefix, string())); + } else { + bat.PopSavePoint(); + } + } else { + ceph_assert(p_iter->second.handles.size() >= 1); + for (auto cf : p_iter->second.handles) { + uint64_t cnt = db->delete_range_threshold; + bat.SetSavePoint(); + auto it = db->new_shard_iterator(cf); + for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) { + bat.Delete(cf, it->key()); + } + if (cnt == 0) { + bat.RollbackToSavePoint(); + string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating... 
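rmkeys_by_prefix below caps the cost of point deletes with a savepoint: up to delete_range_threshold keys are deleted individually, and if the scan exhausts that budget the tentative deletes are rolled back in favor of a single DeleteRange tombstone (cheap to write, at the cost of a range tombstone readers must merge). The pattern in isolation, as a sketch:

#include <rocksdb/db.h>
#include <memory>
#include <string>

// Delete everything in [begin, end): point-delete up to `budget` keys,
// otherwise fall back to one range tombstone.
void bounded_delete(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
                    rocksdb::WriteBatch& bat,
                    const std::string& begin, const std::string& end,
                    uint64_t budget) {
  bat.SetSavePoint();
  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions(), cf));
  uint64_t cnt = budget;
  for (it->Seek(begin);
       it->Valid() && it->key().compare(end) < 0 && (--cnt) != 0;
       it->Next()) {
    bat.Delete(cf, it->key());
  }
  if (cnt == 0) {
    bat.RollbackToSavePoint();          // discard the point deletes
    bat.DeleteRange(cf, begin, end);    // one tombstone covers the range
  } else {
    bat.PopSavePoint();                 // keep them; drop the savepoint
  }
}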
+ bat.DeleteRange(cf, string(), endprefix); + } else { + bat.PopSavePoint(); + } + } + } +} + +void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, + const string &start, + const string &end) +{ + ldout(db->cct, 10) << __func__ << " enter start=" << start + << " end=" << end << dendl; + auto p_iter = db->cf_handles.find(prefix); + if (p_iter == db->cf_handles.end()) { + uint64_t cnt = db->delete_range_threshold; + bat.SetSavePoint(); + auto it = db->get_iterator(prefix); + for (it->lower_bound(start); + it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0; + it->next()) { + bat.Delete(db->default_cf, combine_strings(prefix, it->key())); + } + if (cnt == 0) { + ldout(db->cct, 10) << __func__ << " p_iter == end(), resorting to DeleteRange" + << dendl; + bat.RollbackToSavePoint(); + bat.DeleteRange(db->default_cf, + rocksdb::Slice(combine_strings(prefix, start)), + rocksdb::Slice(combine_strings(prefix, end))); + } else { + bat.PopSavePoint(); + } + } else { + ceph_assert(p_iter->second.handles.size() >= 1); + for (auto cf : p_iter->second.handles) { + uint64_t cnt = db->delete_range_threshold; + bat.SetSavePoint(); + rocksdb::Iterator* it = db->new_shard_iterator(cf); + ceph_assert(it != nullptr); + for (it->Seek(start); + it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0; + it->Next()) { + bat.Delete(cf, it->key()); + } + if (cnt == 0) { + ldout(db->cct, 10) << __func__ << " p_iter != end(), resorting to DeleteRange" + << dendl; + bat.RollbackToSavePoint(); + bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end)); + } else { + bat.PopSavePoint(); + } + delete it; + } + } + ldout(db->cct, 10) << __func__ << " end" << dendl; +} + +void RocksDBStore::RocksDBTransactionImpl::merge( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix, k); + if (cf) { + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + cf, + rocksdb::Slice(k), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(k); + vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers()); + bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } + } else { + string key = combine_strings(prefix, k); + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + db->default_cf, + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(key); + vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers()); + bat.Merge( + db->default_cf, + rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } + } +} + +int RocksDBStore::get( + const string &prefix, + const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + rocksdb::PinnableSlice value; + utime_t start = ceph_clock_now(); + if (cf_handles.count(prefix) > 0) { + for (auto& key : keys) { + auto cf_handle = get_cf_handle(prefix, key); + auto status = db->Get(rocksdb::ReadOptions(), + cf_handle, + rocksdb::Slice(key), + &value); + if (status.ok()) { + (*out)[key].append(value.data(), value.size()); + } else if (status.IsIOError()) { + ceph_abort_msg(status.getState()); + } + value.Reset(); + } + } else { + for (auto& 
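merge() above only records an operand; the combine logic lives in a KeyValueDB::MergeOperator bound to a prefix via set_merge_operator() (declared further down in RocksDBStore.h), registered before the store is opened so it can be wired into the column family options. A hedged usage sketch; the counter operator itself is assumed to exist already and to treat operands like "+1" as increments:

#include <memory>
#include "kv/RocksDBStore.h"

void bump_counter(RocksDBStore* store,
                  std::shared_ptr<KeyValueDB::MergeOperator> counter_op) {
  // bind the operator to prefix "C"; in real use this runs before open()
  store->set_merge_operator("C", counter_op);

  KeyValueDB::Transaction t = store->get_transaction();
  ceph::buffer::list delta;
  delta.append("+1");                  // operand format is operator-defined
  t->merge("C", "hit_count", delta);   // combined with stored value lazily
  store->submit_transaction(t);
}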
key : keys) { + string k = combine_strings(prefix, key); + auto status = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + if (status.ok()) { + (*out)[key].append(value.data(), value.size()); + } else if (status.IsIOError()) { + ceph_abort_msg(status.getState()); + } + value.Reset(); + } + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return 0; +} + +int RocksDBStore::get( + const string &prefix, + const string &key, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + rocksdb::PinnableSlice value; + rocksdb::Status s; + auto cf = get_cf_handle(prefix, key); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key), + &value); + } else { + string k = combine_strings(prefix, key); + s = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + } + if (s.ok()) { + out->append(value.data(), value.size()); + } else if (s.IsNotFound()) { + r = -ENOENT; + } else { + ceph_abort_msg(s.getState()); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return r; +} + +int RocksDBStore::get( + const string& prefix, + const char *key, + size_t keylen, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + rocksdb::PinnableSlice value; + rocksdb::Status s; + auto cf = get_cf_handle(prefix, key, keylen); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key, keylen), + &value); + } else { + string k; + combine_strings(prefix, key, keylen, &k); + s = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + } + if (s.ok()) { + out->append(value.data(), value.size()); + } else if (s.IsNotFound()) { + r = -ENOENT; + } else { + ceph_abort_msg(s.getState()); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return r; +} + +int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) +{ + size_t prefix_len = 0; + + // Find separator inside Slice + char* separator = (char*) memchr(in.data(), 0, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in.data()); + if (prefix_len >= in.size()) + return -EINVAL; + + // Fetch prefix and/or key directly from Slice + if (prefix) + *prefix = string(in.data(), prefix_len); + if (key) + *key = string(separator+1, in.size()-prefix_len-1); + return 0; +} + +void RocksDBStore::compact() +{ + logger->inc(l_rocksdb_compact); + rocksdb::CompactRangeOptions options; + db->CompactRange(options, default_cf, nullptr, nullptr); + for (auto cf : cf_handles) { + for (auto shard_cf : cf.second.handles) { + db->CompactRange( + options, + shard_cf, + nullptr, nullptr); + } + } +} + +void RocksDBStore::compact_thread_entry() +{ + std::unique_lock l{compact_queue_lock}; + dout(10) << __func__ << " enter" << dendl; + while (!compact_queue_stop) { + if (!compact_queue.empty()) { + auto range = compact_queue.front(); + compact_queue.pop_front(); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + l.unlock(); + logger->inc(l_rocksdb_compact_range); + if (range.first.empty() && range.second.empty()) { + compact(); + } else { + compact_range(range.first, range.second); + } + l.lock(); + continue; + } + dout(10) << __func__ << " waiting" << dendl; + compact_queue_cond.wait(l); + 
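Every default-column-family path in this file leans on one key layout: combine_strings() joins prefix and key with a single 0x00 byte, and split_key() locates the first 0x00 to undo it, which is why prefixes must never contain NUL. A round-trip check using the class's own static helpers:

#include <cassert>
#include <string>
#include "kv/RocksDBStore.h"

void key_layout_round_trip() {
  std::string raw = RocksDBStore::combine_strings("P", "object_1");
  // raw is "P\0object_1": 1 + 1 + 8 == 10 bytes, separator included
  assert(raw.size() == 10);
  std::string prefix, key;
  int r = RocksDBStore::split_key(rocksdb::Slice(raw), &prefix, &key);
  assert(r == 0 && prefix == "P" && key == "object_1");
}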
} + dout(10) << __func__ << " exit" << dendl; +} + +void RocksDBStore::compact_range_async(const string& start, const string& end) +{ + std::lock_guard l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair<string,string> >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (start <= p->first && p->first <= end) { + // new region crosses start of existing range + // select right bound that is bigger + compact_queue.push_back(make_pair(start, end > p->second ? end : p->second)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + if (start <= p->second && p->second <= end) { + // new region crosses end of existing range + //p->first < p->second and p->second <= end, so p->first <= end. + //But we break if previous condition, so start > p->first. + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.notify_all(); + if (!compact_thread.is_started()) { + compact_thread.create("rstore_compact"); + } +} +bool RocksDBStore::check_omap_dir(string &omap_dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); + delete db; + db = nullptr; + return status.ok(); +} + +void RocksDBStore::compact_range(const string& start, const string& end) +{ + rocksdb::CompactRangeOptions options; + rocksdb::Slice cstart(start); + rocksdb::Slice cend(end); + string prefix_start, key_start; + string prefix_end, key_end; + string key_highest = "\xff\xff\xff\xff"; //cheating + string key_lowest = ""; + + auto compact_range = [&] (const decltype(cf_handles)::iterator column_it, + const std::string& start, + const std::string& end) { + rocksdb::Slice cstart(start); + rocksdb::Slice cend(end); + for (const auto& shard_it : column_it->second.handles) { + db->CompactRange(options, shard_it, &cstart, &cend); + } + }; + db->CompactRange(options, default_cf, &cstart, &cend); + split_key(cstart, &prefix_start, &key_start); + split_key(cend, &prefix_end, &key_end); + if (prefix_start == prefix_end) { + const auto& column = cf_handles.find(prefix_start); + if (column != cf_handles.end()) { + compact_range(column, key_start, key_end); + } + } else { + auto column = cf_handles.find(prefix_start); + if (column != cf_handles.end()) { + compact_range(column, key_start, key_highest); + ++column; + } + const auto& column_end = cf_handles.find(prefix_end); + while (column != column_end) { + compact_range(column, key_lowest, key_highest); + column++; + } + if (column != cf_handles.end()) { + compact_range(column, key_lowest, key_end); + } + } +} + +RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() +{ + delete dbiter; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() +{ + dbiter->SeekToFirst(); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 
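compact_range_async above coalesces overlapping compaction requests before waking the worker, so a burst of adjacent range hints collapses into fewer compactions. The queue-merge logic as a standalone sketch (simplified to return as soon as a merge happens; like the original, it does not re-coalesce the merged result against other entries):

#include <algorithm>
#include <list>
#include <string>
#include <utility>

// Add [start,end] to a queue of pending compaction ranges, merging with
// the first overlapping entry found. O(n) scan over a short queue.
void enqueue_range(std::list<std::pair<std::string, std::string>>& q,
                   const std::string& start, const std::string& end) {
  for (auto p = q.begin(); p != q.end(); ++p) {
    if (p->first == start && p->second == end)
      return;                                     // duplicate request
    if (start <= p->first && p->first <= end) {   // crosses entry's start
      q.emplace_back(start, std::max(end, p->second));
      q.erase(p);
      return;
    }
    if (start <= p->second && p->second <= end) { // crosses entry's end
      q.emplace_back(p->first, end);
      q.erase(p);
      return;
    }
  }
  q.emplace_back(start, end);                     // no overlap: new entry
}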
0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + rocksdb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() +{ + dbiter->SeekToLast(); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + string limit = past_prefix(prefix); + rocksdb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) +{ + lower_bound(prefix, after); + if (valid()) { + pair<string,string> key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) +{ + string bound = combine_strings(prefix, to); + rocksdb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; +} +bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() +{ + return dbiter->Valid(); +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() +{ + if (valid()) { + dbiter->Next(); + } + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() +{ + if (valid()) { + dbiter->Prev(); + } + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() +{ + string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; +} +pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() +{ + string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return make_pair(prefix, key); +} + +bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { + // Look for "prefix\0" right in rocksb::Slice + rocksdb::Slice key = dbiter->key(); + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { + return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } +} + +bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() +{ + return to_bufferlist(dbiter->value()); +} + +size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size() +{ + return dbiter->key().size(); +} + +size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size() +{ + return dbiter->value().size(); +} + +bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() +{ + rocksdb::Slice val = dbiter->value(); + return bufferptr(val.data(), val.size()); +} + +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() +{ + return dbiter->status().ok() ? 
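Because combined keys sort by prefix first, scanning one prefix through the whole-space iterator is just a seek plus a bounded walk: position with seek_to_first(prefix) and stop once raw_key_is_prefixed() no longer matches. A usage sketch against the public iterator API (the scan_prefix name is illustrative):

#include <string>
#include "kv/RocksDBStore.h"

// Walk every key stored under `prefix`.
void scan_prefix(RocksDBStore* store, const std::string& prefix) {
  KeyValueDB::WholeSpaceIterator it = store->get_wholespace_iterator();
  for (it->seek_to_first(prefix);
       it->valid() && it->raw_key_is_prefixed(prefix);
       it->next()) {
    // key() already has the prefix and NUL separator stripped
    std::string k = it->key();
    ceph::buffer::list v = it->value();
    (void)k; (void)v;
  }
}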
0 : -1; +} + +string RocksDBStore::past_prefix(const string &prefix) +{ + string limit = prefix; + limit.push_back(1); + return limit; +} + +class CFIteratorImpl : public KeyValueDB::IteratorImpl { +protected: + string prefix; + rocksdb::Iterator *dbiter; + const KeyValueDB::IteratorBounds bounds; + const rocksdb::Slice iterate_lower_bound; + const rocksdb::Slice iterate_upper_bound; +public: + explicit CFIteratorImpl(const RocksDBStore* db, + const std::string& p, + rocksdb::ColumnFamilyHandle* cf, + KeyValueDB::IteratorBounds bounds_) + : prefix(p), bounds(std::move(bounds_)), + iterate_lower_bound(make_slice(bounds.lower_bound)), + iterate_upper_bound(make_slice(bounds.upper_bound)) + { + auto options = rocksdb::ReadOptions(); + if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) { + if (bounds.lower_bound) { + options.iterate_lower_bound = &iterate_lower_bound; + } + if (bounds.upper_bound) { + options.iterate_upper_bound = &iterate_upper_bound; + } + } + dbiter = db->db->NewIterator(options, cf); + } + ~CFIteratorImpl() { + delete dbiter; + } + + int seek_to_first() override { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() override { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int upper_bound(const string &after) override { + lower_bound(after); + if (valid() && (key() == after)) { + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const string &to) override { + rocksdb::Slice slice_bound(to); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + int next() override { + if (valid()) { + dbiter->Next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int prev() override { + if (valid()) { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; + } + bool valid() override { + return dbiter->Valid(); + } + string key() override { + return dbiter->key().ToString(); + } + std::pair<std::string, std::string> raw_key() override { + return make_pair(prefix, key()); + } + bufferlist value() override { + return to_bufferlist(dbiter->value()); + } + bufferptr value_as_ptr() override { + rocksdb::Slice val = dbiter->value(); + return bufferptr(val.data(), val.size()); + } + int status() override { + return dbiter->status().ok() ? 
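CFIteratorImpl below pins the optional bounds as members because RocksDB only borrows the Slice pointers: they must outlive the iterator. The underlying ReadOptions knobs, shown standalone:

#include <rocksdb/db.h>
#include <memory>

void bounded_scan(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
  // bound Slices must stay alive for the whole lifetime of the iterator
  rocksdb::Slice lower("a");
  rocksdb::Slice upper("m");   // upper bound is exclusive
  rocksdb::ReadOptions ro;
  ro.iterate_lower_bound = &lower;
  ro.iterate_upper_bound = &upper;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro, cf));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // only keys in ["a", "m") are ever surfaced; RocksDB can also
    // prune SST files entirely outside the bounds
  }
}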
0 : -1; + } +}; + + +//merge column iterators and rest iterator +class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl { +private: + RocksDBStore* db; + KeyValueDB::WholeSpaceIterator main; + std::map<std::string, KeyValueDB::Iterator> shards; + std::map<std::string, KeyValueDB::Iterator>::iterator current_shard; + enum {on_main, on_shard} smaller; + +public: + WholeMergeIteratorImpl(RocksDBStore* db) + : db(db) + , main(db->get_default_cf_iterator()) + { + for (auto& e : db->cf_handles) { + shards.emplace(e.first, db->get_iterator(e.first)); + } + } + + // returns true if value in main is smaller then in shards + // invalid is larger then actual value + bool is_main_smaller() { + if (main->valid()) { + if (current_shard != shards.end()) { + auto main_rk = main->raw_key(); + ceph_assert(current_shard->second->valid()); + auto shards_rk = current_shard->second->raw_key(); + if (main_rk.first < shards_rk.first) + return true; + if (main_rk.first > shards_rk.first) + return false; + return main_rk.second < shards_rk.second; + } else { + return true; + } + } else { + if (current_shard != shards.end()) { + return false; + } else { + //this means that neither is valid + //we select main to be smaller, so valid() will signal properly + return true; + } + } + } + + int seek_to_first() override { + int r0 = main->seek_to_first(); + int r1 = 0; + // find first shard that has some data + current_shard = shards.begin(); + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0 || current_shard->second->valid()) { + //this is the first shard that will yield some keys + break; + } + ++current_shard; + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 0 : -1; + } + + int seek_to_first(const std::string &prefix) override { + int r0 = main->seek_to_first(prefix); + int r1 = 0; + // find first shard that has some data + current_shard = shards.lower_bound(prefix); + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0 || current_shard->second->valid()) { + //this is the first shard that will yield some keys + break; + } + ++current_shard; + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 0 : -1; + }; + + int seek_to_last() override { + int r0 = main->seek_to_last(); + int r1 = 0; + r1 = shards_seek_to_last(); + //if we have 2 candidates, we need to select + if (main->valid()) { + if (shards_valid()) { + if (is_main_smaller()) { + smaller = on_shard; + main->next(); + } else { + smaller = on_main; + shards_next(); + } + } else { + smaller = on_main; + } + } else { + if (shards_valid()) { + smaller = on_shard; + } else { + smaller = on_main; + } + } + return r0 == 0 && r1 == 0 ? 0 : -1; + } + + int seek_to_last(const std::string &prefix) override { + int r0 = main->seek_to_last(prefix); + int r1 = 0; + // find last shard that has some data + bool found = false; + current_shard = shards.lower_bound(prefix); + while (current_shard != shards.begin()) { + r1 = current_shard->second->seek_to_last(); + if (r1 != 0) + break; + if (current_shard->second->valid()) { + found = true; + break; + } + } + //if we have 2 candidates, we need to select + if (main->valid() && found) { + if (is_main_smaller()) { + main->next(); + } else { + shards_next(); + } + } + if (!found) { + //set shards state that properly represents eof + current_shard = shards.end(); + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 
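The merge iterator's ordering rule is small but easy to get backwards: an exhausted source compares as larger than any key, so the other source keeps winning until both run dry (the real code then prefers main so valid() reports end-of-data consistently). As a standalone predicate, a sketch of the rule:

#include <string>

// True when source a should be surfaced before source b.
// An exhausted source always loses; the both-exhausted tie is left
// to the caller (the code above resolves it in favor of main).
bool surfaces_first(bool a_valid, const std::string& a_key,
                    bool b_valid, const std::string& b_key) {
  if (!a_valid) return false;  // a is done; b wins (or both are done)
  if (!b_valid) return true;   // only a still has data
  return a_key < b_key;        // both valid: plain lexicographic order
}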
0 : -1; + } + + int upper_bound(const std::string &prefix, const std::string &after) override { + int r0 = main->upper_bound(prefix, after); + int r1 = 0; + if (r0 != 0) + return r0; + current_shard = shards.lower_bound(prefix); + if (current_shard != shards.end()) { + bool located = false; + if (current_shard->first == prefix) { + r1 = current_shard->second->upper_bound(after); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) { + located = true; + } + } + if (!located) { + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) + break; + ++current_shard; + } + } + } + smaller = is_main_smaller() ? on_main : on_shard; + return 0; + } + + int lower_bound(const std::string &prefix, const std::string &to) override { + int r0 = main->lower_bound(prefix, to); + int r1 = 0; + if (r0 != 0) + return r0; + current_shard = shards.lower_bound(prefix); + if (current_shard != shards.end()) { + bool located = false; + if (current_shard->first == prefix) { + r1 = current_shard->second->lower_bound(to); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) { + located = true; + } + } + if (!located) { + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) + break; + ++current_shard; + } + } + } + smaller = is_main_smaller() ? on_main : on_shard; + return 0; + } + + bool valid() override { + if (smaller == on_main) { + return main->valid(); + } else { + if (current_shard == shards.end()) + return false; + return current_shard->second->valid(); + } + }; + + int next() override { + int r; + if (smaller == on_main) { + r = main->next(); + } else { + r = shards_next(); + } + if (r != 0) + return r; + smaller = is_main_smaller() ? on_main : on_shard; + return 0; + } + + int prev() override { + int r; + bool main_was_valid = false; + if (main->valid()) { + main_was_valid = true; + r = main->prev(); + } else { + r = main->seek_to_last(); + } + if (r != 0) + return r; + + bool shards_was_valid = false; + if (shards_valid()) { + shards_was_valid = true; + r = shards_prev(); + } else { + r = shards_seek_to_last(); + } + if (r != 0) + return r; + + if (!main->valid() && !shards_valid()) { + //end, no previous. 
set marker so valid() can work + smaller = on_main; + return 0; + } + + //if 1 is valid, select it + //if 2 are valid select larger and advance the other + if (main->valid()) { + if (shards_valid()) { + if (is_main_smaller()) { + smaller = on_shard; + if (main_was_valid) { + if (main->valid()) { + r = main->next(); + } else { + r = main->seek_to_first(); + } + } else { + //if we have resurrected main, kill it + if (main->valid()) { + main->next(); + } + } + } else { + smaller = on_main; + if (shards_was_valid) { + if (shards_valid()) { + r = shards_next(); + } else { + r = shards_seek_to_first(); + } + } else { + //if we have resurected shards, kill it + if (shards_valid()) { + shards_next(); + } + } + } + } else { + smaller = on_main; + r = shards_seek_to_first(); + } + } else { + smaller = on_shard; + r = main->seek_to_first(); + } + return r; + } + + std::string key() override + { + if (smaller == on_main) { + return main->key(); + } else { + return current_shard->second->key(); + } + } + + std::pair<std::string,std::string> raw_key() override + { + if (smaller == on_main) { + return main->raw_key(); + } else { + return { current_shard->first, current_shard->second->key() }; + } + } + + bool raw_key_is_prefixed(const std::string &prefix) override + { + if (smaller == on_main) { + return main->raw_key_is_prefixed(prefix); + } else { + return current_shard->first == prefix; + } + } + + ceph::buffer::list value() override + { + if (smaller == on_main) { + return main->value(); + } else { + return current_shard->second->value(); + } + } + + int status() override + { + //because we already had to inspect key, it must be ok + return 0; + } + + size_t key_size() override + { + if (smaller == on_main) { + return main->key_size(); + } else { + return current_shard->second->key().size(); + } + } + size_t value_size() override + { + if (smaller == on_main) { + return main->value_size(); + } else { + return current_shard->second->value().length(); + } + } + + int shards_valid() { + if (current_shard == shards.end()) + return false; + return current_shard->second->valid(); + } + + int shards_next() { + if (current_shard == shards.end()) { + //illegal to next() on !valid() + return -1; + } + int r = 0; + r = current_shard->second->next(); + if (r != 0) + return r; + if (current_shard->second->valid()) + return 0; + //current shard exhaused, search for key + ++current_shard; + while (current_shard != shards.end()) { + r = current_shard->second->seek_to_first(); + if (r != 0) + return r; + if (current_shard->second->valid()) + break; + ++current_shard; + } + //either we found key or not, but it is success + return 0; + } + + int shards_prev() { + if (current_shard == shards.end()) { + //illegal to prev() on !valid() + return -1; + } + int r = current_shard->second->prev(); + while (r == 0) { + if (current_shard->second->valid()) { + break; + } + if (current_shard == shards.begin()) { + //we have reached pre-first element + //this makes it !valid(), but guarantees next() moves to first element + break; + } + --current_shard; + r = current_shard->second->seek_to_last(); + } + return r; + } + + int shards_seek_to_last() { + int r = 0; + current_shard = shards.end(); + if (current_shard == shards.begin()) { + //no shards at all + return 0; + } + while (current_shard != shards.begin()) { + --current_shard; + r = current_shard->second->seek_to_last(); + if (r != 0) + return r; + if (current_shard->second->valid()) { + return 0; + } + } + //no keys at all + current_shard = shards.end(); + return r; + } + + 
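prev() above is the trickiest path: both sources step back (an exhausted one is resurrected at its last element), the larger candidate becomes current, and whichever source lost is advanced again so the pair stays aligned. A compressed sketch of that selection over two generic cursors, glossing over the resurrection bookkeeping the real code tracks with the *_was_valid flags:

// C must provide: bool valid(); std::string key(); and
// prev() / next() / seek_to_last() positioning.
template <typename C>
void merged_prev(C& a, C& b, C*& current) {
  if (a.valid()) a.prev(); else a.seek_to_last();  // resurrect at tail
  if (b.valid()) b.prev(); else b.seek_to_last();
  if (!a.valid() && !b.valid()) {
    current = &a;                // no predecessor at all
    return;
  }
  if (a.valid() && (!b.valid() || a.key() > b.key())) {
    current = &a;                // a holds the larger, i.e. previous, key
    if (b.valid()) b.next();     // step the loser back to where it was
  } else {
    current = &b;
    if (a.valid()) a.next();
  }
}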
int shards_seek_to_first() { + int r = 0; + current_shard = shards.begin(); + while (current_shard != shards.end()) { + r = current_shard->second->seek_to_first(); + if (r != 0) + break; + if (current_shard->second->valid()) { + //this is the first shard that will yield some keys + break; + } + ++current_shard; + } + return r; + } +}; + +class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl { +private: + struct KeyLess { + private: + const rocksdb::Comparator* comparator; + public: + KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { }; + + bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const + { + if (a->Valid()) { + if (b->Valid()) { + return comparator->Compare(a->key(), b->key()) < 0; + } else { + return true; + } + } else { + if (b->Valid()) { + return false; + } else { + return false; + } + } + } + }; + + const RocksDBStore* db; + KeyLess keyless; + string prefix; + const KeyValueDB::IteratorBounds bounds; + const rocksdb::Slice iterate_lower_bound; + const rocksdb::Slice iterate_upper_bound; + std::vector<rocksdb::Iterator*> iters; +public: + explicit ShardMergeIteratorImpl(const RocksDBStore* db, + const std::string& prefix, + const std::vector<rocksdb::ColumnFamilyHandle*>& shards, + KeyValueDB::IteratorBounds bounds_) + : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)), + iterate_lower_bound(make_slice(bounds.lower_bound)), + iterate_upper_bound(make_slice(bounds.upper_bound)) + { + iters.reserve(shards.size()); + auto options = rocksdb::ReadOptions(); + if (db->cct->_conf->osd_rocksdb_iterator_bounds_enabled) { + if (bounds.lower_bound) { + options.iterate_lower_bound = &iterate_lower_bound; + } + if (bounds.upper_bound) { + options.iterate_upper_bound = &iterate_upper_bound; + } + } + for (auto& s : shards) { + iters.push_back(db->db->NewIterator(options, s)); + } + } + ~ShardMergeIteratorImpl() { + for (auto& it : iters) { + delete it; + } + } + int seek_to_first() override { + for (auto& it : iters) { + it->SeekToFirst(); + if (!it->status().ok()) { + return -1; + } + } + //all iterators seeked, sort + std::sort(iters.begin(), iters.end(), keyless); + return 0; + } + int seek_to_last() override { + for (auto& it : iters) { + it->SeekToLast(); + if (!it->status().ok()) { + return -1; + } + } + for (size_t i = 1; i < iters.size(); i++) { + if (iters[0]->Valid()) { + if (iters[i]->Valid()) { + if (keyless(iters[0], iters[i])) { + swap(iters[0], iters[i]); + } + } else { + //iters[i] empty + } + } else { + if (iters[i]->Valid()) { + swap(iters[0], iters[i]); + } + } + //it might happen that cf was empty + if (iters[i]->Valid()) { + iters[i]->Next(); + } + } + //no need to sort, as at most 1 iterator is valid now + return 0; + } + int upper_bound(const string &after) override { + rocksdb::Slice slice_bound(after); + for (auto& it : iters) { + it->Seek(slice_bound); + if (it->Valid() && it->key() == after) { + it->Next(); + } + if (!it->status().ok()) { + return -1; + } + } + std::sort(iters.begin(), iters.end(), keyless); + return 0; + } + int lower_bound(const string &to) override { + rocksdb::Slice slice_bound(to); + for (auto& it : iters) { + it->Seek(slice_bound); + if (!it->status().ok()) { + return -1; + } + } + std::sort(iters.begin(), iters.end(), keyless); + return 0; + } + int next() override { + int r = -1; + if (iters[0]->Valid()) { + iters[0]->Next(); + if (iters[0]->status().ok()) { + r = 0; + //bubble up + for (size_t i = 0; i < iters.size() - 1; i++) { + if (keyless(iters[i], iters[i + 1])) { 
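ShardMergeIteratorImpl keeps iters sorted so that iters[0] always holds the smallest live key; since next() only moves iters[0], a single bubble pass restores order in O(shards) instead of a full re-sort. The bubble step in isolation, as a sketch:

#include <cstddef>
#include <utility>
#include <vector>

// Restore sortedness after only v[0] may have grown; less(a,b) means
// "a should come before b". Stops at the first already-ordered pair.
template <typename T, typename Less>
void bubble_front(std::vector<T>& v, Less less) {
  for (size_t i = 0; i + 1 < v.size(); ++i) {
    if (less(v[i], v[i + 1]))
      break;                       // order restored
    std::swap(v[i], v[i + 1]);
  }
}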
+ //matches, fixed + break; + } + std::swap(iters[i], iters[i + 1]); + } + } + } + return r; + } + // iters are sorted, so + // a[0] < b[0] < c[0] < d[0] + // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1] + // so, prev() will be one of: + // a[-1], b[-1], c[-1], d[-1] + // prev() will be the one that is *largest* of them + // + // alg: + // 1. go prev() on each iterator we can + // 2. select largest key from those iterators + // 3. go next() on all iterators except (2) + // 4. sort + int prev() override { + std::vector<rocksdb::Iterator*> prev_done; + //1 + for (auto it: iters) { + if (it->Valid()) { + it->Prev(); + if (it->Valid()) { + prev_done.push_back(it); + } else { + it->SeekToFirst(); + } + } else { + it->SeekToLast(); + if (it->Valid()) { + prev_done.push_back(it); + } + } + } + if (prev_done.size() == 0) { + /* there is no previous element */ + if (iters[0]->Valid()) { + iters[0]->Prev(); + ceph_assert(!iters[0]->Valid()); + } + return 0; + } + //2,3 + rocksdb::Iterator* highest = prev_done[0]; + for (size_t i = 1; i < prev_done.size(); i++) { + if (keyless(highest, prev_done[i])) { + highest->Next(); + highest = prev_done[i]; + } else { + prev_done[i]->Next(); + } + } + //4 + //insert highest in the beginning, and shift values until we pick highest + //untouched rest is sorted - we just prev()/next() them + rocksdb::Iterator* hold = highest; + for (size_t i = 0; i < iters.size(); i++) { + std::swap(hold, iters[i]); + if (hold == highest) break; + } + ceph_assert(hold == highest); + return 0; + } + bool valid() override { + return iters[0]->Valid(); + } + string key() override { + return iters[0]->key().ToString(); + } + std::pair<std::string, std::string> raw_key() override { + return make_pair(prefix, key()); + } + bufferlist value() override { + return to_bufferlist(iters[0]->value()); + } + bufferptr value_as_ptr() override { + rocksdb::Slice val = iters[0]->value(); + return bufferptr(val.data(), val.size()); + } + int status() override { + return iters[0]->status().ok() ? 0 : -1; + } +}; + +KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds) +{ + auto cf_it = cf_handles.find(prefix); + if (cf_it != cf_handles.end()) { + rocksdb::ColumnFamilyHandle* cf = nullptr; + if (cf_it->second.handles.size() == 1) { + cf = cf_it->second.handles[0]; + } else if (cct->_conf->osd_rocksdb_iterator_bounds_enabled) { + cf = get_cf_handle(prefix, bounds); + } + if (cf) { + return std::make_shared<CFIteratorImpl>( + this, + prefix, + cf, + std::move(bounds)); + } else { + return std::make_shared<ShardMergeIteratorImpl>( + this, + prefix, + cf_it->second.handles, + std::move(bounds)); + } + } else { + return KeyValueDB::get_iterator(prefix, opts); + } +} + +rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf) +{ + return db->NewIterator(rocksdb::ReadOptions(), cf); +} + +RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts) +{ + if (cf_handles.size() == 0) { + return std::make_shared<RocksDBWholeSpaceIteratorImpl>( + this, default_cf, opts); + } else { + return std::make_shared<WholeMergeIteratorImpl>(this); + } +} + +RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator() +{ + return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0); +} + +int RocksDBStore::prepare_for_reshard(const std::string& new_sharding, + RocksDBStore::columns_t& to_process_columns) +{ + //0. lock db from opening + //1. 
list existing columns + //2. apply merge operator to (main + columns) opts + //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs + //4. open db, acquire existing column handles + //5. calculate missing columns + //6. create missing columns + //7. construct cf_handles according to new sharding + //8. check is all cf_handles are filled + + bool b; + std::vector<ColumnFamily> new_sharding_def; + char const* error_position; + std::string error_msg; + b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg); + if (!b) { + dout(1) << __func__ << " bad sharding: " << dendl; + dout(1) << __func__ << new_sharding << dendl; + dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl; + return -EINVAL; + } + + //0. lock db from opening + std::string stored_sharding_text; + rocksdb::ReadFileToString(env, + sharding_def_file, + &stored_sharding_text); + if (stored_sharding_text.find(resharding_column_lock) == string::npos) { + rocksdb::Status status; + if (stored_sharding_text.size() != 0) + stored_sharding_text += " "; + stored_sharding_text += resharding_column_lock; + env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(env, stored_sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -EIO; + } + } + + //1. list existing columns + + rocksdb::Status status; + std::vector<std::string> existing_columns; + rocksdb::Options opt; + int r = load_rocksdb_options(false, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + return r; + } + status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns); + if (!status.ok()) { + derr << "Unable to list column families: " << status.ToString() << dendl; + return -EINVAL; + } + dout(5) << "existing columns = " << existing_columns << dendl; + + //2. apply merge operator to (main + columns) opts + //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open + + std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open; + for (const auto& full_name : existing_columns) { + //split col_name to <prefix>-<number> + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + int r = update_column_family_options(base_name, options, &cf_opt); + if (r != 0) { + return r; + } + cfs_to_open.emplace_back(full_name, cf_opt); + } + + //4. open db, acquire existing column handles + std::vector<rocksdb::ColumnFamilyHandle*> handles; + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, cfs_to_open, &handles, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + for (size_t i = 0; i < cfs_to_open.size(); i++) { + dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl; + } + + //5. 
calculate missing columns + std::vector<std::string> new_sharding_columns; + std::vector<std::string> missing_columns; + sharding_def_to_columns(new_sharding_def, + new_sharding_columns); + dout(5) << "target columns = " << new_sharding_columns << dendl; + for (const auto& n : new_sharding_columns) { + bool found = false; + for (const auto& e : existing_columns) { + if (n == e) { + found = true; + break; + } + } + if (!found) { + missing_columns.push_back(n); + } + } + dout(5) << "missing columns = " << missing_columns << dendl; + + //6. create missing columns + for (const auto& full_name : missing_columns) { + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + int r = update_column_family_options(base_name, options, &cf_opt); + if (r != 0) { + return r; + } + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, full_name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << full_name << dendl; + return -EINVAL; + } + dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl; + existing_columns.push_back(full_name); + handles.push_back(cf); + } + + //7. construct cf_handles according to new sharding + for (size_t i = 0; i < existing_columns.size(); i++) { + std::string full_name = existing_columns[i]; + rocksdb::ColumnFamilyHandle *cf = handles[i]; + std::string base_name; + size_t shard_idx = 0; + size_t pos = full_name.find('-'); + dout(10) << "processing column " << full_name << dendl; + if (std::string::npos == pos) { + base_name = full_name; + } else { + base_name = full_name.substr(0,pos); + shard_idx = atoi(full_name.substr(pos+1).c_str()); + } + if (rocksdb::kDefaultColumnFamilyName == base_name) { + default_cf = handles[i]; + must_close_default_cf = true; + std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{ + cf, [](rocksdb::ColumnFamilyHandle*) {}}; + to_process_columns.emplace(full_name, std::move(ptr)); + } else { + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + if (shard_idx < nsd.shard_cnt) { + add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf); + } else { + //ignore columns with index larger then shard count + } + break; + } + } + std::unique_ptr<rocksdb::ColumnFamilyHandle, cf_deleter_t> ptr{ + cf, [this](rocksdb::ColumnFamilyHandle* handle) { + db->DestroyColumnFamilyHandle(handle); + }}; + to_process_columns.emplace(full_name, std::move(ptr)); + } + } + + //8. 
check if all cf_handles are filled + for (const auto& col : cf_handles) { + for (size_t i = 0; i < col.second.handles.size(); i++) { + if (col.second.handles[i] == nullptr) { + derr << "missing handle for column " << col.first << " shard " << i << dendl; + return -EIO; + } + } + } + return 0; +} + +int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns) +{ + std::vector<std::string> new_sharding_columns; + for (const auto& [name, handle] : cf_handles) { + if (handle.handles.size() == 1) { + new_sharding_columns.push_back(name); + } else { + for (size_t i = 0; i < handle.handles.size(); i++) { + new_sharding_columns.push_back(name + "-" + to_string(i)); + } + } + } + + for (auto& [name, handle] : current_columns) { + auto found = std::find(new_sharding_columns.begin(), + new_sharding_columns.end(), + name) != new_sharding_columns.end(); + if (found || name == rocksdb::kDefaultColumnFamilyName) { + dout(5) << "Column " << name << " is part of new sharding." << dendl; + continue; + } + dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl; + + // verify that column is empty + std::unique_ptr<rocksdb::Iterator> it{ + db->NewIterator(rocksdb::ReadOptions(), handle.get())}; + ceph_assert(it); + it->SeekToFirst(); + ceph_assert(!it->Valid()); + + if (rocksdb::Status status = db->DropColumnFamily(handle.get()); !status.ok()) { + derr << __func__ << " Failed to drop column: " << name << dendl; + return -EINVAL; + } + } + return 0; +} + +int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in) +{ + + resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl(); + size_t bytes_in_batch = 0; + size_t keys_in_batch = 0; + size_t bytes_per_iterator = 0; + size_t keys_per_iterator = 0; + size_t keys_processed = 0; + size_t keys_moved = 0; + + auto flush_batch = [&](rocksdb::WriteBatch* batch) { + dout(10) << "flushing batch, " << keys_in_batch << " keys, for " + << bytes_in_batch << " bytes" << dendl; + rocksdb::WriteOptions woptions; + woptions.sync = true; + rocksdb::Status s = db->Write(woptions, batch); + ceph_assert(s.ok()); + bytes_in_batch = 0; + keys_in_batch = 0; + batch->Clear(); + }; + + auto process_column = [&](rocksdb::ColumnFamilyHandle* handle, + const std::string& fixed_prefix) + { + dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl; + std::unique_ptr<rocksdb::Iterator> it{ + db->NewIterator(rocksdb::ReadOptions(), handle)}; + ceph_assert(it); + + rocksdb::WriteBatch bat; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + rocksdb::Slice raw_key = it->key(); + dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl; + //check if need to refresh iterator + if (bytes_per_iterator >= ctrl.bytes_per_iterator || + keys_per_iterator >= ctrl.keys_per_iterator) { + dout(8) << "refreshing iterator" << dendl; + bytes_per_iterator = 0; + keys_per_iterator = 0; + std::string raw_key_str = raw_key.ToString(); + it.reset(db->NewIterator(rocksdb::ReadOptions(), handle)); + ceph_assert(it); + it->Seek(raw_key_str); + ceph_assert(it->Valid()); + raw_key = it->key(); + } + rocksdb::Slice value = it->value(); + std::string prefix, key; + if (fixed_prefix.size() == 0) { + split_key(raw_key, &prefix, &key); + } else { + prefix = fixed_prefix; + key = raw_key.ToString(); + } + keys_processed++; + if ((keys_processed % 10000) == 0) { + dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl; + } + rocksdb::ColumnFamilyHandle* new_handle 
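The reshard worker's process_column is, at its core, a batched move: each key whose prefix now maps to a different column family is deleted from the old CF and written to the new one, with the WriteBatch flushed whenever a size or key budget is hit so memory stays bounded. A skeleton of that move, omitting the periodic iterator refresh the real code performs (pick_cf is a hypothetical shard chooser):

#include <rocksdb/db.h>
#include <cstddef>
#include <memory>

// Move every key of `src` that pick_cf() assigns elsewhere, flushing at
// most `max_keys` delete+put pairs per batch.
void move_keys(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* src,
               rocksdb::ColumnFamilyHandle* (*pick_cf)(const rocksdb::Slice&),
               size_t max_keys) {
  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions(), src));
  rocksdb::WriteBatch bat;
  size_t in_batch = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    rocksdb::ColumnFamilyHandle* dst = pick_cf(it->key());
    if (dst == src) continue;          // already in the right shard
    bat.Delete(src, it->key());
    bat.Put(dst, it->key(), it->value());
    if (++in_batch >= max_keys) {
      rocksdb::WriteOptions wo;
      wo.sync = true;                  // bound what a crash can lose
      db->Write(wo, &bat);
      bat.Clear();
      in_batch = 0;
    }
  }
  if (bat.Count() > 0) {
    db->Write(rocksdb::WriteOptions(), &bat);
  }
}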
= get_cf_handle(prefix, key); + if (new_handle == nullptr) { + new_handle = default_cf; + } + if (handle == new_handle) { + continue; + } + std::string new_raw_key; + if (new_handle == default_cf) { + new_raw_key = combine_strings(prefix, key); + } else { + new_raw_key = key; + } + bat.Delete(handle, raw_key); + bat.Put(new_handle, new_raw_key, value); + dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) << + " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) << + " size " << value.size() << dendl; + keys_moved++; + bytes_in_batch += new_raw_key.size() * 2 + value.size(); + keys_in_batch++; + bytes_per_iterator += new_raw_key.size() * 2 + value.size(); + keys_per_iterator++; + + //check if need to write batch + if (bytes_in_batch >= ctrl.bytes_per_batch || + keys_in_batch >= ctrl.keys_per_batch) { + flush_batch(&bat); + if (ctrl.unittest_fail_after_first_batch) { + return -1000; + } + } + } + if (bat.Count() > 0) { + flush_batch(&bat); + } + return 0; + }; + + auto close_column_handles = make_scope_guard([this] { + cf_handles.clear(); + close(); + }); + columns_t to_process_columns; + int r = prepare_for_reshard(new_sharding, to_process_columns); + if (r != 0) { + dout(1) << "failed to prepare db for reshard" << dendl; + return r; + } + + for (auto& [name, handle] : to_process_columns) { + dout(5) << "Processing column=" << name + << " handle=" << handle.get() << dendl; + if (name == rocksdb::kDefaultColumnFamilyName) { + ceph_assert(handle.get() == default_cf); + r = process_column(default_cf, std::string()); + } else { + std::string fixed_prefix = name.substr(0, name.find('-')); + dout(10) << "Prefix: " << fixed_prefix << dendl; + r = process_column(handle.get(), fixed_prefix); + } + if (r != 0) { + derr << "Error processing column " << name << dendl; + return r; + } + if (ctrl.unittest_fail_after_processing_column) { + return -1001; + } + } + + r = reshard_cleanup(to_process_columns); + if (r != 0) { + dout(5) << "failed to cleanup after reshard" << dendl; + return r; + } + + if (ctrl.unittest_fail_after_successful_processing) { + return -1002; + } + env->CreateDir(sharding_def_dir); + if (auto status = rocksdb::WriteStringToFile(env, new_sharding, + sharding_def_file, true); + !status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -EIO; + } + + return r; +} + +bool RocksDBStore::get_sharding(std::string& sharding) { + rocksdb::Status status; + std::string stored_sharding_text; + bool result = false; + sharding.clear(); + + status = env->FileExists(sharding_def_file); + if (status.ok()) { + status = rocksdb::ReadFileToString(env, + sharding_def_file, + &stored_sharding_text); + if(status.ok()) { + result = true; + sharding = stored_sharding_text; + } + } + return result; +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h new file mode 100644 index 000000000..50f2b4488 --- /dev/null +++ b/src/kv/RocksDBStore.h @@ -0,0 +1,547 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef ROCKS_DB_STORE_H +#define ROCKS_DB_STORE_H + +#include "include/types.h" +#include "include/buffer_fwd.h" +#include "KeyValueDB.h" +#include <set> +#include <map> +#include <string> +#include <memory> +#include <boost/scoped_ptr.hpp> +#include "rocksdb/write_batch.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/iostats_context.h" +#include "rocksdb/statistics.h" +#include "rocksdb/table.h" +#include "rocksdb/db.h" +#include 
"kv/rocksdb_cache/BinnedLRUCache.h" +#include <errno.h> +#include "common/errno.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "include/common_fwd.h" +#include "common/Formatter.h" +#include "common/Cond.h" +#include "common/ceph_context.h" +#include "common/PriorityCache.h" +#include "common/pretty_binary.h" + +enum { + l_rocksdb_first = 34300, + l_rocksdb_gets, + l_rocksdb_get_latency, + l_rocksdb_submit_latency, + l_rocksdb_submit_sync_latency, + l_rocksdb_compact, + l_rocksdb_compact_range, + l_rocksdb_compact_queue_merge, + l_rocksdb_compact_queue_len, + l_rocksdb_write_wal_time, + l_rocksdb_write_memtable_time, + l_rocksdb_write_delay_time, + l_rocksdb_write_pre_and_post_process_time, + l_rocksdb_last, +}; + +namespace rocksdb{ + class DB; + class Env; + class Cache; + class FilterPolicy; + class Snapshot; + class Slice; + class WriteBatch; + class Iterator; + class Logger; + class ColumnFamilyHandle; + struct Options; + struct BlockBasedTableOptions; + struct DBOptions; + struct ColumnFamilyOptions; +} + +extern rocksdb::Logger *create_rocksdb_ceph_logger(); + +inline rocksdb::Slice make_slice(const std::optional<std::string>& bound) { + if (bound) { + return {*bound}; + } else { + return {}; + } +} + +/** + * Uses RocksDB to implement the KeyValueDB interface + */ +class RocksDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + std::string path; + std::map<std::string,std::string> kv_options; + void *priv; + rocksdb::DB *db; + rocksdb::Env *env; + const rocksdb::Comparator* comparator; + std::shared_ptr<rocksdb::Statistics> dbstats; + rocksdb::BlockBasedTableOptions bbt_opts; + std::string options_str; + + uint64_t cache_size = 0; + bool set_cache_flag = false; + friend class ShardMergeIteratorImpl; + friend class CFIteratorImpl; + friend class WholeMergeIteratorImpl; + /* + * See RocksDB's definition of a column family(CF) and how to use it. + * The interfaces of KeyValueDB is extended, when a column family is created. + * Prefix will be the name of column family to use. + */ +public: + struct ColumnFamily { + string name; //< name of this individual column family + size_t shard_cnt; //< count of shards + string options; //< configure option string for this CF + uint32_t hash_l; //< first character to take for hash calc. + uint32_t hash_h; //< last character to take for hash calc. + ColumnFamily(const string &name, size_t shard_cnt, const string &options, + uint32_t hash_l, uint32_t hash_h) + : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {} + }; +private: + friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf); + + bool must_close_default_cf = false; + rocksdb::ColumnFamilyHandle *default_cf = nullptr; + + /// column families in use, name->handles + struct prefix_shards { + uint32_t hash_l; //< first character to take for hash calc. + uint32_t hash_h; //< last character to take for hash calc. 
+ std::vector<rocksdb::ColumnFamilyHandle *> handles; + }; + std::unordered_map<std::string, prefix_shards> cf_handles; + std::unordered_map<uint32_t, std::string> cf_ids_to_prefix; + std::unordered_map<std::string, rocksdb::BlockBasedTableOptions> cf_bbt_opts; + + void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h, + size_t shard_idx, rocksdb::ColumnFamilyHandle *handle); + bool is_column_family(const std::string& prefix); + std::string_view get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen); + rocksdb::ColumnFamilyHandle *get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen); + rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key); + rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen); + rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const IteratorBounds& bounds); + + int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t); + int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt); + int create_db_dir(); + int do_open(std::ostream &out, bool create_if_missing, bool open_readonly, + const std::string& cfs=""); + int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt); +public: + static bool parse_sharding_def(const std::string_view text_def, + std::vector<ColumnFamily>& sharding_def, + char const* *error_position = nullptr, + std::string *error_msg = nullptr); + const rocksdb::Comparator* get_comparator() const { + return comparator; + } + +private: + static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def, + std::vector<std::string>& columns); + int create_shards(const rocksdb::Options& opt, + const vector<ColumnFamily>& sharding_def); + int apply_sharding(const rocksdb::Options& opt, + const std::string& sharding_text); + int verify_sharding(const rocksdb::Options& opt, + std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs, + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard, + std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs, + std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard); + std::shared_ptr<rocksdb::Cache> create_block_cache(const std::string& cache_type, size_t cache_size, double cache_prio_high = 0.0); + int split_column_family_options(const std::string& opts_str, + std::unordered_map<std::string, std::string>* column_opts_map, + std::string* block_cache_opt); + int apply_block_cache_options(const std::string& column_name, + const std::string& block_cache_opt, + rocksdb::ColumnFamilyOptions* cf_opt); + int update_column_family_options(const std::string& base_name, + const std::string& more_options, + rocksdb::ColumnFamilyOptions* cf_opt); + // manage async compactions + ceph::mutex compact_queue_lock = + ceph::make_mutex("RocksDBStore::compact_thread_lock"); + ceph::condition_variable compact_queue_cond; + std::list<std::pair<std::string,std::string>> compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + RocksDBStore *db; + public: + explicit CompactThread(RocksDBStore *d) : db(d) {} + void *entry() override { + db->compact_thread_entry(); + return NULL; + } + friend class RocksDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const std::string& start, const std::string& end); + void compact_range_async(const std::string& start, const 
std::string& end); + int tryInterpret(const std::string& key, const std::string& val, + rocksdb::Options& opt); + +public: + /// compact the underlying rocksdb store + bool compact_on_mount; + bool disableWAL; + const uint64_t delete_range_threshold; + void compact() override; + + void compact_async() override { + compact_range_async({}, {}); + } + + int ParseOptionsFromString(const std::string& opt_str, rocksdb::Options& opt); + static int ParseOptionsFromStringStatic( + CephContext* cct, + const std::string& opt_str, + rocksdb::Options &opt, + std::function<int(const std::string&, const std::string&, rocksdb::Options&)> interp); + static int _test_init(const std::string& dir); + int init(std::string options_str) override; + /// compact rocksdb for all keys with a given prefix + void compact_prefix(const std::string& prefix) override { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const std::string& prefix) override { + compact_range_async(prefix, past_prefix(prefix)); + } + + void compact_range(const std::string& prefix, const std::string& start, + const std::string& end) override { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const std::string& prefix, const std::string& start, + const std::string& end) override { + compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); + } + + RocksDBStore(CephContext *c, const std::string &path, std::map<std::string,std::string> opt, void *p) : + cct(c), + logger(NULL), + path(path), + kv_options(opt), + priv(p), + db(NULL), + env(static_cast<rocksdb::Env*>(p)), + comparator(nullptr), + dbstats(NULL), + compact_queue_stop(false), + compact_thread(this), + compact_on_mount(false), + disableWAL(false), + delete_range_threshold(cct->_conf.get_val<uint64_t>("rocksdb_delete_range_threshold")) + {} + + ~RocksDBStore() override; + + static bool check_omap_dir(std::string &omap_dir); + /// Opens underlying db + int open(std::ostream &out, const std::string& cfs="") override { + return do_open(out, false, false, cfs); + } + /// Creates underlying db if missing and opens it + int create_and_open(std::ostream &out, + const std::string& cfs="") override; + + int open_read_only(std::ostream &out, const std::string& cfs="") override { + return do_open(out, false, true, cfs); + } + + void close() override; + + int repair(std::ostream &out) override; + void split_stats(const std::string &s, char delim, std::vector<std::string> &elems); + void get_statistics(ceph::Formatter *f) override; + + PerfCounters *get_perf_counters() override + { + return logger; + } + + bool get_property( + const std::string &property, + uint64_t *out) final; + + int64_t estimate_prefix_size(const std::string& prefix, + const std::string& key_prefix) override; + struct RocksWBHandler; + class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + rocksdb::WriteBatch bat; + RocksDBStore *db; + + explicit RocksDBTransactionImpl(RocksDBStore *_db); + private: + void put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const std::string &k, + const ceph::bufferlist &to_set_bl); + public: + void set( + const std::string &prefix, + const std::string &k, + const ceph::bufferlist &bl) override; + void set( + const std::string &prefix, + const char *k, + size_t keylen, + const ceph::bufferlist &bl) override; + void rmkey( + const std::string &prefix, + const std::string &k) override; + void rmkey( + const std::string &prefix, + const char 
+      size_t keylen) override;
+    void rm_single_key(
+      const std::string &prefix,
+      const std::string &k) override;
+    void rmkeys_by_prefix(
+      const std::string &prefix
+      ) override;
+    void rm_range_keys(
+      const std::string &prefix,
+      const std::string &start,
+      const std::string &end) override;
+    void merge(
+      const std::string& prefix,
+      const std::string& k,
+      const ceph::bufferlist &bl) override;
+  };
+
+  KeyValueDB::Transaction get_transaction() override {
+    return std::make_shared<RocksDBTransactionImpl>(this);
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t) override;
+  int submit_transaction_sync(KeyValueDB::Transaction t) override;
+  int get(
+    const std::string &prefix,
+    const std::set<std::string> &key,
+    std::map<std::string, ceph::bufferlist> *out
+    ) override;
+  int get(
+    const std::string &prefix,
+    const std::string &key,
+    ceph::bufferlist *out
+    ) override;
+  int get(
+    const std::string &prefix,
+    const char *key,
+    size_t keylen,
+    ceph::bufferlist *out) override;
+
+
+  class RocksDBWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+  protected:
+    rocksdb::Iterator *dbiter;
+  public:
+    explicit RocksDBWholeSpaceIteratorImpl(const RocksDBStore* db,
+                                           rocksdb::ColumnFamilyHandle* cf,
+                                           const KeyValueDB::IteratorOpts opts)
+    {
+      rocksdb::ReadOptions options = rocksdb::ReadOptions();
+      if (opts & ITERATOR_NOCACHE)
+        options.fill_cache = false;
+      dbiter = db->db->NewIterator(options, cf);
+    }
+    ~RocksDBWholeSpaceIteratorImpl() override;
+
+    int seek_to_first() override;
+    int seek_to_first(const std::string &prefix) override;
+    int seek_to_last() override;
+    int seek_to_last(const std::string &prefix) override;
+    int upper_bound(const std::string &prefix, const std::string &after) override;
+    int lower_bound(const std::string &prefix, const std::string &to) override;
+    bool valid() override;
+    int next() override;
+    int prev() override;
+    std::string key() override;
+    std::pair<std::string,std::string> raw_key() override;
+    bool raw_key_is_prefixed(const std::string &prefix) override;
+    ceph::bufferlist value() override;
+    ceph::bufferptr value_as_ptr() override;
+    int status() override;
+    size_t key_size() override;
+    size_t value_size() override;
+  };
+
+  Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0, IteratorBounds = IteratorBounds()) override;
+private:
+  /// this iterator spans a single cf
+  rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
+public:
+  /// Utility
+  static std::string combine_strings(const std::string &prefix, const std::string &value) {
+    std::string out = prefix;
+    out.push_back(0);
+    out.append(value);
+    return out;
+  }
+  static void combine_strings(const std::string &prefix,
+                              const char *key, size_t keylen,
+                              std::string *out) {
+    out->reserve(prefix.size() + 1 + keylen);
+    *out = prefix;
+    out->push_back(0);
+    out->append(key, keylen);
+  }
+
+  static int split_key(rocksdb::Slice in, std::string *prefix, std::string *key);
+
+  static std::string past_prefix(const std::string &prefix);
+
+  class MergeOperatorRouter;
+  class MergeOperatorLinker;
+  friend class MergeOperatorRouter;
+  int set_merge_operator(
+    const std::string& prefix,
+    std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
+  std::string assoc_name; ///< Name of associative operator
+
+  uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override {
+    DIR *store_dir = opendir(path.c_str());
+    if (!store_dir) {
+      lderr(cct) << __func__ << " something happened opening the store: "
+                 << cpp_strerror(errno) << dendl;
+      return 0;
+    }
+
+    uint64_t total_size = 0;
+    uint64_t sst_size = 0;
+    uint64_t log_size = 0;
+    uint64_t misc_size = 0;
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(store_dir)) != NULL) {
+      std::string n(entry->d_name);
+
+      if (n == "." || n == "..")
+        continue;
+
+      std::string fpath = path + '/' + n;
+      struct stat s;
+      int err = stat(fpath.c_str(), &s);
+      if (err < 0)
+        err = -errno;
+      // we may race against rocksdb while reading files; this should only
+      // happen when those files are being updated, data is being shuffled
+      // and files get removed, in which case there's not much of a problem
+      // as we'll get to them next time around.
+      if (err == -ENOENT) {
+        continue;
+      }
+      if (err < 0) {
+        lderr(cct) << __func__ << " error obtaining stats for " << fpath
+                   << ": " << cpp_strerror(err) << dendl;
+        goto err;
+      }
+
+      size_t pos = n.find_last_of('.');
+      if (pos == std::string::npos) {
+        misc_size += s.st_size;
+        continue;
+      }
+
+      std::string ext = n.substr(pos+1);
+      if (ext == "sst") {
+        sst_size += s.st_size;
+      } else if (ext == "log") {
+        log_size += s.st_size;
+      } else {
+        misc_size += s.st_size;
+      }
+    }
+
+    total_size = sst_size + log_size + misc_size;
+
+    extra["sst"] = sst_size;
+    extra["log"] = log_size;
+    extra["misc"] = misc_size;
+    extra["total"] = total_size;
+
+err:
+    closedir(store_dir);
+    return total_size;
+  }
+
+  virtual int64_t get_cache_usage() const override {
+    return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
+  }
+
+  virtual int64_t get_cache_usage(std::string prefix) const override {
+    auto it = cf_bbt_opts.find(prefix);
+    if (it != cf_bbt_opts.end() && it->second.block_cache) {
+      return static_cast<int64_t>(it->second.block_cache->GetUsage());
+    }
+    return -EINVAL;
+  }
+
+  int set_cache_size(uint64_t s) override {
+    cache_size = s;
+    set_cache_flag = true;
+    return 0;
+  }
+
+  virtual std::shared_ptr<PriorityCache::PriCache>
+  get_priority_cache() const override {
+    return std::dynamic_pointer_cast<PriorityCache::PriCache>(
+      bbt_opts.block_cache);
+  }
+
+  virtual std::shared_ptr<PriorityCache::PriCache>
+  get_priority_cache(std::string prefix) const override {
+    auto it = cf_bbt_opts.find(prefix);
+    if (it != cf_bbt_opts.end()) {
+      return std::dynamic_pointer_cast<PriorityCache::PriCache>(
+        it->second.block_cache);
+    }
+    return nullptr;
+  }
+
+  WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
+private:
+  WholeSpaceIterator get_default_cf_iterator();
+
+  using cf_deleter_t = std::function<void(rocksdb::ColumnFamilyHandle*)>;
+  using columns_t = std::map<std::string,
+                             std::unique_ptr<rocksdb::ColumnFamilyHandle,
+                                             cf_deleter_t>>;
+  int prepare_for_reshard(const std::string& new_sharding,
+                          columns_t& to_process_columns);
+  int reshard_cleanup(const columns_t& current_columns);
+public:
+  struct resharding_ctrl {
+    size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator
+    size_t keys_per_iterator = 10000;
+    size_t bytes_per_batch = 1000000; /// amount of data before submitting batch
+    size_t keys_per_batch = 1000;
+    bool unittest_fail_after_first_batch = false;
+    bool unittest_fail_after_processing_column = false;
+    bool unittest_fail_after_successful_processing = false;
+  };
+  int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
+  bool get_sharding(std::string& sharding);
+
+};
+
+#endif
diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.cc b/src/kv/rocksdb_cache/BinnedLRUCache.cc
new file mode 100644
index 000000000..0d657883e
--- /dev/null
+++ b/src/kv/rocksdb_cache/BinnedLRUCache.cc
@@ -0,0 +1,624 @@
+// Copyright (c) 2018-Present Red Hat Inc. All rights reserved.
+//
+// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 and Apache 2.0 License
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "BinnedLRUCache.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string>
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_rocksdb
+#undef dout_prefix
+#define dout_prefix *_dout << "rocksdb: "
+
+namespace rocksdb_cache {
+
+BinnedLRUHandleTable::BinnedLRUHandleTable() : list_(nullptr), length_(0), elems_(0) {
+  Resize();
+}
+
+BinnedLRUHandleTable::~BinnedLRUHandleTable() {
+  ApplyToAllCacheEntries([](BinnedLRUHandle* h) {
+    if (h->refs == 1) {
+      h->Free();
+    }
+  });
+  delete[] list_;
+}
+
+BinnedLRUHandle* BinnedLRUHandleTable::Lookup(const rocksdb::Slice& key, uint32_t hash) {
+  return *FindPointer(key, hash);
+}
+
+BinnedLRUHandle* BinnedLRUHandleTable::Insert(BinnedLRUHandle* h) {
+  BinnedLRUHandle** ptr = FindPointer(h->key(), h->hash);
+  BinnedLRUHandle* old = *ptr;
+  h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+  *ptr = h;
+  if (old == nullptr) {
+    ++elems_;
+    if (elems_ > length_) {
+      // Since each cache entry is fairly large, we aim for a small
+      // average linked list length (<= 1).
+      Resize();
+    }
+  }
+  return old;
+}
+
+BinnedLRUHandle* BinnedLRUHandleTable::Remove(const rocksdb::Slice& key, uint32_t hash) {
+  BinnedLRUHandle** ptr = FindPointer(key, hash);
+  BinnedLRUHandle* result = *ptr;
+  if (result != nullptr) {
+    *ptr = result->next_hash;
+    --elems_;
+  }
+  return result;
+}
+
+BinnedLRUHandle** BinnedLRUHandleTable::FindPointer(const rocksdb::Slice& key, uint32_t hash) {
+  BinnedLRUHandle** ptr = &list_[hash & (length_ - 1)];
+  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
+    ptr = &(*ptr)->next_hash;
+  }
+  return ptr;
+}
+
+void BinnedLRUHandleTable::Resize() {
+  uint32_t new_length = 16;
+  while (new_length < elems_ * 1.5) {
+    new_length *= 2;
+  }
+  BinnedLRUHandle** new_list = new BinnedLRUHandle*[new_length];
+  memset(new_list, 0, sizeof(new_list[0]) * new_length);
+  uint32_t count = 0;
+  for (uint32_t i = 0; i < length_; i++) {
+    BinnedLRUHandle* h = list_[i];
+    while (h != nullptr) {
+      BinnedLRUHandle* next = h->next_hash;
+      uint32_t hash = h->hash;
+      BinnedLRUHandle** ptr = &new_list[hash & (new_length - 1)];
+      h->next_hash = *ptr;
+      *ptr = h;
+      h = next;
+      count++;
+    }
+  }
+  ceph_assert(elems_ == count);
+  delete[] list_;
+  list_ = new_list;
+  length_ = new_length;
+}
+
+BinnedLRUCacheShard::BinnedLRUCacheShard(CephContext *c, size_t capacity, bool strict_capacity_limit,
+                                         double high_pri_pool_ratio)
+  : cct(c),
+    capacity_(0),
+    high_pri_pool_usage_(0),
+    strict_capacity_limit_(strict_capacity_limit),
+    high_pri_pool_ratio_(high_pri_pool_ratio),
+    high_pri_pool_capacity_(0),
+    usage_(0),
+    lru_usage_(0) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+  lru_low_pri_ = &lru_;
+  SetCapacity(capacity);
+}
+
+BinnedLRUCacheShard::~BinnedLRUCacheShard() {}
+
+bool BinnedLRUCacheShard::Unref(BinnedLRUHandle* e) {
+  ceph_assert(e->refs > 0);
+  e->refs--;
+  return e->refs == 0;
+}
+
+// Call deleter and free
+
+void BinnedLRUCacheShard::EraseUnRefEntries() {
+  ceph::autovector<BinnedLRUHandle*> last_reference_list;
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    while (lru_.next != &lru_) {
+      BinnedLRUHandle* old = lru_.next;
+      ceph_assert(old->InCache());
+      ceph_assert(old->refs ==
+                  1);  // LRU list contains elements which may be evicted
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      old->SetInCache(false);
+      Unref(old);
+      usage_ -= old->charge;
+      last_reference_list.push_back(old);
+    }
+  }
+
+  for (auto entry : last_reference_list) {
+    entry->Free();
+  }
+}
+
+void BinnedLRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                                 bool thread_safe) {
+  if (thread_safe) {
+    mutex_.lock();
+  }
+  table_.ApplyToAllCacheEntries(
+    [callback](BinnedLRUHandle* h) { callback(h->value, h->charge); });
+  if (thread_safe) {
+    mutex_.unlock();
+  }
+}
+
+void BinnedLRUCacheShard::TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri) {
+  *lru = &lru_;
+  *lru_low_pri = lru_low_pri_;
+}
+
+size_t BinnedLRUCacheShard::TEST_GetLRUSize() {
+  BinnedLRUHandle* lru_handle = lru_.next;
+  size_t lru_size = 0;
+  while (lru_handle != &lru_) {
+    lru_size++;
+    lru_handle = lru_handle->next;
+  }
+  return lru_size;
+}
+
+double BinnedLRUCacheShard::GetHighPriPoolRatio() const {
+  std::lock_guard<std::mutex> l(mutex_);
+  return high_pri_pool_ratio_;
+}
+
+size_t BinnedLRUCacheShard::GetHighPriPoolUsage() const {
+  std::lock_guard<std::mutex> l(mutex_);
+  return high_pri_pool_usage_;
+}
+
+void BinnedLRUCacheShard::LRU_Remove(BinnedLRUHandle* e) {
+  ceph_assert(e->next != nullptr);
+  ceph_assert(e->prev != nullptr);
+  if (lru_low_pri_ == e) {
+    lru_low_pri_ = e->prev;
+  }
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  e->prev = e->next = nullptr;
+  lru_usage_ -= e->charge;
+  if (e->InHighPriPool()) {
+    ceph_assert(high_pri_pool_usage_ >= e->charge);
+    high_pri_pool_usage_ -= e->charge;
+  }
+}
+
+void BinnedLRUCacheShard::LRU_Insert(BinnedLRUHandle* e) {
+  ceph_assert(e->next == nullptr);
+  ceph_assert(e->prev == nullptr);
+  if (high_pri_pool_ratio_ > 0 && e->IsHighPri()) {
+    // Insert "e" at the head of the LRU list.
+    e->next = &lru_;
+    e->prev = lru_.prev;
+    e->prev->next = e;
+    e->next->prev = e;
+    e->SetInHighPriPool(true);
+    high_pri_pool_usage_ += e->charge;
+    MaintainPoolSize();
+  } else {
+    // Insert "e" at the head of the low-pri pool. Note that when
+    // high_pri_pool_ratio is 0, the head of the low-pri pool is also the
+    // head of the LRU list.
+    e->next = lru_low_pri_->next;
+    e->prev = lru_low_pri_;
+    e->prev->next = e;
+    e->next->prev = e;
+    e->SetInHighPriPool(false);
+    lru_low_pri_ = e;
+  }
+  lru_usage_ += e->charge;
+}
+
+void BinnedLRUCacheShard::MaintainPoolSize() {
+  while (high_pri_pool_usage_ > high_pri_pool_capacity_) {
+    // Overflow the last entry in the high-pri pool to the low-pri pool.
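    // [Illustrative arithmetic, not part of the original change: with
    // capacity_ = 100 and high_pri_pool_ratio_ = 0.3, high_pri_pool_capacity_
    // is 30. If high_pri_pool_usage_ reaches 35, each pass below advances
    // lru_low_pri_ one entry and demotes the oldest high-pri entry, until the
    // pool is back under 30.]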
+ lru_low_pri_ = lru_low_pri_->next; + ceph_assert(lru_low_pri_ != &lru_); + lru_low_pri_->SetInHighPriPool(false); + high_pri_pool_usage_ -= lru_low_pri_->charge; + } +} + +void BinnedLRUCacheShard::EvictFromLRU(size_t charge, + ceph::autovector<BinnedLRUHandle*>* deleted) { + while (usage_ + charge > capacity_ && lru_.next != &lru_) { + BinnedLRUHandle* old = lru_.next; + ceph_assert(old->InCache()); + ceph_assert(old->refs == 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + deleted->push_back(old); + } +} + +void BinnedLRUCacheShard::SetCapacity(size_t capacity) { + ceph::autovector<BinnedLRUHandle*> last_reference_list; + { + std::lock_guard<std::mutex> l(mutex_); + capacity_ = capacity; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + EvictFromLRU(0, &last_reference_list); + } + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void BinnedLRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + std::lock_guard<std::mutex> l(mutex_); + strict_capacity_limit_ = strict_capacity_limit; +} + +rocksdb::Cache::Handle* BinnedLRUCacheShard::Lookup(const rocksdb::Slice& key, uint32_t hash) { + std::lock_guard<std::mutex> l(mutex_); + BinnedLRUHandle* e = table_.Lookup(key, hash); + if (e != nullptr) { + ceph_assert(e->InCache()); + if (e->refs == 1) { + LRU_Remove(e); + } + e->refs++; + e->SetHit(); + } + return reinterpret_cast<rocksdb::Cache::Handle*>(e); +} + +bool BinnedLRUCacheShard::Ref(rocksdb::Cache::Handle* h) { + BinnedLRUHandle* handle = reinterpret_cast<BinnedLRUHandle*>(h); + std::lock_guard<std::mutex> l(mutex_); + if (handle->InCache() && handle->refs == 1) { + LRU_Remove(handle); + } + handle->refs++; + return true; +} + +void BinnedLRUCacheShard::SetHighPriPoolRatio(double high_pri_pool_ratio) { + std::lock_guard<std::mutex> l(mutex_); + high_pri_pool_ratio_ = high_pri_pool_ratio; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + MaintainPoolSize(); +} + +bool BinnedLRUCacheShard::Release(rocksdb::Cache::Handle* handle, bool force_erase) { + if (handle == nullptr) { + return false; + } + BinnedLRUHandle* e = reinterpret_cast<BinnedLRUHandle*>(handle); + bool last_reference = false; + { + std::lock_guard<std::mutex> l(mutex_); + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (e->refs == 1 && e->InCache()) { + // The item is still in cache, and nobody else holds a reference to it + if (usage_ > capacity_ || force_erase) { + // the cache is full + // The LRU list must be empty since the cache is full + ceph_assert(!(usage_ > capacity_) || lru_.next == &lru_); + // take this opportunity and remove the item + table_.Remove(e->key(), e->hash); + e->SetInCache(false); + Unref(e); + usage_ -= e->charge; + last_reference = true; + } else { + // put the item on the list to be potentially freed + LRU_Insert(e); + } + } + } + + // free outside of mutex + if (last_reference) { + e->Free(); + } + return last_reference; +} + +rocksdb::Status BinnedLRUCacheShard::Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) { + auto e = new BinnedLRUHandle(); + rocksdb::Status s; + ceph::autovector<BinnedLRUHandle*> last_reference_list; + + e->value = 
+  e->deleter = deleter;
+  e->charge = charge;
+  e->key_length = key.size();
+  e->key_data = new char[e->key_length];
+  e->flags = 0;
+  e->hash = hash;
+  e->refs = (handle == nullptr
+             ? 1
+             : 2);  // One from BinnedLRUCache, one for the returned handle
+  e->next = e->prev = nullptr;
+  e->SetInCache(true);
+  e->SetPriority(priority);
+  std::copy_n(key.data(), e->key_length, e->key_data);
+
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    // Free the space following strict LRU policy until enough space
+    // is freed or the lru list is empty
+    EvictFromLRU(charge, &last_reference_list);
+
+    if (usage_ - lru_usage_ + charge > capacity_ &&
+        (strict_capacity_limit_ || handle == nullptr)) {
+      if (handle == nullptr) {
+        // Don't insert the entry but still return ok, as if the entry were
+        // inserted into the cache and immediately evicted.
+        last_reference_list.push_back(e);
+      } else {
+        delete e;
+        *handle = nullptr;
+        s = rocksdb::Status::Incomplete("Insert failed due to LRU cache being full.");
+      }
+    } else {
+      // insert into the cache
+      // note that the cache might get larger than its capacity if not enough
+      // space was freed
+      BinnedLRUHandle* old = table_.Insert(e);
+      usage_ += e->charge;
+      if (old != nullptr) {
+        old->SetInCache(false);
+        if (Unref(old)) {
+          usage_ -= old->charge;
+          // old was on the LRU list because it was in the cache and its
+          // reference count was just 1 (Unref returned true)
+          LRU_Remove(old);
+          last_reference_list.push_back(old);
+        }
+      }
+      if (handle == nullptr) {
+        LRU_Insert(e);
+      } else {
+        *handle = reinterpret_cast<rocksdb::Cache::Handle*>(e);
+      }
+      s = rocksdb::Status::OK();
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  for (auto entry : last_reference_list) {
+    entry->Free();
+  }
+
+  return s;
+}
+
+void BinnedLRUCacheShard::Erase(const rocksdb::Slice& key, uint32_t hash) {
+  BinnedLRUHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      last_reference = Unref(e);
+      if (last_reference) {
+        usage_ -= e->charge;
+      }
+      if (last_reference && e->InCache()) {
+        LRU_Remove(e);
+      }
+      e->SetInCache(false);
+    }
+  }
+
+  // mutex not held here
+  // last_reference will only be true if e != nullptr
+  if (last_reference) {
+    e->Free();
+  }
+}
+
+size_t BinnedLRUCacheShard::GetUsage() const {
+  std::lock_guard<std::mutex> l(mutex_);
+  return usage_;
+}
+
+size_t BinnedLRUCacheShard::GetPinnedUsage() const {
+  std::lock_guard<std::mutex> l(mutex_);
+  ceph_assert(usage_ >= lru_usage_);
+  return usage_ - lru_usage_;
+}
+
+std::string BinnedLRUCacheShard::GetPrintableOptions() const {
+  const int kBufferSize = 200;
+  char buffer[kBufferSize];
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    snprintf(buffer, kBufferSize, "    high_pri_pool_ratio: %.3lf\n",
+             high_pri_pool_ratio_);
+  }
+  return std::string(buffer);
+}
+
+BinnedLRUCache::BinnedLRUCache(CephContext *c,
+                               size_t capacity,
+                               int num_shard_bits,
+                               bool strict_capacity_limit,
+                               double high_pri_pool_ratio)
+  : ShardedCache(capacity, num_shard_bits, strict_capacity_limit), cct(c) {
+  num_shards_ = 1 << num_shard_bits;
+  // TODO: Switch over to use mempool
+  int rc = posix_memalign((void**) &shards_,
+                          CACHE_LINE_SIZE,
+                          sizeof(BinnedLRUCacheShard) * num_shards_);
+  if (rc != 0) {
+    throw std::bad_alloc();
+  }
+  size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
+  for (int i = 0; i < num_shards_; i++) {
+    new (&shards_[i])
+      BinnedLRUCacheShard(c, per_shard, strict_capacity_limit, high_pri_pool_ratio);
+  }
+}
+
+BinnedLRUCache::~BinnedLRUCache() {
+  for (int i = 0; i < num_shards_; i++) {
+    shards_[i].~BinnedLRUCacheShard();
+  }
+  aligned_free(shards_);
+}
+
+CacheShard* BinnedLRUCache::GetShard(int shard) {
+  return reinterpret_cast<CacheShard*>(&shards_[shard]);
+}
+
+const CacheShard* BinnedLRUCache::GetShard(int shard) const {
+  return reinterpret_cast<CacheShard*>(&shards_[shard]);
+}
+
+void* BinnedLRUCache::Value(Handle* handle) {
+  return reinterpret_cast<const BinnedLRUHandle*>(handle)->value;
+}
+
+size_t BinnedLRUCache::GetCharge(Handle* handle) const {
+  return reinterpret_cast<const BinnedLRUHandle*>(handle)->charge;
+}
+
+uint32_t BinnedLRUCache::GetHash(Handle* handle) const {
+  return reinterpret_cast<const BinnedLRUHandle*>(handle)->hash;
+}
+
+void BinnedLRUCache::DisownData() {
+// Do not drop data if compiled with ASAN, to suppress the leak warning.
+#ifndef __SANITIZE_ADDRESS__
+  shards_ = nullptr;
+#endif  // !__SANITIZE_ADDRESS__
+}
+
+size_t BinnedLRUCache::TEST_GetLRUSize() {
+  size_t lru_size_of_all_shards = 0;
+  for (int i = 0; i < num_shards_; i++) {
+    lru_size_of_all_shards += shards_[i].TEST_GetLRUSize();
+  }
+  return lru_size_of_all_shards;
+}
+
+void BinnedLRUCache::SetHighPriPoolRatio(double high_pri_pool_ratio) {
+  for (int i = 0; i < num_shards_; i++) {
+    shards_[i].SetHighPriPoolRatio(high_pri_pool_ratio);
+  }
+}
+
+double BinnedLRUCache::GetHighPriPoolRatio() const {
+  double result = 0.0;
+  if (num_shards_ > 0) {
+    result = shards_[0].GetHighPriPoolRatio();
+  }
+  return result;
+}
+
+size_t BinnedLRUCache::GetHighPriPoolUsage() const {
+  // We will not lock the cache when getting the usage from shards.
+  size_t usage = 0;
+  for (int s = 0; s < num_shards_; s++) {
+    usage += shards_[s].GetHighPriPoolUsage();
+  }
+  return usage;
+}
+
+// PriCache
+
+int64_t BinnedLRUCache::request_cache_bytes(PriorityCache::Priority pri, uint64_t total_cache) const
+{
+  int64_t assigned = get_cache_bytes(pri);
+  int64_t request = 0;
+
+  switch (pri) {
+  // PRI0 is for rocksdb's high priority items (indexes/filters)
+  case PriorityCache::Priority::PRI0:
+    {
+      request = GetHighPriPoolUsage();
+      break;
+    }
+  // All other cache items are currently shoved into the PRI1 priority.
+  case PriorityCache::Priority::PRI1:
+    {
+      request = GetUsage();
+      request -= GetHighPriPoolUsage();
+      break;
+    }
+  default:
+    break;
+  }
+  request = (request > assigned) ? request - assigned : 0;
+  ldout(cct, 10) << __func__ << " Priority: " << static_cast<uint32_t>(pri)
+                 << " Request: " << request << dendl;
+  return request;
+}
+
+int64_t BinnedLRUCache::commit_cache_size(uint64_t total_bytes)
+{
+  size_t old_bytes = GetCapacity();
+  int64_t new_bytes = PriorityCache::get_chunk(
+    get_cache_bytes(), total_bytes);
+  ldout(cct, 10) << __func__ << " old: " << old_bytes
+                 << " new: " << new_bytes << dendl;
+  SetCapacity((size_t) new_bytes);
+
+  double ratio = 0;
+  if (new_bytes > 0) {
+    int64_t pri0_bytes = get_cache_bytes(PriorityCache::Priority::PRI0);
+    // Add 10% of the "reserved" bytes so the ratio can't get stuck at 0
+    pri0_bytes += (new_bytes - get_cache_bytes()) / 10;
+    ratio = (double) pri0_bytes / new_bytes;
+  }
+  ldout(cct, 10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl;
+  SetHighPriPoolRatio(ratio);
+  return new_bytes;
+}
+
+std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(
+    CephContext *c,
+    size_t capacity,
+    int num_shard_bits,
+    bool strict_capacity_limit,
+    double high_pri_pool_ratio) {
+  if (num_shard_bits >= 20) {
+    return nullptr;  // the cache cannot be sharded into too many fine-grained pieces
+  }
+  if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) {
+    // invalid high_pri_pool_ratio
+    return nullptr;
+  }
+  if (num_shard_bits < 0) {
+    num_shard_bits = GetDefaultCacheShardBits(capacity);
+  }
+  return std::make_shared<BinnedLRUCache>(
+    c, capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio);
+}
+
+}  // namespace rocksdb_cache
diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.h b/src/kv/rocksdb_cache/BinnedLRUCache.h
new file mode 100644
index 000000000..85608be0e
--- /dev/null
+++ b/src/kv/rocksdb_cache/BinnedLRUCache.h
@@ -0,0 +1,336 @@
+// Copyright (c) 2018-Present Red Hat Inc. All rights reserved.
+//
+// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 and Apache 2.0 License
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_BINNED_LRU_CACHE
+#define ROCKSDB_BINNED_LRU_CACHE
+
+#include <string>
+#include <mutex>
+
+#include "ShardedCache.h"
+#include "common/autovector.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+#include "common/ceph_context.h"
+
+namespace rocksdb_cache {
+
+// LRU cache implementation
+
+// An entry is a variable length heap-allocated structure.
+// Entries are referenced by the cache and/or by any external entity.
+// The cache keeps all its entries in a hash table. Some elements
+// are also stored on the LRU list.
+//
+// BinnedLRUHandle can be in these states:
+// 1. Referenced externally AND in hash table.
+//    In that case the entry is *not* in the LRU. (refs > 1 && in_cache == true)
+// 2. Not referenced externally and in hash table. In that case the entry is
+//    in the LRU and can be freed. (refs == 1 && in_cache == true)
+// 3. Referenced externally and not in hash table. In that case the entry is
+//    not on the LRU list and not in the table. (refs >= 1 && in_cache == false)
+//
+// All newly created BinnedLRUHandles are in state 1. If you call
+// BinnedLRUCacheShard::Release
+// on an entry in state 1, it will go into state 2. To move from state 1 to
+// state 3, either call BinnedLRUCacheShard::Erase or BinnedLRUCacheShard::Insert with the
+// same key.
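// [Illustrative lifecycle, hypothetical and not part of the original header,
// showing the state transitions above on a single shard `s`:
//   rocksdb::Cache::Handle* h = nullptr;
//   s.Insert(key, hash, value, charge, deleter, &h, priority);  // state 1
//   s.Release(h);             // state 2: only the cache references it
//   h = s.Lookup(key, hash);  // back to state 1
//   s.Erase(key, hash);       // state 3: external reference only
//   s.Release(h);             // last reference dropped; entry is freed]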
+// To move from state 2 to state 1, use BinnedLRUCacheShard::Lookup.
+// Before destruction, make sure that no handles are in state 1. This means
+// that any successful BinnedLRUCacheShard::Lookup/BinnedLRUCacheShard::Insert has a
+// matching
+// BinnedLRUCacheShard::Release (to move into state 2) or BinnedLRUCacheShard::Erase (for state 3)
+
+std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(
+    CephContext *c,
+    size_t capacity,
+    int num_shard_bits = -1,
+    bool strict_capacity_limit = false,
+    double high_pri_pool_ratio = 0.0);
+
+struct BinnedLRUHandle {
+  void* value;
+  void (*deleter)(const rocksdb::Slice&, void* value);
+  BinnedLRUHandle* next_hash;
+  BinnedLRUHandle* next;
+  BinnedLRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  size_t key_length;
+  uint32_t refs;      // the number of refs to this entry;
+                      // the cache itself is counted as 1
+
+  // Include the following flags:
+  //   in_cache:         whether this entry is referenced by the hash table.
+  //   is_high_pri:      whether this entry is a high priority entry.
+  //   in_high_pri_pool: whether this entry is in the high-pri pool.
+  char flags;
+
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+
+  char* key_data = nullptr;  // Beginning of key
+
+  rocksdb::Slice key() const {
+    // For cheaper lookups, we allow a temporary Handle object
+    // to store a pointer to a key in "value".
+    if (next == this) {
+      return *(reinterpret_cast<rocksdb::Slice*>(value));
+    } else {
+      return rocksdb::Slice(key_data, key_length);
+    }
+  }
+
+  bool InCache() { return flags & 1; }
+  bool IsHighPri() { return flags & 2; }
+  bool InHighPriPool() { return flags & 4; }
+  bool HasHit() { return flags & 8; }
+
+  void SetInCache(bool in_cache) {
+    if (in_cache) {
+      flags |= 1;
+    } else {
+      flags &= ~1;
+    }
+  }
+
+  void SetPriority(rocksdb::Cache::Priority priority) {
+    if (priority == rocksdb::Cache::Priority::HIGH) {
+      flags |= 2;
+    } else {
+      flags &= ~2;
+    }
+  }
+
+  void SetInHighPriPool(bool in_high_pri_pool) {
+    if (in_high_pri_pool) {
+      flags |= 4;
+    } else {
+      flags &= ~4;
+    }
+  }
+
+  void SetHit() { flags |= 8; }
+
+  void Free() {
+    ceph_assert((refs == 1 && InCache()) || (refs == 0 && !InCache()));
+    if (deleter) {
+      (*deleter)(key(), value);
+    }
+    delete[] key_data;
+    delete this;
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested. E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class BinnedLRUHandleTable {
+ public:
+  BinnedLRUHandleTable();
+  ~BinnedLRUHandleTable();
+
+  BinnedLRUHandle* Lookup(const rocksdb::Slice& key, uint32_t hash);
+  BinnedLRUHandle* Insert(BinnedLRUHandle* h);
+  BinnedLRUHandle* Remove(const rocksdb::Slice& key, uint32_t hash);
+
+  template <typename T>
+  void ApplyToAllCacheEntries(T func) {
+    for (uint32_t i = 0; i < length_; i++) {
+      BinnedLRUHandle* h = list_[i];
+      while (h != nullptr) {
+        auto n = h->next_hash;
+        ceph_assert(h->InCache());
+        func(h);
+        h = n;
+      }
+    }
+  }
+
+ private:
+  // Return a pointer to the slot that points to a cache entry that
+  // matches key/hash. If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
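  // [Illustrative use, hypothetical and not part of the original header: on a
  // key collision Insert() links the new handle into the bucket and returns
  // the displaced entry, which the owning shard must then unreference, e.g.:
  //   BinnedLRUHandle* old = table_.Insert(e);  // e replaces any old entry
  //   if (old) { /* mark !InCache, unref, maybe free */ }]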
+  BinnedLRUHandle** FindPointer(const rocksdb::Slice& key, uint32_t hash);
+
+  void Resize();
+
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  BinnedLRUHandle** list_;
+  uint32_t length_;
+  uint32_t elems_;
+};
+
+// A single shard of sharded cache.
+class alignas(CACHE_LINE_SIZE) BinnedLRUCacheShard : public CacheShard {
+ public:
+  BinnedLRUCacheShard(CephContext *c, size_t capacity, bool strict_capacity_limit,
+                      double high_pri_pool_ratio);
+  virtual ~BinnedLRUCacheShard();
+
+  // Separate from the constructor so the caller can easily make an array of
+  // shards. If current usage is more than the new capacity, the function
+  // will attempt to free the needed space.
+  virtual void SetCapacity(size_t capacity) override;
+
+  // Set the flag to reject insertion if the cache is full.
+  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
+
+  // Set percentage of capacity reserved for high-pri cache entries.
+  void SetHighPriPoolRatio(double high_pri_pool_ratio);
+
+  // Like Cache methods, but with an extra "hash" parameter.
+  virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value,
+                                 size_t charge,
+                                 void (*deleter)(const rocksdb::Slice& key, void* value),
+                                 rocksdb::Cache::Handle** handle,
+                                 rocksdb::Cache::Priority priority) override;
+  virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) override;
+  virtual bool Ref(rocksdb::Cache::Handle* handle) override;
+  virtual bool Release(rocksdb::Cache::Handle* handle,
+                       bool force_erase = false) override;
+  virtual void Erase(const rocksdb::Slice& key, uint32_t hash) override;
+
+  // Although in some platforms the update of size_t is atomic, to make sure
+  // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll
+  // protect them with mutex_.
+
+  virtual size_t GetUsage() const override;
+  virtual size_t GetPinnedUsage() const override;
+
+  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                      bool thread_safe) override;
+
+  virtual void EraseUnRefEntries() override;
+
+  virtual std::string GetPrintableOptions() const override;
+
+  void TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri);
+
+  // Retrieves the number of elements in the LRU, for unit test purposes only;
+  // not threadsafe
+  size_t TEST_GetLRUSize();
+
+  // Retrieves high pri pool ratio
+  double GetHighPriPoolRatio() const;
+
+  // Retrieves high pri pool usage
+  size_t GetHighPriPoolUsage() const;
+
+ private:
+  CephContext *cct;
+  void LRU_Remove(BinnedLRUHandle* e);
+  void LRU_Insert(BinnedLRUHandle* e);
+
+  // Overflow the last entry in the high-pri pool to the low-pri pool until
+  // the size of the high-pri pool is no larger than the size specified by
+  // high_pri_pool_ratio_.
+  void MaintainPoolSize();
+
+  // Just reduce the reference count by 1.
+  // Return true if it was the last reference.
+  bool Unref(BinnedLRUHandle* e);
+
+  // Free some space following strict LRU policy until enough space
+  // to hold (usage_ + charge) is freed or the lru list is empty.
+  // This function is not thread safe - it needs to be executed while
+  // holding the mutex_.
+  void EvictFromLRU(size_t charge, ceph::autovector<BinnedLRUHandle*>* deleted);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // Memory size for entries in the high-pri pool.
+  size_t high_pri_pool_usage_;
+
+  // Whether to reject insertion if the cache reaches its full capacity.
+  bool strict_capacity_limit_;
+
+  // Ratio of capacity reserved for high priority cache entries.
+  double high_pri_pool_ratio_;
+
+  // High-pri pool size, equal to capacity * high_pri_pool_ratio.
+  // Remember the value to avoid recomputing each time.
+  double high_pri_pool_capacity_;
+
+  // Dummy head of LRU list.
+  // lru.prev is the newest entry, lru.next is the oldest entry.
+  // The LRU list contains items which can be evicted, i.e. referenced only by
+  // the cache.
+  BinnedLRUHandle lru_;
+
+  // Pointer to head of low-pri pool in LRU list.
+  BinnedLRUHandle* lru_low_pri_;
+
+  // ------------^^^^^^^^^^^^^-----------
+  // Not frequently modified data members
+  // ------------------------------------
+  //
+  // We separate data members that are updated frequently from the ones that
+  // are not frequently updated so that they don't share the same cache line,
+  // which would lead to false sharing.
+  //
+  // ------------------------------------
+  // Frequently modified data members
+  // ------------vvvvvvvvvvvvv-----------
+  BinnedLRUHandleTable table_;
+
+  // Memory size for entries residing in the cache
+  size_t usage_;
+
+  // Memory size for entries residing only in the LRU list
+  size_t lru_usage_;
+
+  // mutex_ protects the following state.
+  // We don't count mutex_ as the cache's internal state so semantically we
+  // don't mind mutex_ invoking the non-const actions.
+  mutable std::mutex mutex_;
+};
+
+class BinnedLRUCache : public ShardedCache {
+ public:
+  BinnedLRUCache(CephContext *c, size_t capacity, int num_shard_bits,
+                 bool strict_capacity_limit, double high_pri_pool_ratio);
+  virtual ~BinnedLRUCache();
+  virtual const char* Name() const override { return "BinnedLRUCache"; }
+  virtual CacheShard* GetShard(int shard) override;
+  virtual const CacheShard* GetShard(int shard) const override;
+  virtual void* Value(Handle* handle) override;
+  virtual size_t GetCharge(Handle* handle) const override;
+  virtual uint32_t GetHash(Handle* handle) const override;
+  virtual void DisownData() override;
+
+  // Retrieves the number of elements in the LRU, for unit test purposes only
+  size_t TEST_GetLRUSize();
+  // Sets the high pri pool ratio
+  void SetHighPriPoolRatio(double high_pri_pool_ratio);
+  // Retrieves high pri pool ratio
+  double GetHighPriPoolRatio() const;
+  // Retrieves high pri pool usage
+  size_t GetHighPriPoolUsage() const;
+
+  // PriorityCache
+  virtual int64_t request_cache_bytes(
+    PriorityCache::Priority pri, uint64_t total_cache) const;
+  virtual int64_t commit_cache_size(uint64_t total_cache);
+  virtual int64_t get_committed_size() const {
+    return GetCapacity();
+  }
+  virtual std::string get_cache_name() const {
+    return "RocksDB Binned LRU Cache";
+  }
+
+ private:
+  CephContext *cct;
+  BinnedLRUCacheShard* shards_;
+  int num_shards_ = 0;
+};
+
+}  // namespace rocksdb_cache
+
+#endif  // ROCKSDB_BINNED_LRU_CACHE
diff --git a/src/kv/rocksdb_cache/ShardedCache.cc b/src/kv/rocksdb_cache/ShardedCache.cc
new file mode 100644
index 000000000..367140a94
--- /dev/null
+++ b/src/kv/rocksdb_cache/ShardedCache.cc
@@ -0,0 +1,159 @@
+// Copyright (c) 2018-Present Red Hat Inc. All rights reserved.
+//
+// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 and Apache 2.0 License
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "ShardedCache.h"
+
+#include <string>
+
+namespace rocksdb_cache {
+
+ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
+                           bool strict_capacity_limit)
+  : num_shard_bits_(num_shard_bits),
+    capacity_(capacity),
+    strict_capacity_limit_(strict_capacity_limit),
+    last_id_(1) {}
+
+void ShardedCache::SetCapacity(size_t capacity) {
+  int num_shards = 1 << num_shard_bits_;
+  const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+  std::lock_guard<std::mutex> l(capacity_mutex_);
+  for (int s = 0; s < num_shards; s++) {
+    GetShard(s)->SetCapacity(per_shard);
+  }
+  capacity_ = capacity;
+}
+
+void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
+  int num_shards = 1 << num_shard_bits_;
+  std::lock_guard<std::mutex> l(capacity_mutex_);
+  for (int s = 0; s < num_shards; s++) {
+    GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit);
+  }
+  strict_capacity_limit_ = strict_capacity_limit;
+}
+
+rocksdb::Status ShardedCache::Insert(const rocksdb::Slice& key, void* value, size_t charge,
+                                     void (*deleter)(const rocksdb::Slice& key, void* value),
+                                     rocksdb::Cache::Handle** handle, Priority priority) {
+  uint32_t hash = HashSlice(key);
+  return GetShard(Shard(hash))
+    ->Insert(key, hash, value, charge, deleter, handle, priority);
+}
+
+rocksdb::Cache::Handle* ShardedCache::Lookup(const rocksdb::Slice& key, rocksdb::Statistics* /*stats*/) {
+  uint32_t hash = HashSlice(key);
+  return GetShard(Shard(hash))->Lookup(key, hash);
+}
+
+bool ShardedCache::Ref(rocksdb::Cache::Handle* handle) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->Ref(handle);
+}
+
+bool ShardedCache::Release(rocksdb::Cache::Handle* handle, bool force_erase) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->Release(handle, force_erase);
+}
+
+void ShardedCache::Erase(const rocksdb::Slice& key) {
+  uint32_t hash = HashSlice(key);
+  GetShard(Shard(hash))->Erase(key, hash);
+}
+
+uint64_t ShardedCache::NewId() {
+  return last_id_.fetch_add(1, std::memory_order_relaxed);
+}
+
+size_t ShardedCache::GetCapacity() const {
+  std::lock_guard<std::mutex> l(capacity_mutex_);
+  return capacity_;
+}
+
+bool ShardedCache::HasStrictCapacityLimit() const {
+  std::lock_guard<std::mutex> l(capacity_mutex_);
+  return strict_capacity_limit_;
+}
+
+size_t ShardedCache::GetUsage() const {
+  // We will not lock the cache when getting the usage from shards.
+  int num_shards = 1 << num_shard_bits_;
+  size_t usage = 0;
+  for (int s = 0; s < num_shards; s++) {
+    usage += GetShard(s)->GetUsage();
+  }
+  return usage;
+}
+
+size_t ShardedCache::GetUsage(rocksdb::Cache::Handle* handle) const {
+  return GetCharge(handle);
+}
+
+size_t ShardedCache::GetPinnedUsage() const {
+  // We will not lock the cache when getting the usage from shards.
+  int num_shards = 1 << num_shard_bits_;
+  size_t usage = 0;
+  for (int s = 0; s < num_shards; s++) {
+    usage += GetShard(s)->GetPinnedUsage();
+  }
+  return usage;
+}
+
+void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                          bool thread_safe) {
+  int num_shards = 1 << num_shard_bits_;
+  for (int s = 0; s < num_shards; s++) {
+    GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe);
+  }
+}
+
+void ShardedCache::EraseUnRefEntries() {
+  int num_shards = 1 << num_shard_bits_;
+  for (int s = 0; s < num_shards; s++) {
+    GetShard(s)->EraseUnRefEntries();
+  }
+}
+
+std::string ShardedCache::GetPrintableOptions() const {
+  std::string ret;
+  ret.reserve(20000);
+  const int kBufferSize = 200;
+  char buffer[kBufferSize];
+  {
+    std::lock_guard<std::mutex> l(capacity_mutex_);
+    snprintf(buffer, kBufferSize, "    capacity : %" ROCKSDB_PRIszt "\n",
+             capacity_);
+    ret.append(buffer);
+    snprintf(buffer, kBufferSize, "    num_shard_bits : %d\n", num_shard_bits_);
+    ret.append(buffer);
+    snprintf(buffer, kBufferSize, "    strict_capacity_limit : %d\n",
+             strict_capacity_limit_);
+    ret.append(buffer);
+  }
+  ret.append(GetShard(0)->GetPrintableOptions());
+  return ret;
+}
+
+int GetDefaultCacheShardBits(size_t capacity) {
+  int num_shard_bits = 0;
+  size_t min_shard_size = 512L * 1024L;  // Every shard is at least 512KB.
+  size_t num_shards = capacity / min_shard_size;
+  while (num_shards >>= 1) {
+    if (++num_shard_bits >= 6) {
+      // No more than 6.
+      return num_shard_bits;
+    }
+  }
+  return num_shard_bits;
+}
+
+}  // namespace rocksdb_cache
diff --git a/src/kv/rocksdb_cache/ShardedCache.h b/src/kv/rocksdb_cache/ShardedCache.h
new file mode 100644
index 000000000..4d64893ab
--- /dev/null
+++ b/src/kv/rocksdb_cache/ShardedCache.h
@@ -0,0 +1,141 @@
+// Copyright (c) 2018-Present Red Hat Inc. All rights reserved.
+//
+// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 and Apache 2.0 License
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_SHARDED_CACHE
+#define ROCKSDB_SHARDED_CACHE
+
+#include <atomic>
+#include <string>
+#include <mutex>
+
+#include "rocksdb/cache.h"
+#include "include/ceph_hash.h"
+#include "common/PriorityCache.h"
+//#include "hash.h"
+
+#ifndef CACHE_LINE_SIZE
+#define CACHE_LINE_SIZE 64  // XXX arch-specific define
+#endif
+#define ROCKSDB_PRIszt "zu"
+
+namespace rocksdb_cache {
+
+// Single cache shard interface.
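// [Worked arithmetic, not part of the patch: GetDefaultCacheShardBits(),
// defined earlier in this patch, targets shards of at least 512 KiB and at
// most 2^6 shards. A 1 GiB cache gives 1 GiB / 512 KiB = 2048 candidate
// shards, so num_shard_bits is capped at 6 and the cache is split into
// 64 shards of 16 MiB each.]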
+class CacheShard {
+ public:
+  CacheShard() = default;
+  virtual ~CacheShard() = default;
+
+  virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value,
+                                 size_t charge,
+                                 void (*deleter)(const rocksdb::Slice& key, void* value),
+                                 rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) = 0;
+  virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) = 0;
+  virtual bool Ref(rocksdb::Cache::Handle* handle) = 0;
+  virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) = 0;
+  virtual void Erase(const rocksdb::Slice& key, uint32_t hash) = 0;
+  virtual void SetCapacity(size_t capacity) = 0;
+  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
+  virtual size_t GetUsage() const = 0;
+  virtual size_t GetPinnedUsage() const = 0;
+  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                      bool thread_safe) = 0;
+  virtual void EraseUnRefEntries() = 0;
+  virtual std::string GetPrintableOptions() const { return ""; }
+};
+
+// Generic cache interface which shards the cache by a hash of the keys.
+// 2^num_shard_bits shards will be created, with capacity split evenly among
+// the shards. Keys are sharded by the highest num_shard_bits bits of the
+// hash value.
+class ShardedCache : public rocksdb::Cache, public PriorityCache::PriCache {
+ public:
+  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
+  virtual ~ShardedCache() = default;
+  virtual const char* Name() const override = 0;
+  virtual CacheShard* GetShard(int shard) = 0;
+  virtual const CacheShard* GetShard(int shard) const = 0;
+  virtual void* Value(Handle* handle) override = 0;
+  virtual size_t GetCharge(Handle* handle) const = 0;
+  virtual uint32_t GetHash(Handle* handle) const = 0;
+  virtual void DisownData() override = 0;
+
+  virtual void SetCapacity(size_t capacity) override;
+  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
+
+  virtual rocksdb::Status Insert(const rocksdb::Slice& key, void* value, size_t charge,
+                                 void (*deleter)(const rocksdb::Slice& key, void* value),
+                                 rocksdb::Cache::Handle** handle, Priority priority) override;
+  virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, rocksdb::Statistics* stats) override;
+  virtual bool Ref(rocksdb::Cache::Handle* handle) override;
+  virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) override;
+  virtual void Erase(const rocksdb::Slice& key) override;
+  virtual uint64_t NewId() override;
+  virtual size_t GetCapacity() const override;
+  virtual bool HasStrictCapacityLimit() const override;
+  virtual size_t GetUsage() const override;
+  virtual size_t GetUsage(rocksdb::Cache::Handle* handle) const override;
+  virtual size_t GetPinnedUsage() const override;
+  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                      bool thread_safe) override;
+  virtual void EraseUnRefEntries() override;
+  virtual std::string GetPrintableOptions() const override;
+
+  int GetNumShardBits() const { return num_shard_bits_; }
+
+  // PriCache
+  virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
+    return cache_bytes[pri];
+  }
+  virtual int64_t get_cache_bytes() const {
+    int64_t total = 0;
+    for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
+      PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
+      total += get_cache_bytes(pri);
+    }
+    return total;
+  }
+  virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
+    cache_bytes[pri] = bytes;
+  }
+  virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
+    cache_bytes[pri] += bytes;
+  }
+  virtual double get_cache_ratio() const {
+    return cache_ratio;
+  }
+  virtual void set_cache_ratio(double ratio) {
+    cache_ratio = ratio;
+  }
+  virtual std::string get_cache_name() const = 0;
+
+ private:
+  static inline uint32_t HashSlice(const rocksdb::Slice& s) {
+    return ceph_str_hash(CEPH_STR_HASH_RJENKINS, s.data(), s.size());
+//  return Hash(s.data(), s.size(), 0);
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Note, hash >> 32 yields hash in gcc, not the zero we expect!
+    return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0;
+  }
+
+  int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
+  double cache_ratio = 0;
+
+  int num_shard_bits_;
+  mutable std::mutex capacity_mutex_;
+  size_t capacity_;
+  bool strict_capacity_limit_;
+  std::atomic<uint64_t> last_id_;
+};
+
+extern int GetDefaultCacheShardBits(size_t capacity);
+
+}  // namespace rocksdb_cache
+#endif  // ROCKSDB_SHARDED_CACHE
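Usage sketch (not part of the diff above; hypothetical wiring that assumes an existing CephContext* cct and a rocksdb::Options instance): a BinnedLRUCache is created through NewBinnedLRUCache() and handed to RocksDB as the block cache of a block-based table factory. Keys are then routed to shards by the top num_shard_bits bits of the rjenkins hash, per ShardedCache::Shard() above.

#include "kv/rocksdb_cache/BinnedLRUCache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

void attach_binned_cache(CephContext* cct, rocksdb::Options& opts) {
  rocksdb::BlockBasedTableOptions bbt;
  // 512 MiB cache; num_shard_bits = -1 falls back to GetDefaultCacheShardBits(),
  // and 30% of each shard is reserved for high-priority entries.
  bbt.block_cache = rocksdb_cache::NewBinnedLRUCache(
      cct, 512 << 20, /*num_shard_bits=*/-1,
      /*strict_capacity_limit=*/false, /*high_pri_pool_ratio=*/0.3);
  opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt));
}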