diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/kv | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/kv/CMakeLists.txt | 17 | ||||
-rw-r--r-- | src/kv/KeyValueDB.cc | 67 | ||||
-rw-r--r-- | src/kv/KeyValueDB.h | 432 | ||||
-rw-r--r-- | src/kv/KineticStore.cc | 377 | ||||
-rw-r--r-- | src/kv/KineticStore.h | 152 | ||||
-rw-r--r-- | src/kv/LevelDBStore.cc | 447 | ||||
-rw-r--r-- | src/kv/LevelDBStore.h | 413 | ||||
-rw-r--r-- | src/kv/MemDB.cc | 643 | ||||
-rw-r--r-- | src/kv/MemDB.h | 222 | ||||
-rw-r--r-- | src/kv/RocksDBStore.cc | 1606 | ||||
-rw-r--r-- | src/kv/RocksDBStore.h | 512 | ||||
-rw-r--r-- | src/kv/rocksdb_cache/BinnedLRUCache.cc | 623 | ||||
-rw-r--r-- | src/kv/rocksdb_cache/BinnedLRUCache.h | 335 | ||||
-rw-r--r-- | src/kv/rocksdb_cache/ShardedCache.cc | 159 | ||||
-rw-r--r-- | src/kv/rocksdb_cache/ShardedCache.h | 141 |
15 files changed, 6146 insertions, 0 deletions
diff --git a/src/kv/CMakeLists.txt b/src/kv/CMakeLists.txt new file mode 100644 index 00000000..057eb826 --- /dev/null +++ b/src/kv/CMakeLists.txt @@ -0,0 +1,17 @@ +set(kv_srcs + KeyValueDB.cc + MemDB.cc + RocksDBStore.cc + rocksdb_cache/ShardedCache.cc + rocksdb_cache/BinnedLRUCache.cc) + +if (WITH_LEVELDB) + list(APPEND kv_srcs LevelDBStore.cc) +endif (WITH_LEVELDB) + +add_library(kv STATIC ${kv_srcs} + $<TARGET_OBJECTS:common_prioritycache_obj>) + +target_link_libraries(kv ${LEVELDB_LIBRARIES} + RocksDB::RocksDB + heap_profiler) diff --git a/src/kv/KeyValueDB.cc b/src/kv/KeyValueDB.cc new file mode 100644 index 00000000..30e7e671 --- /dev/null +++ b/src/kv/KeyValueDB.cc @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "KeyValueDB.h" +#ifdef WITH_LEVELDB +#include "LevelDBStore.h" +#endif +#include "MemDB.h" +#ifdef HAVE_LIBROCKSDB +#include "RocksDBStore.h" +#endif +#ifdef HAVE_KINETIC +#include "KineticStore.h" +#endif + +KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type, + const string& dir, + map<string,string> options, + void *p) +{ +#ifdef WITH_LEVELDB + if (type == "leveldb") { + return new LevelDBStore(cct, dir); + } +#endif +#ifdef HAVE_KINETIC + if (type == "kinetic" && + cct->check_experimental_feature_enabled("kinetic")) { + return new KineticStore(cct); + } +#endif +#ifdef HAVE_LIBROCKSDB + if (type == "rocksdb") { + return new RocksDBStore(cct, dir, options, p); + } +#endif + + if ((type == "memdb") && + cct->check_experimental_feature_enabled("memdb")) { + return new MemDB(cct, dir, p); + } + return NULL; +} + +int KeyValueDB::test_init(const string& type, const string& dir) +{ +#ifdef WITH_LEVELDB + if (type == "leveldb") { + return LevelDBStore::_test_init(dir); + } +#endif +#ifdef HAVE_KINETIC + if (type == "kinetic") { + return KineticStore::_test_init(g_ceph_context); + } +#endif +#ifdef HAVE_LIBROCKSDB + if (type == "rocksdb") { + return RocksDBStore::_test_init(dir); + } +#endif + + if (type == "memdb") { + return MemDB::_test_init(dir); + } + return -EINVAL; +} diff --git a/src/kv/KeyValueDB.h b/src/kv/KeyValueDB.h new file mode 100644 index 00000000..289e562f --- /dev/null +++ b/src/kv/KeyValueDB.h @@ -0,0 +1,432 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KEY_VALUE_DB_H +#define KEY_VALUE_DB_H + +#include "include/buffer.h" +#include <ostream> +#include <set> +#include <map> +#include <string> +#include <boost/scoped_ptr.hpp> +#include "include/encoding.h" +#include "common/Formatter.h" +#include "common/perf_counters.h" +#include "common/PriorityCache.h" + +using std::string; +using std::vector; +/** + * Defines virtual interface to be implemented by key value store + * + * Kyoto Cabinet or LevelDB should implement this + */ +class KeyValueDB { +public: + /* + * See RocksDB's definition of a column family(CF) and how to use it. + * The interfaces of KeyValueDB is extended, when a column family is created. + * Prefix will be the name of column family to use. + */ + struct ColumnFamily { + string name; //< name of this individual column family + string option; //< configure option string for this CF + ColumnFamily(const string &name, const string &option) + : name(name), option(option) {} + }; + + class TransactionImpl { + public: + /// Set Keys + void set( + const std::string &prefix, ///< [in] Prefix for keys, or CF name + const std::map<std::string, bufferlist> &to_set ///< [in] keys/values to set + ) { + std::map<std::string, bufferlist>::const_iterator it; + for (it = to_set.begin(); it != to_set.end(); ++it) + set(prefix, it->first, it->second); + } + + /// Set Keys (via encoded bufferlist) + void set( + const std::string &prefix, ///< [in] prefix, or CF name + bufferlist& to_set_bl ///< [in] encoded key/values to set + ) { + auto p = std::cbegin(to_set_bl); + uint32_t num; + decode(num, p); + while (num--) { + string key; + bufferlist value; + decode(key, p); + decode(value, p); + set(prefix, key, value); + } + } + + /// Set Key + virtual void set( + const std::string &prefix, ///< [in] Prefix or CF for the key + const std::string &k, ///< [in] Key to set + const bufferlist &bl ///< [in] Value to set + ) = 0; + virtual void set( + const std::string &prefix, + const char *k, + size_t keylen, + const bufferlist& bl) { + set(prefix, string(k, keylen), bl); + } + + /// Removes Keys (via encoded bufferlist) + void rmkeys( + const std::string &prefix, ///< [in] Prefix or CF to search for + bufferlist &keys_bl ///< [in] Keys to remove + ) { + auto p = std::cbegin(keys_bl); + uint32_t num; + decode(num, p); + while (num--) { + string key; + decode(key, p); + rmkey(prefix, key); + } + } + + /// Removes Keys + void rmkeys( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::set<std::string> &keys ///< [in] Keys to remove + ) { + std::set<std::string>::const_iterator it; + for (it = keys.begin(); it != keys.end(); ++it) + rmkey(prefix, *it); + } + + /// Remove Key + virtual void rmkey( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::string &k ///< [in] Key to remove + ) = 0; + virtual void rmkey( + const std::string &prefix, ///< [in] Prefix to search for + const char *k, ///< [in] Key to remove + size_t keylen + ) { + rmkey(prefix, string(k, keylen)); + } + + /// Remove Single Key which exists and was not overwritten. + /// This API is only related to performance optimization, and should only be + /// re-implemented by log-insert-merge tree based keyvalue stores(such as RocksDB). + /// If a key is overwritten (by calling set multiple times), then the result + /// of calling rm_single_key on this key is undefined. + virtual void rm_single_key( + const std::string &prefix, ///< [in] Prefix/CF to search for + const std::string &k ///< [in] Key to remove + ) { return rmkey(prefix, k);} + + /// Removes keys beginning with prefix + virtual void rmkeys_by_prefix( + const std::string &prefix ///< [in] Prefix/CF by which to remove keys + ) = 0; + + virtual void rm_range_keys( + const string &prefix, ///< [in] Prefix by which to remove keys + const string &start, ///< [in] The start bound of remove keys + const string &end ///< [in] The start bound of remove keys + ) = 0; + + /// Merge value into key + virtual void merge( + const std::string &prefix, ///< [in] Prefix/CF ==> MUST match some established merge operator + const std::string &key, ///< [in] Key to be merged + const bufferlist &value ///< [in] value to be merged into key + ) { ceph_abort_msg("Not implemented"); } + + virtual ~TransactionImpl() {} + }; + typedef std::shared_ptr< TransactionImpl > Transaction; + + /// create a new instance + static KeyValueDB *create(CephContext *cct, const std::string& type, + const std::string& dir, + map<std::string,std::string> options = {}, + void *p = NULL); + + /// test whether we can successfully initialize; may have side effects (e.g., create) + static int test_init(const std::string& type, const std::string& dir); + virtual int init(string option_str="") = 0; + virtual int open(std::ostream &out, const vector<ColumnFamily>& cfs = {}) = 0; + // vector cfs contains column families to be created when db is created. + virtual int create_and_open(std::ostream &out, + const vector<ColumnFamily>& cfs = {}) = 0; + + virtual int open_read_only(ostream &out, const vector<ColumnFamily>& cfs = {}) { + return -ENOTSUP; + } + + virtual void close() { } + + /// Try to repair K/V database. leveldb and rocksdb require that database must be not opened. + virtual int repair(std::ostream &out) { return 0; } + + virtual Transaction get_transaction() = 0; + virtual int submit_transaction(Transaction) = 0; + virtual int submit_transaction_sync(Transaction t) { + return submit_transaction(t); + } + + /// Retrieve Keys + virtual int get( + const std::string &prefix, ///< [in] Prefix/CF for key + const std::set<std::string> &key, ///< [in] Key to retrieve + std::map<std::string, bufferlist> *out ///< [out] Key value retrieved + ) = 0; + virtual int get(const std::string &prefix, ///< [in] prefix or CF name + const std::string &key, ///< [in] key + bufferlist *value) { ///< [out] value + std::set<std::string> ks; + ks.insert(key); + std::map<std::string,bufferlist> om; + int r = get(prefix, ks, &om); + if (om.find(key) != om.end()) { + *value = std::move(om[key]); + } else { + *value = bufferlist(); + r = -ENOENT; + } + return r; + } + virtual int get(const string &prefix, + const char *key, size_t keylen, + bufferlist *value) { + return get(prefix, string(key, keylen), value); + } + + // This superclass is used both by kv iterators *and* by the ObjectMap + // omap iterator. The class hierarchies are unfortunately tied together + // by the legacy DBOjectMap implementation :(. + class SimplestIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int upper_bound(const std::string &after) = 0; + virtual int lower_bound(const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual std::string key() = 0; + virtual bufferlist value() = 0; + virtual int status() = 0; + virtual ~SimplestIteratorImpl() {} + }; + + class IteratorImpl : public SimplestIteratorImpl { + public: + virtual ~IteratorImpl() {} + virtual int seek_to_last() = 0; + virtual int prev() = 0; + virtual std::pair<std::string, std::string> raw_key() = 0; + virtual bufferptr value_as_ptr() { + bufferlist bl = value(); + if (bl.length() == 1) { + return *bl.buffers().begin(); + } else if (bl.length() == 0) { + return bufferptr(); + } else { + ceph_abort(); + } + } + }; + typedef std::shared_ptr< IteratorImpl > Iterator; + + // This is the low-level iterator implemented by the underlying KV store. + class WholeSpaceIteratorImpl { + public: + virtual int seek_to_first() = 0; + virtual int seek_to_first(const std::string &prefix) = 0; + virtual int seek_to_last() = 0; + virtual int seek_to_last(const std::string &prefix) = 0; + virtual int upper_bound(const std::string &prefix, const std::string &after) = 0; + virtual int lower_bound(const std::string &prefix, const std::string &to) = 0; + virtual bool valid() = 0; + virtual int next() = 0; + virtual int prev() = 0; + virtual std::string key() = 0; + virtual std::pair<std::string,std::string> raw_key() = 0; + virtual bool raw_key_is_prefixed(const std::string &prefix) = 0; + virtual bufferlist value() = 0; + virtual bufferptr value_as_ptr() { + bufferlist bl = value(); + if (bl.length()) { + return *bl.buffers().begin(); + } else { + return bufferptr(); + } + } + virtual int status() = 0; + virtual size_t key_size() { + return 0; + } + virtual size_t value_size() { + return 0; + } + virtual ~WholeSpaceIteratorImpl() { } + }; + typedef std::shared_ptr< WholeSpaceIteratorImpl > WholeSpaceIterator; + +private: + // This class filters a WholeSpaceIterator by a prefix. + class PrefixIteratorImpl : public IteratorImpl { + const std::string prefix; + WholeSpaceIterator generic_iter; + public: + PrefixIteratorImpl(const std::string &prefix, WholeSpaceIterator iter) : + prefix(prefix), generic_iter(iter) { } + ~PrefixIteratorImpl() override { } + + int seek_to_first() override { + return generic_iter->seek_to_first(prefix); + } + int seek_to_last() override { + return generic_iter->seek_to_last(prefix); + } + int upper_bound(const std::string &after) override { + return generic_iter->upper_bound(prefix, after); + } + int lower_bound(const std::string &to) override { + return generic_iter->lower_bound(prefix, to); + } + bool valid() override { + if (!generic_iter->valid()) + return false; + return generic_iter->raw_key_is_prefixed(prefix); + } + int next() override { + return generic_iter->next(); + } + int prev() override { + return generic_iter->prev(); + } + std::string key() override { + return generic_iter->key(); + } + std::pair<std::string, std::string> raw_key() override { + return generic_iter->raw_key(); + } + bufferlist value() override { + return generic_iter->value(); + } + bufferptr value_as_ptr() override { + return generic_iter->value_as_ptr(); + } + int status() override { + return generic_iter->status(); + } + }; +public: + + virtual WholeSpaceIterator get_wholespace_iterator() = 0; + virtual Iterator get_iterator(const std::string &prefix) { + return std::make_shared<PrefixIteratorImpl>( + prefix, + get_wholespace_iterator()); + } + + void add_column_family(const std::string& cf_name, void *handle) { + cf_handles.insert(std::make_pair(cf_name, handle)); + } + + bool is_column_family(const std::string& prefix) { + return cf_handles.count(prefix); + } + + virtual uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) = 0; + virtual int get_statfs(struct store_statfs_t *buf) { + return -EOPNOTSUPP; + } + + virtual int set_cache_size(uint64_t) { + return -EOPNOTSUPP; + } + + virtual int set_cache_high_pri_pool_ratio(double ratio) { + return -EOPNOTSUPP; + } + + virtual int64_t get_cache_usage() const { + return -EOPNOTSUPP; + } + + virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const { + return nullptr; + } + + virtual ~KeyValueDB() {} + + /// estimate space utilization for a prefix (in bytes) + virtual int64_t estimate_prefix_size(const string& prefix) { + return 0; + } + + /// compact the underlying store + virtual void compact() {} + + /// compact the underlying store in async mode + virtual void compact_async() {} + + /// compact db for all keys with a given prefix + virtual void compact_prefix(const std::string& prefix) {} + /// compact db for all keys with a given prefix, async + virtual void compact_prefix_async(const std::string& prefix) {} + virtual void compact_range(const std::string& prefix, + const std::string& start, const std::string& end) {} + virtual void compact_range_async(const std::string& prefix, + const std::string& start, const std::string& end) {} + + // See RocksDB merge operator definition, we support the basic + // associative merge only right now. + class MergeOperator { + public: + /// Merge into a key that doesn't exist + virtual void merge_nonexistent( + const char *rdata, size_t rlen, + std::string *new_value) = 0; + /// Merge into a key that does exist + virtual void merge( + const char *ldata, size_t llen, + const char *rdata, size_t rlen, + std::string *new_value) = 0; + /// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time. + virtual const char *name() const = 0; + + virtual ~MergeOperator() {} + }; + + /// Setup one or more operators, this needs to be done BEFORE the DB is opened. + virtual int set_merge_operator(const std::string& prefix, + std::shared_ptr<MergeOperator> mop) { + return -EOPNOTSUPP; + } + + virtual void get_statistics(Formatter *f) { + return; + } + + /** + * Return your perf counters if you have any. Subclasses are not + * required to implement this, and callers must respect a null return + * value. + */ + virtual PerfCounters *get_perf_counters() { + return nullptr; + } +protected: + /// List of matching prefixes/ColumnFamilies and merge operators + std::vector<std::pair<std::string, + std::shared_ptr<MergeOperator> > > merge_ops; + + /// column families in use, name->handle + std::unordered_map<std::string, void *> cf_handles; +}; + +#endif diff --git a/src/kv/KineticStore.cc b/src/kv/KineticStore.cc new file mode 100644 index 00000000..ac6ac8f3 --- /dev/null +++ b/src/kv/KineticStore.cc @@ -0,0 +1,377 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "KineticStore.h" +#include "common/ceph_crypto.h" + +#include <set> +#include <map> +#include <string> +#include <errno.h> +using std::string; +#include "common/perf_counters.h" + +#define dout_subsys ceph_subsys_kinetic + +int KineticStore::init() +{ + // init defaults. caller can override these if they want + // prior to calling open. + host = cct->_conf->kinetic_host; + port = cct->_conf->kinetic_port; + user_id = cct->_conf->kinetic_user_id; + hmac_key = cct->_conf->kinetic_hmac_key; + use_ssl = cct->_conf->kinetic_use_ssl; + return 0; +} + +int KineticStore::_test_init(CephContext *c) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + + kinetic::ConnectionOptions options; + options.host = cct->_conf->kinetic_host; + options.port = cct->_conf->kinetic_port; + options.user_id = cct->_conf->kinetic_user_id; + options.hmac_key = cct->_conf->kinetic_hmac_key; + options.use_ssl = cct->_conf->kinetic_use_ssl; + + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + kinetic_conn.reset(); + if (!status.ok()) + derr << __func__ << " Unable to connect to kinetic store " << options.host + << ":" << options.port << " : " << status.ToString() << dendl; + return status.ok() ? 0 : -EIO; +} + +int KineticStore::open(ostream &out, const vector<ColumnFamily>& cfs) +{ + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, false); +} + +int KineticStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) +{ + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, true); +} + +int KineticStore::do_open(ostream &out, bool create_if_missing) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + kinetic::ConnectionOptions options; + options.host = host; + options.port = port; + options.user_id = user_id; + options.hmac_key = hmac_key; + options.use_ssl = use_ssl; + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + if (!status.ok()) { + derr << "Unable to connect to kinetic store " << host << ":" << port + << " : " << status.ToString() << dendl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last); + plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets"); + plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + return 0; +} + +KineticStore::KineticStore(CephContext *c) : + cct(c), + logger(NULL) +{ + host = c->_conf->kinetic_host; + port = c->_conf->kinetic_port; + user_id = c->_conf->kinetic_user_id; + hmac_key = c->_conf->kinetic_hmac_key; + use_ssl = c->_conf->kinetic_use_ssl; +} + +KineticStore::~KineticStore() +{ + close(); + delete logger; +} + +void KineticStore::close() +{ + kinetic_conn.reset(); + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int KineticStore::submit_transaction(KeyValueDB::Transaction t) +{ + KineticTransactionImpl * _t = + static_cast<KineticTransactionImpl *>(t.get()); + + dout(20) << "kinetic submit_transaction" << dendl; + + for (vector<KineticOp>::iterator it = _t->ops.begin(); + it != _t->ops.end(); ++it) { + kinetic::KineticStatus status(kinetic::StatusCode::OK, ""); + if (it->type == KINETIC_OP_WRITE) { + string data(it->data.c_str(), it->data.length()); + kinetic::KineticRecord record(data, "", "", + com::seagate::kinetic::client::proto::Message_Algorithm_SHA1); + dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl; + status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION, + record); + dout(30) << "kinetic after put of " << it->key << dendl; + } else { + ceph_assert(it->type == KINETIC_OP_DELETE); + dout(30) << "kinetic before delete" << dendl; + status = kinetic_conn->Delete(it->key, "", + kinetic::WriteMode::IGNORE_VERSION); + dout(30) << "kinetic after delete" << dendl; + } + if (!status.ok()) { + derr << "kinetic error submitting transaction: " + << status.message() << dendl; + return -1; + } + } + + logger->inc(l_kinetic_txns); + return 0; +} + +int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + return submit_transaction(t); +} + +void KineticStore::KineticTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic set key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl)); +} + +void KineticStore::KineticTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic rm key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); +} + +void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl; + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + string key = combine_strings(prefix, it->key()); + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); + dout(30) << "kinetic rm key by prefix: " << key << dendl; + } +} + +void KineticStore::KineticTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + ops.push_back( + KineticOp(KINETIC_OP_DELETE, combine_strings(prefix, it->key()))); + it->next(); + } +} + +int KineticStore::get( + const string &prefix, + const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl; + for (std::set<string>::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + unique_ptr<kinetic::KineticRecord> record; + string key = combine_strings(prefix, *i); + dout(30) << "before get key " << key << dendl; + kinetic::KineticStatus status = kinetic_conn->Get(key, record); + if (!status.ok()) + break; + dout(30) << "kinetic get got key: " << key << dendl; + out->insert(make_pair(key, to_bufferlist(*record.get()))); + } + logger->inc(l_kinetic_gets); + return 0; +} + +string KineticStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(1); + out.append(value); + return out; +} + +bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record) +{ + bufferlist bl; + bl.append(*(record.value())); + return bl; +} + +int KineticStore::split_key(string &in, string *prefix, string *key) +{ + size_t prefix_len = 0; + char* in_data = in.c_str(); + + // Find separator inside Slice + char* separator = (char*) memchr((void*)in_data, 1, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in_data); + if (prefix_len >= in.size()) + return -EINVAL; + + // Fetch prefix and/or key directly from Slice + if (prefix) + *prefix = string(in_data, prefix_len); + if (key) + *key = string(separator+1, in.size()-prefix_len-1); + return 0; +} + +KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn), + kinetic_status(kinetic::StatusCode::OK, "") +{ + dout(30) << "kinetic iterator constructor()" << dendl; + const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + kinetic::KeyRangeIterator it = + kinetic_conn->IterateKeyRange("", true, last_key, true, 1024); + while (it != kinetic::KeyRangeEnd()) { + try { + keys.insert(*it); + dout(30) << "kinetic iterator added " << *it << dendl; + } catch (std::runtime_error &e) { + kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what()); + return; + } + ++it; + } + keys_iter = keys.begin(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl; + keys_iter = keys.lower_bound(prefix); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last() +{ + dout(30) << "kinetic iterator seek_to_last()" << dendl; + keys_iter = keys.end(); + if (keys.begin() != keys_iter) + --keys_iter; + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl; + keys_iter = keys.upper_bound(prefix + "\2"); + if (keys.begin() == keys_iter) { + keys_iter = keys.end(); + } else { + --keys_iter; + } + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) { + dout(30) << "kinetic iterator upper_bound()" << dendl; + string bound = combine_strings(prefix, after); + keys_iter = keys.upper_bound(bound); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) { + dout(30) << "kinetic iterator lower_bound()" << dendl; + string bound = combine_strings(prefix, to); + keys_iter = keys.lower_bound(bound); + return 0; +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::valid() { + dout(30) << "kinetic iterator valid()" << dendl; + return keys_iter != keys.end(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::next() { + dout(30) << "kinetic iterator next()" << dendl; + if (keys_iter != keys.end()) { + ++keys_iter; + return 0; + } + return -1; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::prev() { + dout(30) << "kinetic iterator prev()" << dendl; + if (keys_iter != keys.begin()) { + --keys_iter; + return 0; + } + keys_iter = keys.end(); + return -1; +} + +string KineticStore::KineticWholeSpaceIteratorImpl::key() { + dout(30) << "kinetic iterator key()" << dendl; + string out_key; + split_key(*keys_iter, NULL, &out_key); + return out_key; +} + +pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() { + dout(30) << "kinetic iterator raw_key()" << dendl; + string prefix, key; + split_key(*keys_iter, &prefix, &key); + return make_pair(prefix, key); +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { + // Look for "prefix\1" right in *keys_iter without making a copy + string key = *keys_iter; + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\1')) { + return memcmp(key.c_str(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } +} + + +bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() { + dout(30) << "kinetic iterator value()" << dendl; + unique_ptr<kinetic::KineticRecord> record; + kinetic_status = kinetic_conn->Get(*keys_iter, record); + return to_bufferlist(*record.get()); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::status() { + dout(30) << "kinetic iterator status()" << dendl; + return kinetic_status.ok() ? 0 : -1; +} diff --git a/src/kv/KineticStore.h b/src/kv/KineticStore.h new file mode 100644 index 00000000..b22d4f02 --- /dev/null +++ b/src/kv/KineticStore.h @@ -0,0 +1,152 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KINETIC_STORE_H +#define KINETIC_STORE_H + +#include "include/types.h" +#include "include/buffer_fwd.h" +#include "KeyValueDB.h" +#include <set> +#include <map> +#include <string> +#include <kinetic/kinetic.h> + +#include <errno.h> +#include "common/errno.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/Formatter.h" + +#include "common/ceph_context.h" + +class PerfCounters; + +enum { + l_kinetic_first = 34400, + l_kinetic_gets, + l_kinetic_txns, + l_kinetic_last, +}; + +/** + * Uses Kinetic to implement the KeyValueDB interface + */ +class KineticStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string host; + int port; + int user_id; + string hmac_key; + bool use_ssl; + std::unique_ptr<kinetic::BlockingKineticConnection> kinetic_conn; + + int do_open(ostream &out, bool create_if_missing); + +public: + explicit KineticStore(CephContext *c); + ~KineticStore(); + + static int _test_init(CephContext *c); + int init(); + + /// Opens underlying db + int open(ostream &out, const std::vector<ColumnFamily>& = {}) override; + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out, const std::vector<ColumnFamily>& = {}) override; + + void close() override; + + enum KineticOpType { + KINETIC_OP_WRITE, + KINETIC_OP_DELETE, + }; + + struct KineticOp { + KineticOpType type; + std::string key; + bufferlist data; + KineticOp(KineticOpType type, const string &key) : type(type), key(key) {} + KineticOp(KineticOpType type, const string &key, const bufferlist &data) + : type(type), key(key), data(data) {} + }; + + class KineticTransactionImpl : public KeyValueDB::TransactionImpl { + public: + vector<KineticOp> ops; + KineticStore *db; + + explicit KineticTransactionImpl(KineticStore *db) : db(db) {} + void set( + const string &prefix, + const string &k, + const bufferlist &bl); + void rmkey( + const string &prefix, + const string &k); + void rmkeys_by_prefix( + const string &prefix + ); + void rm_range_keys( + const string &prefix, + const string &start, + const string &end) override; + }; + + KeyValueDB::Transaction get_transaction() override { + return std::make_shared<KineticTransactionImpl>(this); + } + + int submit_transaction(KeyValueDB::Transaction t) override; + int submit_transaction_sync(KeyValueDB::Transaction t) override; + int get( + const string &prefix, + const std::set<string> &key, + std::map<string, bufferlist> *out + ); + using KeyValueDB::get; + + class KineticWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + std::set<std::string> keys; + std::set<std::string>::iterator keys_iter; + kinetic::BlockingKineticConnection *kinetic_conn; + kinetic::KineticStatus kinetic_status; + public: + explicit KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn); + virtual ~KineticWholeSpaceIteratorImpl() { } + + int seek_to_first() override { + return seek_to_first(""); + } + int seek_to_first(const string &prefix); + int seek_to_last() override; + int seek_to_last(const string &prefix); + int upper_bound(const string &prefix, const string &after); + int lower_bound(const string &prefix, const string &to); + bool valid() override; + int next() override; + int prev() override; + string key(); + pair<string,string> raw_key(); + bool raw_key_is_prefixed(const string &prefix); + bufferlist value() override; + int status() override; + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(string &in_prefix, string *prefix, string *key); + static bufferlist to_bufferlist(const kinetic::KineticRecord &record); + virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) { + // not used by the osd + return 0; + } + + + WholeSpaceIterator get_wholespace_iterator() { + return std::make_shared<KineticWholeSpaceIteratorImpl>(kinetic_conn.get()); + } +}; + +#endif diff --git a/src/kv/LevelDBStore.cc b/src/kv/LevelDBStore.cc new file mode 100644 index 00000000..a02aacf2 --- /dev/null +++ b/src/kv/LevelDBStore.cc @@ -0,0 +1,447 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "LevelDBStore.h" + +#include <set> +#include <map> +#include <string> +#include <cerrno> + +using std::string; + +#include "common/debug.h" +#include "common/perf_counters.h" + +// re-include our assert to clobber the system one; fix dout: +#include "include/ceph_assert.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_leveldb +#undef dout_prefix +#define dout_prefix *_dout << "leveldb: " + +class CephLevelDBLogger : public leveldb::Logger { + CephContext *cct; +public: + explicit CephLevelDBLogger(CephContext *c) : cct(c) { + cct->get(); + } + ~CephLevelDBLogger() override { + cct->put(); + } + + // Write an entry to the log file with the specified format. + void Logv(const char* format, va_list ap) override { + dout(1); + char buf[65536]; + vsnprintf(buf, sizeof(buf), format, ap); + *_dout << buf << dendl; + } +}; + +leveldb::Logger *create_leveldb_ceph_logger() +{ + return new CephLevelDBLogger(g_ceph_context); +} + +int LevelDBStore::init(string option_str) +{ + // init defaults. caller can override these if they want + // prior to calling open. + options.write_buffer_size = g_conf()->leveldb_write_buffer_size; + options.cache_size = g_conf()->leveldb_cache_size; + options.block_size = g_conf()->leveldb_block_size; + options.bloom_size = g_conf()->leveldb_bloom_size; + options.compression_enabled = g_conf()->leveldb_compression; + options.paranoid_checks = g_conf()->leveldb_paranoid; + options.max_open_files = g_conf()->leveldb_max_open_files; + options.log_file = g_conf()->leveldb_log; + return 0; +} + +int LevelDBStore::open(ostream &out, const vector<ColumnFamily>& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, false); +} + +int LevelDBStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, true); +} + +int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions) +{ + if (options.write_buffer_size) + ldoptions.write_buffer_size = options.write_buffer_size; + if (options.max_open_files) + ldoptions.max_open_files = options.max_open_files; + if (options.cache_size) { + leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size); + db_cache.reset(_db_cache); + ldoptions.block_cache = db_cache.get(); + } + if (options.block_size) + ldoptions.block_size = options.block_size; + if (options.bloom_size) { +#ifdef HAVE_LEVELDB_FILTER_POLICY + const leveldb::FilterPolicy *_filterpolicy = + leveldb::NewBloomFilterPolicy(options.bloom_size); + filterpolicy.reset(_filterpolicy); + ldoptions.filter_policy = filterpolicy.get(); +#else + ceph_abort_msg(0 == "bloom size set but installed leveldb doesn't support bloom filters"); +#endif + } + if (options.compression_enabled) + ldoptions.compression = leveldb::kSnappyCompression; + else + ldoptions.compression = leveldb::kNoCompression; + if (options.block_restart_interval) + ldoptions.block_restart_interval = options.block_restart_interval; + + ldoptions.error_if_exists = options.error_if_exists; + ldoptions.paranoid_checks = options.paranoid_checks; + ldoptions.create_if_missing = create_if_missing; + + if (g_conf()->leveldb_log_to_ceph_log) { + ceph_logger = new CephLevelDBLogger(g_ceph_context); + ldoptions.info_log = ceph_logger; + } + + if (options.log_file.length()) { + leveldb::Env *env = leveldb::Env::Default(); + env->NewLogger(options.log_file, &ldoptions.info_log); + } + return 0; +} + +int LevelDBStore::do_open(ostream &out, bool create_if_missing) +{ + leveldb::Options ldoptions; + int r = load_leveldb_options(create_if_missing, ldoptions); + if (r) { + dout(1) << "load leveldb options failed" << dendl; + return r; + } + + leveldb::DB *_db; + leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db); + db.reset(_db); + if (!status.ok()) { + out << status.ToString() << std::endl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last); + plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets"); + plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions"); + plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency"); + plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency"); + plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions"); + plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range"); + plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (g_conf()->leveldb_compact_on_mount) { + derr << "Compacting leveldb store..." << dendl; + compact(); + derr << "Finished compacting leveldb store" << dendl; + } + return 0; +} + +int LevelDBStore::_test_init(const string& dir) +{ + leveldb::Options options; + options.create_if_missing = true; + leveldb::DB *db; + leveldb::Status status = leveldb::DB::Open(options, dir, &db); + delete db; + return status.ok() ? 0 : -EIO; +} + +LevelDBStore::~LevelDBStore() +{ + close(); + delete logger; + + // Ensure db is destroyed before dependent db_cache and filterpolicy + db.reset(); + delete ceph_logger; +} + +void LevelDBStore::close() +{ + // stop compaction thread + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + } else { + compact_queue_lock.Unlock(); + } + + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int LevelDBStore::repair(std::ostream &out) +{ + leveldb::Options ldoptions; + int r = load_leveldb_options(false, ldoptions); + if (r) { + dout(1) << "load leveldb options failed" << dendl; + out << "load leveldb options failed" << std::endl; + return r; + } + leveldb::Status status = leveldb::RepairDB(path, ldoptions); + if (status.ok()) { + return 0; + } else { + out << "repair leveldb failed : " << status.ToString() << std::endl; + return 1; + } +} + +int LevelDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + LevelDBTransactionImpl * _t = + static_cast<LevelDBTransactionImpl *>(t.get()); + leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_latency, lat); + return s.ok() ? 0 : -1; +} + +int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + LevelDBTransactionImpl * _t = + static_cast<LevelDBTransactionImpl *>(t.get()); + leveldb::WriteOptions options; + options.sync = true; + leveldb::Status s = db->Write(options, &(_t->bat)); + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_txns); + logger->tinc(l_leveldb_submit_sync_latency, lat); + return s.ok() ? 0 : -1; +} + +void LevelDBStore::LevelDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + size_t bllen = to_set_bl.length(); + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && bllen > 0) { + // bufferlist contains just one ptr or they're contiguous + bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen)); + } else if ((bllen <= 32 * 1024) && (bllen > 0)) { + // 2+ bufferptrs that are not contiguopus + // allocate buffer on stack and copy bl contents to that buffer + // make sure the buffer isn't too large or we might crash here... + char* slicebuf = (char*) alloca(bllen); + leveldb::Slice newslice(slicebuf, bllen); + for (const auto& node : to_set_bl.buffers()) { + const size_t ptrlen = node.length(); + memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen); + slicebuf += ptrlen; + } + bat.Put(leveldb::Slice(key), newslice); + } else { + // 2+ bufferptrs that are not contiguous, and enormous in size + bufferlist val = to_set_bl; + bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length())); + } +} + +void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + bat.Delete(leveldb::Slice(key)); +} + +void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(leveldb::Slice(combine_strings(prefix, it->key()))); + } +} + +void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(combine_strings(prefix, it->key())); + it->next(); + } +} + +int LevelDBStore::get( + const string &prefix, + const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + utime_t start = ceph_clock_now(); + for (std::set<string>::const_iterator i = keys.begin(); + i != keys.end(); ++i) { + std::string value; + std::string bound = combine_strings(prefix, *i); + auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value); + if (status.ok()) + (*out)[*i].append(value); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_gets); + logger->tinc(l_leveldb_get_latency, lat); + return 0; +} + +int LevelDBStore::get(const string &prefix, + const string &key, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + string value, k; + leveldb::Status s; + k = combine_strings(prefix, key); + s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value); + if (s.ok()) { + out->append(value); + } else { + r = -ENOENT; + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_leveldb_gets); + logger->tinc(l_leveldb_get_latency, lat); + return r; +} + +string LevelDBStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(0); + out.append(value); + return out; +} + +bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in) +{ + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key) +{ + size_t prefix_len = 0; + + // Find separator inside Slice + char* separator = (char*) memchr(in.data(), 0, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in.data()); + if (prefix_len >= in.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in.data(), prefix_len); + if (key) + *key = string(separator+1, in.size() - prefix_len - 1); + return 0; +} + +void LevelDBStore::compact() +{ + logger->inc(l_leveldb_compact); + db->CompactRange(NULL, NULL); +} + + +void LevelDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + while (!compact_queue_stop) { + while (!compact_queue.empty()) { + pair<string,string> range = compact_queue.front(); + compact_queue.pop_front(); + logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + compact_queue_lock.Unlock(); + logger->inc(l_leveldb_compact_range); + if (range.first.empty() && range.second.empty()) { + compact(); + } else { + compact_range(range.first, range.second); + } + compact_queue_lock.Lock(); + continue; + } + if (compact_queue_stop) + break; + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} + +void LevelDBStore::compact_range_async(const string& start, const string& end) +{ + std::lock_guard l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair<string,string> >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (p->first <= end && p->first > start) { + // merge with existing range to the right + compact_queue.push_back(make_pair(start, p->second)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + if (p->second >= start && p->second < end) { + // merge with existing range to the left + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_leveldb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_leveldb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create("levdbst_compact"); + } +} diff --git a/src/kv/LevelDBStore.h b/src/kv/LevelDBStore.h new file mode 100644 index 00000000..34fdb7a6 --- /dev/null +++ b/src/kv/LevelDBStore.h @@ -0,0 +1,413 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef LEVEL_DB_STORE_H +#define LEVEL_DB_STORE_H + +#include "include/types.h" +#include "include/buffer_fwd.h" +#include "KeyValueDB.h" +#include <set> +#include <map> +#include <string> +#include <boost/scoped_ptr.hpp> +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/write_batch.h" +#include "leveldb/slice.h" +#include "leveldb/cache.h" +#ifdef HAVE_LEVELDB_FILTER_POLICY +#include "leveldb/filter_policy.h" +#endif + +#include <errno.h> +#include "common/errno.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/Formatter.h" +#include "common/Cond.h" + +#include "common/ceph_context.h" + +// reinclude our assert to clobber the system one +# include "include/ceph_assert.h" + +class PerfCounters; + +enum { + l_leveldb_first = 34300, + l_leveldb_gets, + l_leveldb_txns, + l_leveldb_get_latency, + l_leveldb_submit_latency, + l_leveldb_submit_sync_latency, + l_leveldb_compact, + l_leveldb_compact_range, + l_leveldb_compact_queue_merge, + l_leveldb_compact_queue_len, + l_leveldb_last, +}; + +extern leveldb::Logger *create_leveldb_ceph_logger(); + +class CephLevelDBLogger; + +/** + * Uses LevelDB to implement the KeyValueDB interface + */ +class LevelDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + CephLevelDBLogger *ceph_logger; + string path; + boost::scoped_ptr<leveldb::Cache> db_cache; +#ifdef HAVE_LEVELDB_FILTER_POLICY + boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy; +#endif + boost::scoped_ptr<leveldb::DB> db; + + int load_leveldb_options(bool create_if_missing, leveldb::Options &opts); + int do_open(ostream &out, bool create_if_missing); + + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list< pair<string,string> > compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + LevelDBStore *db; + public: + explicit CompactThread(LevelDBStore *d) : db(d) {} + void *entry() override { + db->compact_thread_entry(); + return NULL; + } + friend class LevelDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const string& start, const string& end) { + leveldb::Slice cstart(start); + leveldb::Slice cend(end); + db->CompactRange(&cstart, &cend); + } + void compact_range_async(const string& start, const string& end); + +public: + /// compact the underlying leveldb store + void compact() override; + + void compact_async() override { + compact_range_async(string(), string()); + } + + /// compact db for all keys with a given prefix + void compact_prefix(const string& prefix) override { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const string& prefix) override { + compact_range_async(prefix, past_prefix(prefix)); + } + void compact_range(const string& prefix, + const string& start, const string& end) override { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const string& prefix, + const string& start, const string& end) override { + compact_range_async(combine_strings(prefix, start), + combine_strings(prefix, end)); + } + + + /** + * options_t: Holds options which are minimally interpreted + * on initialization and then passed through to LevelDB. + * We transform a couple of these into actual LevelDB + * structures, but the rest are simply passed through unchanged. See + * leveldb/options.h for more precise details on each. + * + * Set them after constructing the LevelDBStore, but before calling + * open() or create_and_open(). + */ + struct options_t { + uint64_t write_buffer_size; /// in-memory write buffer size + int max_open_files; /// maximum number of files LevelDB can open at once + uint64_t cache_size; /// size of extra decompressed cache to use + uint64_t block_size; /// user data per block + int bloom_size; /// number of bits per entry to put in a bloom filter + bool compression_enabled; /// whether to use libsnappy compression or not + + // don't change these ones. No, seriously + int block_restart_interval; + bool error_if_exists; + bool paranoid_checks; + + string log_file; + + options_t() : + write_buffer_size(0), //< 0 means default + max_open_files(0), //< 0 means default + cache_size(0), //< 0 means no cache (default) + block_size(0), //< 0 means default + bloom_size(0), //< 0 means no bloom filter (default) + compression_enabled(true), //< set to false for no compression + block_restart_interval(0), //< 0 means default + error_if_exists(false), //< set to true if you want to check nonexistence + paranoid_checks(false) //< set to true if you want paranoid checks + {} + } options; + + LevelDBStore(CephContext *c, const string &path) : + cct(c), + logger(NULL), + ceph_logger(NULL), + path(path), + db_cache(NULL), +#ifdef HAVE_LEVELDB_FILTER_POLICY + filterpolicy(NULL), +#endif + compact_queue_lock("LevelDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), + options() + {} + + ~LevelDBStore() override; + + static int _test_init(const string& dir); + int init(string option_str="") override; + + /// Opens underlying db + int open(ostream &out, const std::vector<ColumnFamily>& = {}) override; + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out, const std::vector<ColumnFamily>& = {}) override; + + void close() override; + + PerfCounters *get_perf_counters() override + { + return logger; + } + int repair(std::ostream &out) override; + + class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + leveldb::WriteBatch bat; + LevelDBStore *db; + explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {} + void set( + const string &prefix, + const string &k, + const bufferlist &bl) override; + using KeyValueDB::TransactionImpl::set; + void rmkey( + const string &prefix, + const string &k) override; + void rmkeys_by_prefix( + const string &prefix + ) override; + virtual void rm_range_keys( + const string &prefix, + const string &start, + const string &end) override; + + using KeyValueDB::TransactionImpl::rmkey; + }; + + KeyValueDB::Transaction get_transaction() override { + return std::make_shared<LevelDBTransactionImpl>(this); + } + + int submit_transaction(KeyValueDB::Transaction t) override; + int submit_transaction_sync(KeyValueDB::Transaction t) override; + int get( + const string &prefix, + const std::set<string> &key, + std::map<string, bufferlist> *out + ) override; + + int get(const string &prefix, + const string &key, + bufferlist *value) override; + + using KeyValueDB::get; + + class LevelDBWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + protected: + boost::scoped_ptr<leveldb::Iterator> dbiter; + public: + explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) : + dbiter(iter) { } + ~LevelDBWholeSpaceIteratorImpl() override { } + + int seek_to_first() override { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_first(const string &prefix) override { + leveldb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() override { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last(const string &prefix) override { + string limit = past_prefix(prefix); + leveldb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; + } + int upper_bound(const string &prefix, const string &after) override { + lower_bound(prefix, after); + if (valid()) { + pair<string,string> key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const string &prefix, const string &to) override { + string bound = combine_strings(prefix, to); + leveldb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + bool valid() override { + return dbiter->Valid(); + } + int next() override { + if (valid()) + dbiter->Next(); + return dbiter->status().ok() ? 0 : -1; + } + int prev() override { + if (valid()) + dbiter->Prev(); + return dbiter->status().ok() ? 0 : -1; + } + string key() override { + string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; + } + pair<string,string> raw_key() override { + string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return make_pair(prefix, key); + } + bool raw_key_is_prefixed(const string &prefix) override { + leveldb::Slice key = dbiter->key(); + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { + return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } + } + bufferlist value() override { + return to_bufferlist(dbiter->value()); + } + + bufferptr value_as_ptr() override { + leveldb::Slice data = dbiter->value(); + return bufferptr(data.data(), data.size()); + } + + int status() override { + return dbiter->status().ok() ? 0 : -1; + } + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(leveldb::Slice in, string *prefix, string *key); + static bufferlist to_bufferlist(leveldb::Slice in); + static string past_prefix(const string &prefix) { + string limit = prefix; + limit.push_back(1); + return limit; + } + + uint64_t get_estimated_size(map<string,uint64_t> &extra) override { + DIR *store_dir = opendir(path.c_str()); + if (!store_dir) { + lderr(cct) << __func__ << " something happened opening the store: " + << cpp_strerror(errno) << dendl; + return 0; + } + + uint64_t total_size = 0; + uint64_t sst_size = 0; + uint64_t log_size = 0; + uint64_t misc_size = 0; + + struct dirent *entry = NULL; + while ((entry = readdir(store_dir)) != NULL) { + string n(entry->d_name); + + if (n == "." || n == "..") + continue; + + string fpath = path + '/' + n; + struct stat s; + int err = stat(fpath.c_str(), &s); + if (err < 0) + err = -errno; + // we may race against leveldb while reading files; this should only + // happen when those files are being updated, data is being shuffled + // and files get removed, in which case there's not much of a problem + // as we'll get to them next time around. + if (err == -ENOENT) { + continue; + } + if (err < 0) { + lderr(cct) << __func__ << " error obtaining stats for " << fpath + << ": " << cpp_strerror(err) << dendl; + goto err; + } + + size_t pos = n.find_last_of('.'); + if (pos == string::npos) { + misc_size += s.st_size; + continue; + } + + string ext = n.substr(pos+1); + if (ext == "sst") { + sst_size += s.st_size; + } else if (ext == "log") { + log_size += s.st_size; + } else { + misc_size += s.st_size; + } + } + + total_size = sst_size + log_size + misc_size; + + extra["sst"] = sst_size; + extra["log"] = log_size; + extra["misc"] = misc_size; + extra["total"] = total_size; + +err: + closedir(store_dir); + return total_size; + } + + + WholeSpaceIterator get_wholespace_iterator() override { + return std::make_shared<LevelDBWholeSpaceIteratorImpl>( + db->NewIterator(leveldb::ReadOptions())); + } + +}; + +#endif diff --git a/src/kv/MemDB.cc b/src/kv/MemDB.cc new file mode 100644 index 00000000..3cf2cb6e --- /dev/null +++ b/src/kv/MemDB.cc @@ -0,0 +1,643 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * In-memory crash non-safe keyvalue db + * Author: Ramesh Chander, Ramesh.Chander@sandisk.com + */ + +#include "include/compat.h" +#include <set> +#include <map> +#include <string> +#include <memory> +#include <errno.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "common/perf_counters.h" +#include "common/debug.h" +#include "include/str_list.h" +#include "include/str_map.h" +#include "KeyValueDB.h" +#include "MemDB.h" + +#include "include/ceph_assert.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/buffer.h" +#include "include/buffer_raw.h" +#include "include/compat.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_memdb +#undef dout_prefix +#define dout_prefix *_dout << "memdb: " +#define dtrace dout(30) +#define dwarn dout(0) +#define dinfo dout(0) + +static void split_key(const string& raw_key, string *prefix, string *key) +{ + size_t pos = raw_key.find(KEY_DELIM, 0); + ceph_assert(pos != std::string::npos); + *prefix = raw_key.substr(0, pos); + *key = raw_key.substr(pos + 1, raw_key.length()); +} + +static string make_key(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(KEY_DELIM); + out.append(value); + return out; +} + +void MemDB::_encode(mdb_iter_t iter, bufferlist &bl) +{ + encode(iter->first, bl); + encode(iter->second, bl); +} + +std::string MemDB::_get_data_fn() +{ + string fn = m_db_path + "/" + "MemDB.db"; + return fn; +} + +void MemDB::_save() +{ + std::lock_guard<std::mutex> l(m_lock); + dout(10) << __func__ << " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl; + int mode = 0644; + int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), + O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode)); + if (fd < 0) { + int err = errno; + cerr << "write_file(" << _get_data_fn().c_str() << "): failed to open file: " + << cpp_strerror(err) << std::endl; + return; + } + bufferlist bl; + mdb_iter_t iter = m_map.begin(); + while (iter != m_map.end()) { + dout(10) << __func__ << " Key:"<< iter->first << dendl; + _encode(iter, bl); + ++iter; + } + bl.write_fd(fd); + + VOID_TEMP_FAILURE_RETRY(::close(fd)); +} + +int MemDB::_load() +{ + std::lock_guard<std::mutex> l(m_lock); + dout(10) << __func__ << " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl; + /* + * Open file and read it in single shot. + */ + int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY|O_CLOEXEC)); + if (fd < 0) { + int err = errno; + cerr << "can't open " << _get_data_fn().c_str() << ": " + << cpp_strerror(err) << std::endl; + return -err; + } + + struct stat st; + memset(&st, 0, sizeof(st)); + if (::fstat(fd, &st) < 0) { + int err = errno; + cerr << "can't stat file " << _get_data_fn().c_str() << ": " + << cpp_strerror(err) << std::endl; + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return -err; + } + + ssize_t file_size = st.st_size; + ssize_t bytes_done = 0; + while (bytes_done < file_size) { + string key; + bufferptr datap; + + bytes_done += ::decode_file(fd, key); + bytes_done += ::decode_file(fd, datap); + + dout(10) << __func__ << " Key:"<< key << dendl; + m_map[key] = datap; + m_total_bytes += datap.length(); + } + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return 0; +} + +int MemDB::_init(bool create) +{ + int r; + dout(1) << __func__ << dendl; + if (create) { + r = ::mkdir(m_db_path.c_str(), 0700); + if (r < 0) { + r = -errno; + if (r != -EEXIST) { + derr << __func__ << " mkdir failed: " << cpp_strerror(r) << dendl; + return r; + } + r = 0; // ignore EEXIST + } + } else { + r = _load(); + } + + PerfCountersBuilder plb(g_ceph_context, "memdb", l_memdb_first, l_memdb_last); + plb.add_u64_counter(l_memdb_gets, "get", "Gets"); + plb.add_u64_counter(l_memdb_txns, "submit_transaction", "Submit transactions"); + plb.add_time_avg(l_memdb_get_latency, "get_latency", "Get latency"); + plb.add_time_avg(l_memdb_submit_latency, "submit_latency", "Submit Latency"); + logger = plb.create_perf_counters(); + m_cct->get_perfcounters_collection()->add(logger); + + return r; +} + +int MemDB::set_merge_operator( + const string& prefix, + std::shared_ptr<KeyValueDB::MergeOperator> mop) +{ + merge_ops.push_back(std::make_pair(prefix, mop)); + return 0; +} + +int MemDB::do_open(ostream &out, bool create) +{ + m_total_bytes = 0; + m_allocated_bytes = 1; + + return _init(create); +} + +int MemDB::open(ostream &out, const vector<ColumnFamily>& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, false); +} + +int MemDB::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) { + if (!cfs.empty()) { + ceph_abort_msg("Not implemented"); + } + return do_open(out, true); +} + +MemDB::~MemDB() +{ + close(); + dout(10) << __func__ << " Destroying MemDB instance: "<< dendl; +} + +void MemDB::close() +{ + /* + * Save whatever in memory btree. + */ + _save(); + if (logger) + m_cct->get_perfcounters_collection()->remove(logger); +} + +int MemDB::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + + MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get()); + + dtrace << __func__ << " " << mt->get_ops().size() << dendl; + for(auto& op : mt->get_ops()) { + if(op.first == MDBTransactionImpl::WRITE) { + ms_op_t set_op = op.second; + _setkey(set_op); + } else if (op.first == MDBTransactionImpl::MERGE) { + ms_op_t merge_op = op.second; + _merge(merge_op); + } else { + ms_op_t rm_op = op.second; + ceph_assert(op.first == MDBTransactionImpl::DELETE); + _rmkey(rm_op); + } + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_txns); + logger->tinc(l_memdb_submit_latency, lat); + + return 0; +} + +int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync) +{ + dtrace << __func__ << " " << dendl; + submit_transaction(tsync); + return 0; +} + +int MemDB::transaction_rollback(KeyValueDB::Transaction t) +{ + MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get()); + mt->clear(); + return 0; +} + +void MemDB::MDBTransactionImpl::set( + const string &prefix, const string &k, const bufferlist &to_set_bl) +{ + dtrace << __func__ << " " << prefix << " " << k << dendl; + ops.push_back(make_pair(WRITE, std::make_pair(std::make_pair(prefix, k), + to_set_bl))); +} + +void MemDB::MDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + dtrace << __func__ << " " << prefix << " " << k << dendl; + ops.push_back(make_pair(DELETE, + std::make_pair(std::make_pair(prefix, k), + bufferlist()))); +} + +void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + KeyValueDB::Iterator it = m_db->get_iterator(prefix); + for (it->seek_to_first(); it->valid(); it->next()) { + rmkey(prefix, it->key()); + } +} + +void MemDB::MDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = m_db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + rmkey(prefix, it->key()); + it->next(); + } +} + +void MemDB::MDBTransactionImpl::merge( + const std::string &prefix, const std::string &key, const bufferlist &value) +{ + + dtrace << __func__ << " " << prefix << " " << key << dendl; + ops.push_back(make_pair(MERGE, make_pair(std::make_pair(prefix, key), value))); + return; +} + +int MemDB::_setkey(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string key = make_key(op.first.first, op.first.second); + bufferlist bl = op.second; + + m_total_bytes += bl.length(); + + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old)) { + /* + * delete and free existing key. + */ + ceph_assert(m_total_bytes >= bl_old.length()); + m_total_bytes -= bl_old.length(); + m_map.erase(key); + } + + m_map[key] = bufferptr((char *) bl.c_str(), bl.length()); + iterator_seq_no++; + return 0; +} + +int MemDB::_rmkey(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string key = make_key(op.first.first, op.first.second); + + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old)) { + ceph_assert(m_total_bytes >= bl_old.length()); + m_total_bytes -= bl_old.length(); + } + iterator_seq_no++; + /* + * Erase will call the destructor for bufferptr. + */ + return m_map.erase(key); +} + +std::shared_ptr<KeyValueDB::MergeOperator> MemDB::_find_merge_op(const std::string &prefix) +{ + for (const auto& i : merge_ops) { + if (i.first == prefix) { + return i.second; + } + } + + dtrace << __func__ << " No merge op for " << prefix << dendl; + return NULL; +} + + +int MemDB::_merge(ms_op_t &op) +{ + std::lock_guard<std::mutex> l(m_lock); + std::string prefix = op.first.first; + std::string key = make_key(op.first.first, op.first.second); + bufferlist bl = op.second; + int64_t bytes_adjusted = bl.length(); + + /* + * find the operator for this prefix + */ + std::shared_ptr<MergeOperator> mop = _find_merge_op(prefix); + ceph_assert(mop); + + /* + * call the merge operator with value and non value + */ + bufferlist bl_old; + if (_get(op.first.first, op.first.second, &bl_old) == false) { + std::string new_val; + /* + * Merge non existent. + */ + mop->merge_nonexistent(bl.c_str(), bl.length(), &new_val); + m_map[key] = bufferptr(new_val.c_str(), new_val.length()); + } else { + /* + * Merge existing. + */ + std::string new_val; + mop->merge(bl_old.c_str(), bl_old.length(), bl.c_str(), bl.length(), &new_val); + m_map[key] = bufferptr(new_val.c_str(), new_val.length()); + bytes_adjusted -= bl_old.length(); + bl_old.clear(); + } + + ceph_assert((int64_t)m_total_bytes + bytes_adjusted >= 0); + m_total_bytes += bytes_adjusted; + iterator_seq_no++; + return 0; +} + +/* + * Caller take btree lock. + */ +bool MemDB::_get(const string &prefix, const string &k, bufferlist *out) +{ + string key = make_key(prefix, k); + + mdb_iter_t iter = m_map.find(key); + if (iter == m_map.end()) { + return false; + } + + out->push_back((m_map[key].clone())); + return true; +} + +bool MemDB::_get_locked(const string &prefix, const string &k, bufferlist *out) +{ + std::lock_guard<std::mutex> l(m_lock); + return _get(prefix, k, out); +} + + +int MemDB::get(const string &prefix, const std::string& key, + bufferlist *out) +{ + utime_t start = ceph_clock_now(); + int ret; + + if (_get_locked(prefix, key, out)) { + ret = 0; + } else { + ret = -ENOENT; + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_gets); + logger->tinc(l_memdb_get_latency, lat); + + return ret; +} + +int MemDB::get(const string &prefix, const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + utime_t start = ceph_clock_now(); + + for (const auto& i : keys) { + bufferlist bl; + if (_get_locked(prefix, i, &bl)) + out->insert(make_pair(i, bl)); + } + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_memdb_gets); + logger->tinc(l_memdb_get_latency, lat); + + return 0; +} + +void MemDB::MDBWholeSpaceIteratorImpl::fill_current() +{ + bufferlist bl; + bl.push_back(m_iter->second.clone()); + m_key_value = std::make_pair(m_iter->first, bl); +} + +bool MemDB::MDBWholeSpaceIteratorImpl::valid() +{ + if (m_key_value.first.empty()) { + return false; + } + return true; +} + +bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() { + + if (this_seq_no != *global_seq_no) { + auto key = m_key_value.first; + ceph_assert(!key.empty()); + + bool restart_iter = false; + if (!m_using_btree) { + /* + * Map is modified and marker key does not exists, + * restart the iterator from next key. + */ + if (m_map_p->find(key) == m_map_p->end()) { + restart_iter = true; + } + } else { + restart_iter = true; + } + + if (restart_iter) { + m_iter = m_map_p->lower_bound(key); + if (m_iter == m_map_p->end()) { + return false; + } + } + + /* + * This iter is valid now. + */ + this_seq_no = *global_seq_no; + } + + return true; +} + +void +MemDB::MDBWholeSpaceIteratorImpl::free_last() +{ + m_key_value.first.clear(); + m_key_value.second.clear(); +} + +string MemDB::MDBWholeSpaceIteratorImpl::key() +{ + dtrace << __func__ << " " << m_key_value.first << dendl; + string prefix, key; + split_key(m_key_value.first, &prefix, &key); + return key; +} + +pair<string,string> MemDB::MDBWholeSpaceIteratorImpl::raw_key() +{ + string prefix, key; + split_key(m_key_value.first, &prefix, &key); + return make_pair(prefix, key); +} + +bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed( + const string &prefix) +{ + string p, k; + split_key(m_key_value.first, &p, &k); + return (p == prefix); +} + +bufferlist MemDB::MDBWholeSpaceIteratorImpl::value() +{ + dtrace << __func__ << " " << m_key_value << dendl; + return m_key_value.second; +} + +int MemDB::MDBWholeSpaceIteratorImpl::next() +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + if (!iterator_validate()) { + free_last(); + return -1; + } + free_last(); + ++m_iter; + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } else { + return -1; + } +} + +int MemDB::MDBWholeSpaceIteratorImpl:: prev() +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + if (!iterator_validate()) { + free_last(); + return -1; + } + free_last(); + if (m_iter != m_map_p->begin()) { + --m_iter; + fill_current(); + return 0; + } else { + return -1; + } +} + +/* + * First key >= to given key, if key is null then first key in btree. + */ +int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string &k) +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + free_last(); + if (k.empty()) { + m_iter = m_map_p->begin(); + } else { + m_iter = m_map_p->lower_bound(k); + } + + if (m_iter == m_map_p->end()) { + return -1; + } + fill_current(); + return 0; +} + +int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string &k) +{ + std::lock_guard<std::mutex> l(*m_map_lock_p); + free_last(); + if (k.empty()) { + m_iter = m_map_p->end(); + --m_iter; + } else { + m_iter = m_map_p->lower_bound(k); + } + + if (m_iter == m_map_p->end()) { + return -1; + } + fill_current(); + return 0; +} + +MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl() +{ + free_last(); +} + +int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string &prefix, + const std::string &after) { + + std::lock_guard<std::mutex> l(*m_map_lock_p); + + dtrace << "upper_bound " << prefix.c_str() << after.c_str() << dendl; + string k = make_key(prefix, after); + m_iter = m_map_p->upper_bound(k); + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } + return -1; +} + +int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string &prefix, + const std::string &to) { + std::lock_guard<std::mutex> l(*m_map_lock_p); + dtrace << "lower_bound " << prefix.c_str() << to.c_str() << dendl; + string k = make_key(prefix, to); + m_iter = m_map_p->lower_bound(k); + if (m_iter != m_map_p->end()) { + fill_current(); + return 0; + } + return -1; +} diff --git a/src/kv/MemDB.h b/src/kv/MemDB.h new file mode 100644 index 00000000..117b6aee --- /dev/null +++ b/src/kv/MemDB.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * In-memory crash non-safe keyvalue db + * Author: Ramesh Chander, Ramesh.Chander@sandisk.com + */ + +#ifndef CEPH_OS_BLUESTORE_MEMDB_H +#define CEPH_OS_BLUESTORE_MEMDB_H + +#include "include/buffer.h" +#include <ostream> +#include <set> +#include <map> +#include <string> +#include <memory> +#include <boost/scoped_ptr.hpp> +#include "include/encoding.h" +#include "include/btree_map.h" +#include "KeyValueDB.h" +#include "osd/osd_types.h" + +using std::string; +#define KEY_DELIM '\0' + +class PerfCounters; + +enum { + l_memdb_first = 34440, + l_memdb_gets, + l_memdb_txns, + l_memdb_get_latency, + l_memdb_submit_latency, + l_memdb_last, +}; + +class MemDB : public KeyValueDB +{ + typedef std::pair<std::pair<std::string, std::string>, bufferlist> ms_op_t; + std::mutex m_lock; + uint64_t m_total_bytes; + uint64_t m_allocated_bytes; + + typedef std::map<std::string, bufferptr> mdb_map_t; + typedef mdb_map_t::iterator mdb_iter_t; + bool m_using_btree; + + mdb_map_t m_map; + + CephContext *m_cct; + PerfCounters *logger; + void* m_priv; + string m_options; + string m_db_path; + + int transaction_rollback(KeyValueDB::Transaction t); + int _open(ostream &out); + void close() override; + bool _get(const string &prefix, const string &k, bufferlist *out); + bool _get_locked(const string &prefix, const string &k, bufferlist *out); + std::string _get_data_fn(); + void _encode(mdb_iter_t iter, bufferlist &bl); + void _save(); + int _load(); + uint64_t iterator_seq_no; + +public: + MemDB(CephContext *c, const string &path, void *p) : + m_total_bytes(0), m_allocated_bytes(0), m_using_btree(false), + m_cct(c), logger(NULL), m_priv(p), m_db_path(path), iterator_seq_no(1) + { + //Nothing as of now + } + + ~MemDB() override; + int set_merge_operator(const std::string& prefix, + std::shared_ptr<MergeOperator> mop) override; + + std::shared_ptr<MergeOperator> _find_merge_op(const std::string &prefix); + + static + int _test_init(const string& dir) { return 0; }; + + class MDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + enum op_type { WRITE = 1, MERGE = 2, DELETE = 3}; + private: + + std::vector<std::pair<op_type, ms_op_t>> ops; + MemDB *m_db; + + bool key_is_prefixed(const string &prefix, const string& full_key); + public: + const std::vector<std::pair<op_type, ms_op_t>>& + get_ops() { return ops; }; + + void set(const std::string &prefix, const std::string &key, + const bufferlist &val) override; + using KeyValueDB::TransactionImpl::set; + void rmkey(const std::string &prefix, const std::string &k) override; + using KeyValueDB::TransactionImpl::rmkey; + void rmkeys_by_prefix(const std::string &prefix) override; + void rm_range_keys( + const string &prefix, + const string &start, + const string &end) override; + + void merge(const std::string &prefix, const std::string &key, const bufferlist &value) override; + void clear() { + ops.clear(); + } + explicit MDBTransactionImpl(MemDB* _db) :m_db(_db) + { + ops.clear(); + } + ~MDBTransactionImpl() override {}; + }; + +private: + + /* + * Transaction states. + */ + int _merge(const std::string &k, bufferptr &bl); + int _merge(ms_op_t &op); + int _setkey(ms_op_t &op); + int _rmkey(ms_op_t &op); + +public: + + int init(string option_str="") override { m_options = option_str; return 0; } + int _init(bool format); + + int do_open(ostream &out, bool create); + int open(ostream &out, const std::vector<ColumnFamily>&) override; + int create_and_open(ostream &out, const std::vector<ColumnFamily>&) override; + using KeyValueDB::create_and_open; + + KeyValueDB::Transaction get_transaction() override { + return std::shared_ptr<MDBTransactionImpl>(new MDBTransactionImpl(this)); + } + + int submit_transaction(Transaction) override; + int submit_transaction_sync(Transaction) override; + + int get(const std::string &prefix, const std::set<std::string> &key, + std::map<std::string, bufferlist> *out) override; + + int get(const std::string &prefix, const std::string &key, + bufferlist *out) override; + + using KeyValueDB::get; + + class MDBWholeSpaceIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl { + + mdb_iter_t m_iter; + std::pair<string, bufferlist> m_key_value; + mdb_map_t *m_map_p; + std::mutex *m_map_lock_p; + uint64_t *global_seq_no; + uint64_t this_seq_no; + bool m_using_btree; + + public: + MDBWholeSpaceIteratorImpl(mdb_map_t *btree_p, std::mutex *btree_lock_p, + uint64_t *iterator_seq_no, bool using_btree) { + m_map_p = btree_p; + m_map_lock_p = btree_lock_p; + std::lock_guard<std::mutex> l(*m_map_lock_p); + global_seq_no = iterator_seq_no; + this_seq_no = *iterator_seq_no; + m_using_btree = using_btree; + } + + void fill_current(); + void free_last(); + + + int seek_to_first(const std::string &k) override; + int seek_to_last(const std::string &k) override; + + int seek_to_first() override { return seek_to_first(std::string()); }; + int seek_to_last() override { return seek_to_last(std::string()); }; + + int upper_bound(const std::string &prefix, const std::string &after) override; + int lower_bound(const std::string &prefix, const std::string &to) override; + bool valid() override; + bool iterator_validate(); + + int next() override; + int prev() override; + int status() override { return 0; }; + + std::string key() override; + std::pair<std::string,std::string> raw_key() override; + bool raw_key_is_prefixed(const std::string &prefix) override; + bufferlist value() override; + ~MDBWholeSpaceIteratorImpl() override; + }; + + uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) override { + std::lock_guard<std::mutex> l(m_lock); + return m_allocated_bytes; + }; + + int get_statfs(struct store_statfs_t *buf) override { + std::lock_guard<std::mutex> l(m_lock); + buf->reset(); + buf->total = m_total_bytes; + buf->allocated = m_allocated_bytes; + buf->data_stored = m_total_bytes; + return 0; + } + + WholeSpaceIterator get_wholespace_iterator() override { + return std::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>( + new MDBWholeSpaceIteratorImpl(&m_map, &m_lock, &iterator_seq_no, m_using_btree)); + } +}; + +#endif + diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc new file mode 100644 index 00000000..6c020a74 --- /dev/null +++ b/src/kv/RocksDBStore.cc @@ -0,0 +1,1606 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <set> +#include <map> +#include <string> +#include <memory> +#include <errno.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "rocksdb/db.h" +#include "rocksdb/table.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/cache.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/utilities/convenience.h" +#include "rocksdb/merge_operator.h" + +using std::string; +#include "common/perf_counters.h" +#include "common/PriorityCache.h" +#include "include/str_list.h" +#include "include/stringify.h" +#include "include/str_map.h" +#include "KeyValueDB.h" +#include "RocksDBStore.h" + +#include "common/debug.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_rocksdb +#undef dout_prefix +#define dout_prefix *_dout << "rocksdb: " + +static bufferlist to_bufferlist(rocksdb::Slice in) { + bufferlist bl; + bl.append(bufferptr(in.data(), in.size())); + return bl; +} + +static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, + vector<rocksdb::Slice> *slices) +{ + unsigned n = 0; + for (auto& buf : bl.buffers()) { + (*slices)[n].data_ = buf.c_str(); + (*slices)[n].size_ = buf.length(); + n++; + } + return rocksdb::SliceParts(slices->data(), slices->size()); +} + + +// +// One of these for the default rocksdb column family, routing each prefix +// to the appropriate MergeOperator. +// +class RocksDBStore::MergeOperatorRouter + : public rocksdb::AssociativeMergeOperator +{ + RocksDBStore& store; +public: + const char *Name() const override { + // Construct a name that rocksDB will validate against. We want to + // do this in a way that doesn't constrain the ordering of calls + // to set_merge_operator, so sort the merge operators and then + // construct a name from all of those parts. + store.assoc_name.clear(); + map<std::string,std::string> names; + + for (auto& p : store.merge_ops) { + names[p.first] = p.second->name(); + } + for (auto& p : store.cf_handles) { + names.erase(p.first); + } + for (auto& p : names) { + store.assoc_name += '.'; + store.assoc_name += p.first; + store.assoc_name += ':'; + store.assoc_name += p.second; + } + return store.assoc_name.c_str(); + } + + explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} + + bool Merge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + // for default column family + // extract prefix from key and compare against each registered merge op; + // even though merge operator for explicit CF is included in merge_ops, + // it won't be picked up, since it won't match. + for (auto& p : store.merge_ops) { + if (p.first.compare(0, p.first.length(), + key.data(), p.first.length()) == 0 && + key.data()[p.first.length()] == 0) { + if (existing_value) { + p.second->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + p.second->merge_nonexistent(value.data(), value.size(), new_value); + } + break; + } + } + return true; // OK :) + } +}; + +// +// One of these per non-default column family, linked directly to the +// merge operator for that CF/prefix (if any). +// +class RocksDBStore::MergeOperatorLinker + : public rocksdb::AssociativeMergeOperator +{ +private: + std::shared_ptr<KeyValueDB::MergeOperator> mop; +public: + explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {} + + const char *Name() const override { + return mop->name(); + } + + bool Merge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + if (existing_value) { + mop->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + mop->merge_nonexistent(value.data(), value.size(), new_value); + } + return true; + } +}; + +int RocksDBStore::set_merge_operator( + const string& prefix, + std::shared_ptr<KeyValueDB::MergeOperator> mop) +{ + // If you fail here, it's because you can't do this on an open database + ceph_assert(db == nullptr); + merge_ops.push_back(std::make_pair(prefix,mop)); + return 0; +} + +class CephRocksdbLogger : public rocksdb::Logger { + CephContext *cct; +public: + explicit CephRocksdbLogger(CephContext *c) : cct(c) { + cct->get(); + } + ~CephRocksdbLogger() override { + cct->put(); + } + + // Write an entry to the log file with the specified format. + void Logv(const char* format, va_list ap) override { + Logv(rocksdb::INFO_LEVEL, format, ap); + } + + // Write an entry to the log file with the specified log level + // and format. Any log with level under the internal log level + // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be + // printed. + void Logv(const rocksdb::InfoLogLevel log_level, const char* format, + va_list ap) override { + int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1; + dout(ceph::dout::need_dynamic(v)); + char buf[65536]; + vsnprintf(buf, sizeof(buf), format, ap); + *_dout << buf << dendl; + } +}; + +rocksdb::Logger *create_rocksdb_ceph_logger() +{ + return new CephRocksdbLogger(g_ceph_context); +} + +static int string2bool(const string &val, bool &b_val) +{ + if (strcasecmp(val.c_str(), "false") == 0) { + b_val = false; + return 0; + } else if (strcasecmp(val.c_str(), "true") == 0) { + b_val = true; + return 0; + } else { + std::string err; + int b = strict_strtol(val.c_str(), 10, &err); + if (!err.empty()) + return -EINVAL; + b_val = !!b; + return 0; + } +} + +int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt) +{ + if (key == "compaction_threads") { + std::string err; + int f = strict_iecstrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //Low priority threadpool is used for compaction + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW); + } else if (key == "flusher_threads") { + std::string err; + int f = strict_iecstrtoll(val.c_str(), &err); + if (!err.empty()) + return -EINVAL; + //High priority threadpool is used for flusher + opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH); + } else if (key == "compact_on_mount") { + int ret = string2bool(val, compact_on_mount); + if (ret != 0) + return ret; + } else if (key == "disableWAL") { + int ret = string2bool(val, disableWAL); + if (ret != 0) + return ret; + } else { + //unrecognize config options. + return -EINVAL; + } + return 0; +} + +int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt) +{ + return ParseOptionsFromStringStatic(cct, opt_str, opt, + [&](const string& k, const string& v, rocksdb::Options& o) { + return tryInterpret(k, v, o); + } + ); +} + +int RocksDBStore::ParseOptionsFromStringStatic( + CephContext *cct, + const string& opt_str, + rocksdb::Options& opt, + function<int(const string&, const string&, rocksdb::Options&)> interp) +{ + // keep aligned with func tryInterpret + const set<string> need_interp_keys = {"compaction_threads", "flusher_threads", "compact_on_mount", "disableWAL"}; + + map<string, string> str_map; + int r = get_str_map(opt_str, &str_map, ",\n;"); + if (r < 0) + return r; + map<string, string>::iterator it; + for (it = str_map.begin(); it != str_map.end(); ++it) { + string this_opt = it->first + "=" + it->second; + rocksdb::Status status = + rocksdb::GetOptionsFromString(opt, this_opt, &opt); + if (!status.ok()) { + if (interp != nullptr) { + r = interp(it->first, it->second, opt); + } else if (!need_interp_keys.count(it->first)) { + r = -1; + } + if (r < 0) { + derr << status.ToString() << dendl; + return -EINVAL; + } + } + lgeneric_dout(cct, 0) << " set rocksdb option " << it->first + << " = " << it->second << dendl; + } + return 0; +} + +int RocksDBStore::init(string _options_str) +{ + options_str = _options_str; + rocksdb::Options opt; + //try parse options + if (options_str.length()) { + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + } + return 0; +} + +int RocksDBStore::create_db_dir() +{ + if (env) { + unique_ptr<rocksdb::Directory> dir; + env->NewDirectory(path, &dir); + } else { + int r = ::mkdir(path.c_str(), 0755); + if (r < 0) + r = -errno; + if (r < 0 && r != -EEXIST) { + derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r) + << dendl; + return r; + } + } + return 0; +} + +int RocksDBStore::install_cf_mergeop( + const string &cf_name, + rocksdb::ColumnFamilyOptions *cf_opt) +{ + ceph_assert(cf_opt != nullptr); + cf_opt->merge_operator.reset(); + for (auto& i : merge_ops) { + if (i.first == cf_name) { + cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second)); + } + } + return 0; +} + +int RocksDBStore::create_and_open(ostream &out, + const vector<ColumnFamily>& cfs) +{ + int r = create_db_dir(); + if (r < 0) + return r; + if (cfs.empty()) { + return do_open(out, true, false, nullptr); + } else { + return do_open(out, true, false, &cfs); + } +} + +int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt) +{ + rocksdb::Status status; + + if (options_str.length()) { + int r = ParseOptionsFromString(options_str, opt); + if (r != 0) { + return -EINVAL; + } + } + + if (g_conf()->rocksdb_perf) { + dbstats = rocksdb::CreateDBStatistics(); + opt.statistics = dbstats; + } + + opt.create_if_missing = create_if_missing; + if (kv_options.count("separate_wal_dir")) { + opt.wal_dir = path + ".wal"; + } + + // Since ceph::for_each_substr doesn't return a value and + // std::stoull does throw, we may as well just catch everything here. + try { + if (kv_options.count("db_paths")) { + list<string> paths; + get_str_list(kv_options["db_paths"], "; \t", paths); + for (auto& p : paths) { + size_t pos = p.find(','); + if (pos == std::string::npos) { + derr << __func__ << " invalid db path item " << p << " in " + << kv_options["db_paths"] << dendl; + return -EINVAL; + } + string path = p.substr(0, pos); + string size_str = p.substr(pos + 1); + uint64_t size = atoll(size_str.c_str()); + if (!size) { + derr << __func__ << " invalid db path item " << p << " in " + << kv_options["db_paths"] << dendl; + return -EINVAL; + } + opt.db_paths.push_back(rocksdb::DbPath(path, size)); + dout(10) << __func__ << " db_path " << path << " size " << size << dendl; + } + } + } catch (const std::system_error& e) { + return -e.code().value(); + } + + if (g_conf()->rocksdb_log_to_ceph_log) { + opt.info_log.reset(new CephRocksdbLogger(g_ceph_context)); + } + + if (priv) { + dout(10) << __func__ << " using custom Env " << priv << dendl; + opt.env = static_cast<rocksdb::Env*>(priv); + } + + opt.env->SetAllowNonOwnerAccess(false); + + // caches + if (!set_cache_flag) { + cache_size = g_conf()->rocksdb_cache_size; + } + uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio; + uint64_t block_cache_size = cache_size - row_cache_size; + + if (g_conf()->rocksdb_cache_type == "binned_lru") { + bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache( + cct, + block_cache_size, + g_conf()->rocksdb_cache_shard_bits); + } else if (g_conf()->rocksdb_cache_type == "lru") { + bbt_opts.block_cache = rocksdb::NewLRUCache( + block_cache_size, + g_conf()->rocksdb_cache_shard_bits); + } else if (g_conf()->rocksdb_cache_type == "clock") { + bbt_opts.block_cache = rocksdb::NewClockCache( + block_cache_size, + g_conf()->rocksdb_cache_shard_bits); + if (!bbt_opts.block_cache) { + derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type + << "' chosen, but RocksDB not compiled with LibTBB. " + << dendl; + return -EINVAL; + } + } else { + derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type + << "'" << dendl; + return -EINVAL; + } + bbt_opts.block_size = g_conf()->rocksdb_block_size; + + if (row_cache_size > 0) + opt.row_cache = rocksdb::NewLRUCache(row_cache_size, + g_conf()->rocksdb_cache_shard_bits); + uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key"); + if (bloom_bits > 0) { + dout(10) << __func__ << " set bloom filter bits per key to " + << bloom_bits << dendl; + bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits)); + } + using std::placeholders::_1; + if (g_conf().with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "binary_search"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch; + if (g_conf().with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "hash_search"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch; + if (g_conf().with_val<std::string>("rocksdb_index_type", + std::bind(std::equal_to<std::string>(), _1, + "two_level"))) + bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + if (!bbt_opts.no_block_cache) { + bbt_opts.cache_index_and_filter_blocks = + g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks"); + bbt_opts.cache_index_and_filter_blocks_with_high_priority = + g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority"); + bbt_opts.pin_l0_filter_and_index_blocks_in_cache = + g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache"); + } + bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters"); + if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0) + bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size"); + + opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts)); + dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size + << ", block_cache size " << byte_u_t(block_cache_size) + << ", row_cache size " << byte_u_t(row_cache_size) + << "; shards " + << (1 << g_conf()->rocksdb_cache_shard_bits) + << ", type " << g_conf()->rocksdb_cache_type + << dendl; + + opt.merge_operator.reset(new MergeOperatorRouter(*this)); + + return 0; +} + +int RocksDBStore::do_open(ostream &out, + bool create_if_missing, + bool open_readonly, + const vector<ColumnFamily>* cfs) +{ + ceph_assert(!(create_if_missing && open_readonly)); + rocksdb::Options opt; + int r = load_rocksdb_options(create_if_missing, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + return r; + } + rocksdb::Status status; + if (create_if_missing) { + status = rocksdb::DB::Open(opt, path, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + // create and open column families + if (cfs) { + for (auto& p : *cfs) { + // copy default CF settings, block cache, merge operators as + // the base for new CF + rocksdb::ColumnFamilyOptions cf_opt(opt); + // user input options will override the base options + status = rocksdb::GetColumnFamilyOptionsFromString( + cf_opt, p.option, &cf_opt); + if (!status.ok()) { + derr << __func__ << " invalid db column family option string for CF: " + << p.name << dendl; + return -EINVAL; + } + install_cf_mergeop(p.name, &cf_opt); + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, p.name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << p.name << dendl; + return -EINVAL; + } + // store the new CF handle + add_column_family(p.name, static_cast<void*>(cf)); + } + } + default_cf = db->DefaultColumnFamily(); + } else { + std::vector<string> existing_cfs; + status = rocksdb::DB::ListColumnFamilies( + rocksdb::DBOptions(opt), + path, + &existing_cfs); + dout(1) << __func__ << " column families: " << existing_cfs << dendl; + if (existing_cfs.empty()) { + // no column families + if (open_readonly) { + status = rocksdb::DB::Open(opt, path, &db); + } else { + status = rocksdb::DB::OpenForReadOnly(opt, path, &db); + } + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + default_cf = db->DefaultColumnFamily(); + } else { + // we cannot change column families for a created database. so, map + // what options we are given to whatever cf's already exist. + std::vector<rocksdb::ColumnFamilyDescriptor> column_families; + for (auto& n : existing_cfs) { + // copy default CF settings, block cache, merge operators as + // the base for new CF + rocksdb::ColumnFamilyOptions cf_opt(opt); + bool found = false; + if (cfs) { + for (auto& i : *cfs) { + if (i.name == n) { + found = true; + status = rocksdb::GetColumnFamilyOptionsFromString( + cf_opt, i.option, &cf_opt); + if (!status.ok()) { + derr << __func__ << " invalid db column family options for CF '" + << i.name << "': " << i.option << dendl; + return -EINVAL; + } + } + } + } + if (n != rocksdb::kDefaultColumnFamilyName) { + install_cf_mergeop(n, &cf_opt); + } + column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt)); + if (!found && n != rocksdb::kDefaultColumnFamilyName) { + dout(1) << __func__ << " column family '" << n + << "' exists but not expected" << dendl; + } + } + std::vector<rocksdb::ColumnFamilyHandle*> handles; + if (open_readonly) { + status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt), + path, column_families, + &handles, &db); + } else { + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, column_families, &handles, &db); + } + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + for (unsigned i = 0; i < existing_cfs.size(); ++i) { + if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) { + default_cf = handles[i]; + must_close_default_cf = true; + } else { + add_column_family(existing_cfs[i], static_cast<void*>(handles[i])); + } + } + } + } + ceph_assert(default_cf != nullptr); + + PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); + plb.add_u64_counter(l_rocksdb_gets, "get", "Gets"); + plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions"); + plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync"); + plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency"); + plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency"); + plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency"); + plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions"); + plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range"); + plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue"); + plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue"); + plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time"); + plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time"); + plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time"); + plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time, + "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + + if (compact_on_mount) { + derr << "Compacting rocksdb store..." << dendl; + compact(); + derr << "Finished compacting rocksdb store" << dendl; + } + return 0; +} + +int RocksDBStore::_test_init(const string& dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, dir, &db); + delete db; + db = nullptr; + return status.ok() ? 0 : -EIO; +} + +RocksDBStore::~RocksDBStore() +{ + close(); + delete logger; + + // Ensure db is destroyed before dependent db_cache and filterpolicy + for (auto& p : cf_handles) { + db->DestroyColumnFamilyHandle( + static_cast<rocksdb::ColumnFamilyHandle*>(p.second)); + p.second = nullptr; + } + if (must_close_default_cf) { + db->DestroyColumnFamilyHandle(default_cf); + must_close_default_cf = false; + } + default_cf = nullptr; + delete db; + db = nullptr; + + if (priv) { + delete static_cast<rocksdb::Env*>(priv); + } +} + +void RocksDBStore::close() +{ + // stop compaction thread + compact_queue_lock.Lock(); + if (compact_thread.is_started()) { + dout(1) << __func__ << " waiting for compaction thread to stop" << dendl; + compact_queue_stop = true; + compact_queue_cond.Signal(); + compact_queue_lock.Unlock(); + compact_thread.join(); + dout(1) << __func__ << " compaction thread to stopped" << dendl; + } else { + compact_queue_lock.Unlock(); + } + + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int RocksDBStore::repair(std::ostream &out) +{ + rocksdb::Options opt; + int r = load_rocksdb_options(false, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + out << "load rocksdb options failed" << std::endl; + return r; + } + rocksdb::Status status = rocksdb::RepairDB(path, opt); + if (status.ok()) { + return 0; + } else { + out << "repair rocksdb failed : " << status.ToString() << std::endl; + return 1; + } +} + +void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) { + std::stringstream ss; + ss.str(s); + std::string item; + while (std::getline(ss, item, delim)) { + elems.push_back(item); + } +} + +int64_t RocksDBStore::estimate_prefix_size(const string& prefix) +{ + auto cf = get_cf_handle(prefix); + uint64_t size = 0; + uint8_t flags = + //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables... + rocksdb::DB::INCLUDE_FILES; + if (cf) { + string start(1, '\x00'); + string limit("\xff\xff\xff\xff"); + rocksdb::Range r(start, limit); + db->GetApproximateSizes(cf, &r, 1, &size, flags); + } else { + string limit = prefix + "\xff\xff\xff\xff"; + rocksdb::Range r(prefix, limit); + db->GetApproximateSizes(default_cf, + &r, 1, &size, flags); + } + return size; +} + +void RocksDBStore::get_statistics(Formatter *f) +{ + if (!g_conf()->rocksdb_perf) { + dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats" + << dendl; + return; + } + + if (g_conf()->rocksdb_collect_compaction_stats) { + std::string stat_str; + bool status = db->GetProperty("rocksdb.stats", &stat_str); + if (status) { + f->open_object_section("rocksdb_statistics"); + f->dump_string("rocksdb_compaction_statistics", ""); + vector<string> stats; + split_stats(stat_str, '\n', stats); + for (auto st :stats) { + f->dump_string("", st); + } + f->close_section(); + } + } + if (g_conf()->rocksdb_collect_extended_stats) { + if (dbstats) { + f->open_object_section("rocksdb_extended_statistics"); + string stat_str = dbstats->ToString(); + vector<string> stats; + split_stats(stat_str, '\n', stats); + f->dump_string("rocksdb_extended_statistics", ""); + for (auto st :stats) { + f->dump_string(".", st); + } + f->close_section(); + } + f->open_object_section("rocksdbstore_perf_counters"); + logger->dump_formatted(f,0); + f->close_section(); + } + if (g_conf()->rocksdb_collect_memory_stats) { + f->open_object_section("rocksdb_memtable_statistics"); + std::string str; + if (!bbt_opts.no_block_cache) { + str.append(stringify(bbt_opts.block_cache->GetUsage())); + f->dump_string("block_cache_usage", str.data()); + str.clear(); + str.append(stringify(bbt_opts.block_cache->GetPinnedUsage())); + f->dump_string("block_cache_pinned_blocks_usage", str); + str.clear(); + } + db->GetProperty("rocksdb.cur-size-all-mem-tables", &str); + f->dump_string("rocksdb_memtable_usage", str); + str.clear(); + db->GetProperty("rocksdb.estimate-table-readers-mem", &str); + f->dump_string("rocksdb_index_filter_blocks_usage", str); + f->close_section(); + } +} + +int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t) +{ + // enable rocksdb breakdown + // considering performance overhead, default is disabled + if (g_conf()->rocksdb_perf) { + rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex); + rocksdb::get_perf_context()->Reset(); + } + + RocksDBTransactionImpl * _t = + static_cast<RocksDBTransactionImpl *>(t.get()); + woptions.disableWAL = disableWAL; + lgeneric_subdout(cct, rocksdb, 30) << __func__; + RocksWBHandler bat_txc; + _t->bat.Iterate(&bat_txc); + *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl; + + rocksdb::Status s = db->Write(woptions, &_t->bat); + if (!s.ok()) { + RocksWBHandler rocks_txc; + _t->bat.Iterate(&rocks_txc); + derr << __func__ << " error: " << s.ToString() << " code = " << s.code() + << " Rocksdb transaction: " << rocks_txc.seen << dendl; + } + + if (g_conf()->rocksdb_perf) { + utime_t write_memtable_time; + utime_t write_delay_time; + utime_t write_wal_time; + utime_t write_pre_and_post_process_time; + write_wal_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000); + write_memtable_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000); + write_delay_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000); + write_pre_and_post_process_time.set_from_double( + static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000); + logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time); + logger->tinc(l_rocksdb_write_delay_time, write_delay_time); + logger->tinc(l_rocksdb_write_wal_time, write_wal_time); + logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time); + } + + return s.ok() ? 0 : -1; +} + +int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + rocksdb::WriteOptions woptions; + woptions.sync = false; + + int result = submit_common(woptions, t); + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_txns); + logger->tinc(l_rocksdb_submit_latency, lat); + + return result; +} + +int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + utime_t start = ceph_clock_now(); + rocksdb::WriteOptions woptions; + // if disableWAL, sync can't set + woptions.sync = !disableWAL; + + int result = submit_common(woptions, t); + + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_txns_sync); + logger->tinc(l_rocksdb_submit_sync_latency, lat); + + return result; +} + +RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) +{ + db = _db; +} + +void RocksDBStore::RocksDBTransactionImpl::put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const string &key, + const bufferlist &to_set_bl) +{ + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Put(cf, + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), + to_set_bl.length())); + } else { + rocksdb::Slice key_slice(key); + vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); + bat.Put(cf, + rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + put_bat(bat, cf, k, to_set_bl); + } else { + string key = combine_strings(prefix, k); + put_bat(bat, db->default_cf, key, to_set_bl); + } +} + +void RocksDBStore::RocksDBTransactionImpl::set( + const string &prefix, + const char *k, size_t keylen, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + string key(k, keylen); // fixme? + put_bat(bat, cf, key, to_set_bl); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + put_bat(bat, cf, key, to_set_bl); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k)); + } else { + bat.Delete(db->default_cf, combine_strings(prefix, k)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, + const char *k, + size_t keylen) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k, keylen)); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + bat.Delete(db->default_cf, rocksdb::Slice(key)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, + const string &k) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.SingleDelete(cf, k); + } else { + bat.SingleDelete(db->default_cf, combine_strings(prefix, k)); + } +} + +void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + if (db->enable_rmrange) { + string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating... + if (db->max_items_rmrange) { + uint64_t cnt = db->max_items_rmrange; + bat.SetSavePoint(); + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + if (!cnt) { + bat.RollbackToSavePoint(); + bat.DeleteRange(cf, string(), endprefix); + return; + } + bat.Delete(cf, rocksdb::Slice(it->key())); + --cnt; + } + bat.PopSavePoint(); + } else { + bat.DeleteRange(cf, string(), endprefix); + } + } else { + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(cf, rocksdb::Slice(it->key())); + } + } + } else { + if (db->enable_rmrange) { + string endprefix = prefix; + endprefix.push_back('\x01'); + if (db->max_items_rmrange) { + uint64_t cnt = db->max_items_rmrange; + bat.SetSavePoint(); + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + if (!cnt) { + bat.RollbackToSavePoint(); + bat.DeleteRange(db->default_cf, + combine_strings(prefix, string()), + combine_strings(endprefix, string())); + return; + } + bat.Delete(db->default_cf, combine_strings(prefix, it->key())); + --cnt; + } + bat.PopSavePoint(); + } else { + bat.DeleteRange(db->default_cf, + combine_strings(prefix, string()), + combine_strings(endprefix, string())); + } + } else { + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(db->default_cf, combine_strings(prefix, it->key())); + } + } + } +} + +void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, + const string &start, + const string &end) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + if (db->enable_rmrange) { + if (db->max_items_rmrange) { + uint64_t cnt = db->max_items_rmrange; + auto it = db->get_iterator(prefix); + bat.SetSavePoint(); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + if (!cnt) { + bat.RollbackToSavePoint(); + bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end)); + return; + } + bat.Delete(cf, rocksdb::Slice(it->key())); + it->next(); + --cnt; + } + bat.PopSavePoint(); + } else { + bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end)); + } + } else { + auto it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(cf, rocksdb::Slice(it->key())); + it->next(); + } + } + } else { + if (db->enable_rmrange) { + if (db->max_items_rmrange) { + uint64_t cnt = db->max_items_rmrange; + auto it = db->get_iterator(prefix); + bat.SetSavePoint(); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + if (!cnt) { + bat.RollbackToSavePoint(); + bat.DeleteRange( + db->default_cf, + rocksdb::Slice(combine_strings(prefix, start)), + rocksdb::Slice(combine_strings(prefix, end))); + return; + } + bat.Delete(db->default_cf, + combine_strings(prefix, it->key())); + it->next(); + --cnt; + } + bat.PopSavePoint(); + } else { + bat.DeleteRange( + db->default_cf, + rocksdb::Slice(combine_strings(prefix, start)), + rocksdb::Slice(combine_strings(prefix, end))); + } + } else { + auto it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(db->default_cf, + combine_strings(prefix, it->key())); + it->next(); + } + } + } +} + +void RocksDBStore::RocksDBTransactionImpl::merge( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + auto cf = db->get_cf_handle(prefix); + if (cf) { + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + cf, + rocksdb::Slice(k), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(k); + vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); + bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } + } else { + string key = combine_strings(prefix, k); + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + db->default_cf, + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(key); + vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size()); + bat.Merge( + db->default_cf, + rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } + } +} + +int RocksDBStore::get( + const string &prefix, + const std::set<string> &keys, + std::map<string, bufferlist> *out) +{ + utime_t start = ceph_clock_now(); + auto cf = get_cf_handle(prefix); + if (cf) { + for (auto& key : keys) { + std::string value; + auto status = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key), + &value); + if (status.ok()) { + (*out)[key].append(value); + } else if (status.IsIOError()) { + ceph_abort_msg(status.getState()); + } + } + } else { + for (auto& key : keys) { + std::string value; + string k = combine_strings(prefix, key); + auto status = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + if (status.ok()) { + (*out)[key].append(value); + } else if (status.IsIOError()) { + ceph_abort_msg(status.getState()); + } + } + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return 0; +} + +int RocksDBStore::get( + const string &prefix, + const string &key, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + string value; + rocksdb::Status s; + auto cf = get_cf_handle(prefix); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key), + &value); + } else { + string k = combine_strings(prefix, key); + s = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + } + if (s.ok()) { + out->append(value); + } else if (s.IsNotFound()) { + r = -ENOENT; + } else { + ceph_abort_msg(s.getState()); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return r; +} + +int RocksDBStore::get( + const string& prefix, + const char *key, + size_t keylen, + bufferlist *out) +{ + ceph_assert(out && (out->length() == 0)); + utime_t start = ceph_clock_now(); + int r = 0; + string value; + rocksdb::Status s; + auto cf = get_cf_handle(prefix); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key, keylen), + &value); + } else { + string k; + combine_strings(prefix, key, keylen, &k); + s = db->Get(rocksdb::ReadOptions(), + default_cf, + rocksdb::Slice(k), + &value); + } + if (s.ok()) { + out->append(value); + } else if (s.IsNotFound()) { + r = -ENOENT; + } else { + ceph_abort_msg(s.getState()); + } + utime_t lat = ceph_clock_now() - start; + logger->inc(l_rocksdb_gets); + logger->tinc(l_rocksdb_get_latency, lat); + return r; +} + +int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key) +{ + size_t prefix_len = 0; + + // Find separator inside Slice + char* separator = (char*) memchr(in.data(), 0, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in.data()); + if (prefix_len >= in.size()) + return -EINVAL; + + // Fetch prefix and/or key directly from Slice + if (prefix) + *prefix = string(in.data(), prefix_len); + if (key) + *key = string(separator+1, in.size()-prefix_len-1); + return 0; +} + +void RocksDBStore::compact() +{ + logger->inc(l_rocksdb_compact); + rocksdb::CompactRangeOptions options; + db->CompactRange(options, default_cf, nullptr, nullptr); + for (auto cf : cf_handles) { + db->CompactRange( + options, + static_cast<rocksdb::ColumnFamilyHandle*>(cf.second), + nullptr, nullptr); + } +} + + +void RocksDBStore::compact_thread_entry() +{ + compact_queue_lock.Lock(); + dout(10) << __func__ << " enter" << dendl; + while (!compact_queue_stop) { + if (!compact_queue.empty()) { + pair<string,string> range = compact_queue.front(); + compact_queue.pop_front(); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + compact_queue_lock.Unlock(); + logger->inc(l_rocksdb_compact_range); + if (range.first.empty() && range.second.empty()) { + compact(); + } else { + compact_range(range.first, range.second); + } + compact_queue_lock.Lock(); + continue; + } + dout(10) << __func__ << " waiting" << dendl; + compact_queue_cond.Wait(compact_queue_lock); + } + compact_queue_lock.Unlock(); +} + +void RocksDBStore::compact_range_async(const string& start, const string& end) +{ + std::lock_guard l(compact_queue_lock); + + // try to merge adjacent ranges. this is O(n), but the queue should + // be short. note that we do not cover all overlap cases and merge + // opportunities here, but we capture the ones we currently need. + list< pair<string,string> >::iterator p = compact_queue.begin(); + while (p != compact_queue.end()) { + if (p->first == start && p->second == end) { + // dup; no-op + return; + } + if (p->first <= end && p->first > start) { + // merge with existing range to the right + compact_queue.push_back(make_pair(start, p->second)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + if (p->second >= start && p->second < end) { + // merge with existing range to the left + compact_queue.push_back(make_pair(p->first, end)); + compact_queue.erase(p); + logger->inc(l_rocksdb_compact_queue_merge); + break; + } + ++p; + } + if (p == compact_queue.end()) { + // no merge, new entry. + compact_queue.push_back(make_pair(start, end)); + logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); + } + compact_queue_cond.Signal(); + if (!compact_thread.is_started()) { + compact_thread.create("rstore_compact"); + } +} +bool RocksDBStore::check_omap_dir(string &omap_dir) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db; + rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db); + delete db; + db = nullptr; + return status.ok(); +} +void RocksDBStore::compact_range(const string& start, const string& end) +{ + rocksdb::CompactRangeOptions options; + rocksdb::Slice cstart(start); + rocksdb::Slice cend(end); + db->CompactRange(options, &cstart, &cend); +} + +RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() +{ + delete dbiter; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first() +{ + dbiter->SeekToFirst(); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + rocksdb::Slice slice_prefix(prefix); + dbiter->Seek(slice_prefix); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last() +{ + dbiter->SeekToLast(); + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + string limit = past_prefix(prefix); + rocksdb::Slice slice_limit(limit); + dbiter->Seek(slice_limit); + + if (!dbiter->Valid()) { + dbiter->SeekToLast(); + } else { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) +{ + lower_bound(prefix, after); + if (valid()) { + pair<string,string> key = raw_key(); + if (key.first == prefix && key.second == after) + next(); + } + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) +{ + string bound = combine_strings(prefix, to); + rocksdb::Slice slice_bound(bound); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; +} +bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid() +{ + return dbiter->Valid(); +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next() +{ + if (valid()) { + dbiter->Next(); + } + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev() +{ + if (valid()) { + dbiter->Prev(); + } + ceph_assert(!dbiter->status().IsIOError()); + return dbiter->status().ok() ? 0 : -1; +} +string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key() +{ + string out_key; + split_key(dbiter->key(), 0, &out_key); + return out_key; +} +pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key() +{ + string prefix, key; + split_key(dbiter->key(), &prefix, &key); + return make_pair(prefix, key); +} + +bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { + // Look for "prefix\0" right in rocksb::Slice + rocksdb::Slice key = dbiter->key(); + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) { + return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } +} + +bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value() +{ + return to_bufferlist(dbiter->value()); +} + +size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size() +{ + return dbiter->key().size(); +} + +size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size() +{ + return dbiter->value().size(); +} + +bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr() +{ + rocksdb::Slice val = dbiter->value(); + return bufferptr(val.data(), val.size()); +} + +int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status() +{ + return dbiter->status().ok() ? 0 : -1; +} + +string RocksDBStore::past_prefix(const string &prefix) +{ + string limit = prefix; + limit.push_back(1); + return limit; +} + +RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() +{ + return std::make_shared<RocksDBWholeSpaceIteratorImpl>( + db->NewIterator(rocksdb::ReadOptions(), default_cf)); +} + +class CFIteratorImpl : public KeyValueDB::IteratorImpl { +protected: + string prefix; + rocksdb::Iterator *dbiter; +public: + explicit CFIteratorImpl(const std::string& p, + rocksdb::Iterator *iter) + : prefix(p), dbiter(iter) { } + ~CFIteratorImpl() { + delete dbiter; + } + + int seek_to_first() override { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() override { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int upper_bound(const string &after) override { + lower_bound(after); + if (valid() && (key() == after)) { + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const string &to) override { + rocksdb::Slice slice_bound(to); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + int next() override { + if (valid()) { + dbiter->Next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int prev() override { + if (valid()) { + dbiter->Prev(); + } + return dbiter->status().ok() ? 0 : -1; + } + bool valid() override { + return dbiter->Valid(); + } + string key() override { + return dbiter->key().ToString(); + } + std::pair<std::string, std::string> raw_key() override { + return make_pair(prefix, key()); + } + bufferlist value() override { + return to_bufferlist(dbiter->value()); + } + bufferptr value_as_ptr() override { + rocksdb::Slice val = dbiter->value(); + return bufferptr(val.data(), val.size()); + } + int status() override { + return dbiter->status().ok() ? 0 : -1; + } +}; + +KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix) +{ + rocksdb::ColumnFamilyHandle *cf_handle = + static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix)); + if (cf_handle) { + return std::make_shared<CFIteratorImpl>( + prefix, + db->NewIterator(rocksdb::ReadOptions(), cf_handle)); + } else { + return KeyValueDB::get_iterator(prefix); + } +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h new file mode 100644 index 00000000..e1d5181e --- /dev/null +++ b/src/kv/RocksDBStore.h @@ -0,0 +1,512 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef ROCKS_DB_STORE_H +#define ROCKS_DB_STORE_H + +#include "include/types.h" +#include "include/buffer_fwd.h" +#include "KeyValueDB.h" +#include <set> +#include <map> +#include <string> +#include <memory> +#include <boost/scoped_ptr.hpp> +#include "rocksdb/write_batch.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/iostats_context.h" +#include "rocksdb/statistics.h" +#include "rocksdb/table.h" +#include "kv/rocksdb_cache/BinnedLRUCache.h" +#include <errno.h> +#include "common/errno.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/Formatter.h" +#include "common/Cond.h" +#include "common/ceph_context.h" +#include "common/PriorityCache.h" + +class PerfCounters; + +enum { + l_rocksdb_first = 34300, + l_rocksdb_gets, + l_rocksdb_txns, + l_rocksdb_txns_sync, + l_rocksdb_get_latency, + l_rocksdb_submit_latency, + l_rocksdb_submit_sync_latency, + l_rocksdb_compact, + l_rocksdb_compact_range, + l_rocksdb_compact_queue_merge, + l_rocksdb_compact_queue_len, + l_rocksdb_write_wal_time, + l_rocksdb_write_memtable_time, + l_rocksdb_write_delay_time, + l_rocksdb_write_pre_and_post_process_time, + l_rocksdb_last, +}; + +namespace rocksdb{ + class DB; + class Env; + class Cache; + class FilterPolicy; + class Snapshot; + class Slice; + class WriteBatch; + class Iterator; + class Logger; + class ColumnFamilyHandle; + struct Options; + struct BlockBasedTableOptions; + struct DBOptions; + struct ColumnFamilyOptions; +} + +extern rocksdb::Logger *create_rocksdb_ceph_logger(); + +/** + * Uses RocksDB to implement the KeyValueDB interface + */ +class RocksDBStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string path; + map<string,string> kv_options; + void *priv; + rocksdb::DB *db; + rocksdb::Env *env; + std::shared_ptr<rocksdb::Statistics> dbstats; + rocksdb::BlockBasedTableOptions bbt_opts; + string options_str; + + uint64_t cache_size = 0; + bool set_cache_flag = false; + + bool must_close_default_cf = false; + rocksdb::ColumnFamilyHandle *default_cf = nullptr; + + int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t); + int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt); + int create_db_dir(); + int do_open(ostream &out, bool create_if_missing, bool open_readonly, + const vector<ColumnFamily>* cfs = nullptr); + int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt); + + // manage async compactions + Mutex compact_queue_lock; + Cond compact_queue_cond; + list< pair<string,string> > compact_queue; + bool compact_queue_stop; + class CompactThread : public Thread { + RocksDBStore *db; + public: + explicit CompactThread(RocksDBStore *d) : db(d) {} + void *entry() override { + db->compact_thread_entry(); + return NULL; + } + friend class RocksDBStore; + } compact_thread; + + void compact_thread_entry(); + + void compact_range(const string& start, const string& end); + void compact_range_async(const string& start, const string& end); + int tryInterpret(const string& key, const string& val, rocksdb::Options& opt); + +public: + /// compact the underlying rocksdb store + bool compact_on_mount; + bool disableWAL; + bool enable_rmrange; + const uint64_t max_items_rmrange; + void compact() override; + + void compact_async() override { + compact_range_async(string(), string()); + } + + int ParseOptionsFromString(const string& opt_str, rocksdb::Options& opt); + static int ParseOptionsFromStringStatic( + CephContext* cct, + const string& opt_str, + rocksdb::Options &opt, + function<int(const string&, const string&, rocksdb::Options&)> interp); + static int _test_init(const string& dir); + int init(string options_str) override; + /// compact rocksdb for all keys with a given prefix + void compact_prefix(const string& prefix) override { + compact_range(prefix, past_prefix(prefix)); + } + void compact_prefix_async(const string& prefix) override { + compact_range_async(prefix, past_prefix(prefix)); + } + + void compact_range(const string& prefix, const string& start, const string& end) override { + compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); + } + void compact_range_async(const string& prefix, const string& start, const string& end) override { + compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); + } + + RocksDBStore(CephContext *c, const string &path, map<string,string> opt, void *p) : + cct(c), + logger(NULL), + path(path), + kv_options(opt), + priv(p), + db(NULL), + env(static_cast<rocksdb::Env*>(p)), + dbstats(NULL), + compact_queue_lock("RocksDBStore::compact_thread_lock"), + compact_queue_stop(false), + compact_thread(this), + compact_on_mount(false), + disableWAL(false), + enable_rmrange(cct->_conf->rocksdb_enable_rmrange), + max_items_rmrange(cct->_conf.get_val<uint64_t>("rocksdb_max_items_rmrange")) + {} + + ~RocksDBStore() override; + + static bool check_omap_dir(string &omap_dir); + /// Opens underlying db + int open(ostream &out, const vector<ColumnFamily>& cfs = {}) override { + return do_open(out, false, false, &cfs); + } + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out, + const vector<ColumnFamily>& cfs = {}) override; + + int open_read_only(ostream &out, const vector<ColumnFamily>& cfs = {}) override { + return do_open(out, false, true, &cfs); + } + + void close() override; + + rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) { + auto iter = cf_handles.find(cf_name); + if (iter == cf_handles.end()) + return nullptr; + else + return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second); + } + int repair(std::ostream &out) override; + void split_stats(const std::string &s, char delim, std::vector<std::string> &elems); + void get_statistics(Formatter *f) override; + + PerfCounters *get_perf_counters() override + { + return logger; + } + + int64_t estimate_prefix_size(const string& prefix) override; + + struct RocksWBHandler: public rocksdb::WriteBatch::Handler { + std::string seen ; + int num_seen = 0; + static string pretty_binary_string(const string& in) { + char buf[10]; + string out; + out.reserve(in.length() * 3); + enum { NONE, HEX, STRING } mode = NONE; + unsigned from = 0, i; + for (i=0; i < in.length(); ++i) { + if ((in[i] < 32 || (unsigned char)in[i] > 126) || + (mode == HEX && in.length() - i >= 4 && + ((in[i] < 32 || (unsigned char)in[i] > 126) || + (in[i+1] < 32 || (unsigned char)in[i+1] > 126) || + (in[i+2] < 32 || (unsigned char)in[i+2] > 126) || + (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) { + + if (mode == STRING) { + out.append(in.substr(from, i - from)); + out.push_back('\''); + } + if (mode != HEX) { + out.append("0x"); + mode = HEX; + } + if (in.length() - i >= 4) { + // print a whole u32 at once + snprintf(buf, sizeof(buf), "%08x", + (uint32_t)(((unsigned char)in[i] << 24) | + ((unsigned char)in[i+1] << 16) | + ((unsigned char)in[i+2] << 8) | + ((unsigned char)in[i+3] << 0))); + i += 3; + } else { + snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]); + } + out.append(buf); + } else { + if (mode != STRING) { + out.push_back('\''); + mode = STRING; + from = i; + } + } + } + if (mode == STRING) { + out.append(in.substr(from, i - from)); + out.push_back('\''); + } + return out; + } + void Put(const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + string prefix ((key.ToString()).substr(0,1)); + string key_to_decode ((key.ToString()).substr(2,string::npos)); + uint64_t size = (value.ToString()).size(); + seen += "\nPut( Prefix = " + prefix + " key = " + + pretty_binary_string(key_to_decode) + + " Value size = " + std::to_string(size) + ")"; + num_seen++; + } + void SingleDelete(const rocksdb::Slice& key) override { + string prefix ((key.ToString()).substr(0,1)); + string key_to_decode ((key.ToString()).substr(2,string::npos)); + seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " + + pretty_binary_string(key_to_decode) + ")"; + num_seen++; + } + void Delete(const rocksdb::Slice& key) override { + string prefix ((key.ToString()).substr(0,1)); + string key_to_decode ((key.ToString()).substr(2,string::npos)); + seen += "\nDelete( Prefix = " + prefix + " key = " + + pretty_binary_string(key_to_decode) + ")"; + + num_seen++; + } + void Merge(const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + string prefix ((key.ToString()).substr(0,1)); + string key_to_decode ((key.ToString()).substr(2,string::npos)); + uint64_t size = (value.ToString()).size(); + seen += "\nMerge( Prefix = " + prefix + " key = " + + pretty_binary_string(key_to_decode) + " Value size = " + + std::to_string(size) + ")"; + + num_seen++; + } + bool Continue() override { return num_seen < 50; } + + }; + + class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { + public: + rocksdb::WriteBatch bat; + RocksDBStore *db; + + explicit RocksDBTransactionImpl(RocksDBStore *_db); + private: + void put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const string &k, + const bufferlist &to_set_bl); + public: + void set( + const string &prefix, + const string &k, + const bufferlist &bl) override; + void set( + const string &prefix, + const char *k, + size_t keylen, + const bufferlist &bl) override; + void rmkey( + const string &prefix, + const string &k) override; + void rmkey( + const string &prefix, + const char *k, + size_t keylen) override; + void rm_single_key( + const string &prefix, + const string &k) override; + void rmkeys_by_prefix( + const string &prefix + ) override; + void rm_range_keys( + const string &prefix, + const string &start, + const string &end) override; + void merge( + const string& prefix, + const string& k, + const bufferlist &bl) override; + }; + + KeyValueDB::Transaction get_transaction() override { + return std::make_shared<RocksDBTransactionImpl>(this); + } + + int submit_transaction(KeyValueDB::Transaction t) override; + int submit_transaction_sync(KeyValueDB::Transaction t) override; + int get( + const string &prefix, + const std::set<string> &key, + std::map<string, bufferlist> *out + ) override; + int get( + const string &prefix, + const string &key, + bufferlist *out + ) override; + int get( + const string &prefix, + const char *key, + size_t keylen, + bufferlist *out) override; + + + class RocksDBWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + protected: + rocksdb::Iterator *dbiter; + public: + explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : + dbiter(iter) { } + //virtual ~RocksDBWholeSpaceIteratorImpl() { } + ~RocksDBWholeSpaceIteratorImpl() override; + + int seek_to_first() override; + int seek_to_first(const string &prefix) override; + int seek_to_last() override; + int seek_to_last(const string &prefix) override; + int upper_bound(const string &prefix, const string &after) override; + int lower_bound(const string &prefix, const string &to) override; + bool valid() override; + int next() override; + int prev() override; + string key() override; + pair<string,string> raw_key() override; + bool raw_key_is_prefixed(const string &prefix) override; + bufferlist value() override; + bufferptr value_as_ptr() override; + int status() override; + size_t key_size() override; + size_t value_size() override; + }; + + Iterator get_iterator(const std::string& prefix) override; + + /// Utility + static string combine_strings(const string &prefix, const string &value) { + string out = prefix; + out.push_back(0); + out.append(value); + return out; + } + static void combine_strings(const string &prefix, + const char *key, size_t keylen, + string *out) { + out->reserve(prefix.size() + 1 + keylen); + *out = prefix; + out->push_back(0); + out->append(key, keylen); + } + + static int split_key(rocksdb::Slice in, string *prefix, string *key); + + static string past_prefix(const string &prefix); + + class MergeOperatorRouter; + class MergeOperatorLinker; + friend class MergeOperatorRouter; + int set_merge_operator( + const std::string& prefix, + std::shared_ptr<KeyValueDB::MergeOperator> mop) override; + string assoc_name; ///< Name of associative operator + + uint64_t get_estimated_size(map<string,uint64_t> &extra) override { + DIR *store_dir = opendir(path.c_str()); + if (!store_dir) { + lderr(cct) << __func__ << " something happened opening the store: " + << cpp_strerror(errno) << dendl; + return 0; + } + + uint64_t total_size = 0; + uint64_t sst_size = 0; + uint64_t log_size = 0; + uint64_t misc_size = 0; + + struct dirent *entry = NULL; + while ((entry = readdir(store_dir)) != NULL) { + string n(entry->d_name); + + if (n == "." || n == "..") + continue; + + string fpath = path + '/' + n; + struct stat s; + int err = stat(fpath.c_str(), &s); + if (err < 0) + err = -errno; + // we may race against rocksdb while reading files; this should only + // happen when those files are being updated, data is being shuffled + // and files get removed, in which case there's not much of a problem + // as we'll get to them next time around. + if (err == -ENOENT) { + continue; + } + if (err < 0) { + lderr(cct) << __func__ << " error obtaining stats for " << fpath + << ": " << cpp_strerror(err) << dendl; + goto err; + } + + size_t pos = n.find_last_of('.'); + if (pos == string::npos) { + misc_size += s.st_size; + continue; + } + + string ext = n.substr(pos+1); + if (ext == "sst") { + sst_size += s.st_size; + } else if (ext == "log") { + log_size += s.st_size; + } else { + misc_size += s.st_size; + } + } + + total_size = sst_size + log_size + misc_size; + + extra["sst"] = sst_size; + extra["log"] = log_size; + extra["misc"] = misc_size; + extra["total"] = total_size; + +err: + closedir(store_dir); + return total_size; + } + + virtual int64_t get_cache_usage() const override { + return static_cast<int64_t>(bbt_opts.block_cache->GetUsage()); + } + + int set_cache_size(uint64_t s) override { + cache_size = s; + set_cache_flag = true; + return 0; + } + + virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache() + const override { + return dynamic_pointer_cast<PriorityCache::PriCache>( + bbt_opts.block_cache); + } + + WholeSpaceIterator get_wholespace_iterator() override; +}; + + + +#endif diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.cc b/src/kv/rocksdb_cache/BinnedLRUCache.cc new file mode 100644 index 00000000..2391a7f6 --- /dev/null +++ b/src/kv/rocksdb_cache/BinnedLRUCache.cc @@ -0,0 +1,623 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "BinnedLRUCache.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string> + +#define dout_context cct +#define dout_subsys ceph_subsys_rocksdb +#undef dout_prefix +#define dout_prefix *_dout << "rocksdb: " + +namespace rocksdb_cache { + +BinnedLRUHandleTable::BinnedLRUHandleTable() : list_(nullptr), length_(0), elems_(0) { + Resize(); +} + +BinnedLRUHandleTable::~BinnedLRUHandleTable() { + ApplyToAllCacheEntries([](BinnedLRUHandle* h) { + if (h->refs == 1) { + h->Free(); + } + }); + delete[] list_; +} + +BinnedLRUHandle* BinnedLRUHandleTable::Lookup(const rocksdb::Slice& key, uint32_t hash) { + return *FindPointer(key, hash); +} + +BinnedLRUHandle* BinnedLRUHandleTable::Insert(BinnedLRUHandle* h) { + BinnedLRUHandle** ptr = FindPointer(h->key(), h->hash); + BinnedLRUHandle* old = *ptr; + h->next_hash = (old == nullptr ? nullptr : old->next_hash); + *ptr = h; + if (old == nullptr) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; +} + +BinnedLRUHandle* BinnedLRUHandleTable::Remove(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle** ptr = FindPointer(key, hash); + BinnedLRUHandle* result = *ptr; + if (result != nullptr) { + *ptr = result->next_hash; + --elems_; + } + return result; +} + +BinnedLRUHandle** BinnedLRUHandleTable::FindPointer(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle** ptr = &list_[hash & (length_ - 1)]; + while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) { + ptr = &(*ptr)->next_hash; + } + return ptr; +} + +void BinnedLRUHandleTable::Resize() { + uint32_t new_length = 16; + while (new_length < elems_ * 1.5) { + new_length *= 2; + } + BinnedLRUHandle** new_list = new BinnedLRUHandle*[new_length]; + memset(new_list, 0, sizeof(new_list[0]) * new_length); + uint32_t count = 0; + for (uint32_t i = 0; i < length_; i++) { + BinnedLRUHandle* h = list_[i]; + while (h != nullptr) { + BinnedLRUHandle* next = h->next_hash; + uint32_t hash = h->hash; + BinnedLRUHandle** ptr = &new_list[hash & (new_length - 1)]; + h->next_hash = *ptr; + *ptr = h; + h = next; + count++; + } + } + ceph_assert(elems_ == count); + delete[] list_; + list_ = new_list; + length_ = new_length; +} + +BinnedLRUCacheShard::BinnedLRUCacheShard(size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio) + : capacity_(0), + high_pri_pool_usage_(0), + strict_capacity_limit_(strict_capacity_limit), + high_pri_pool_ratio_(high_pri_pool_ratio), + high_pri_pool_capacity_(0), + usage_(0), + lru_usage_(0) { + // Make empty circular linked list + lru_.next = &lru_; + lru_.prev = &lru_; + lru_low_pri_ = &lru_; + SetCapacity(capacity); +} + +BinnedLRUCacheShard::~BinnedLRUCacheShard() {} + +bool BinnedLRUCacheShard::Unref(BinnedLRUHandle* e) { + ceph_assert(e->refs > 0); + e->refs--; + return e->refs == 0; +} + +// Call deleter and free + +void BinnedLRUCacheShard::EraseUnRefEntries() { + ceph::autovector<BinnedLRUHandle*> last_reference_list; + { + std::lock_guard<std::mutex> l(mutex_); + while (lru_.next != &lru_) { + BinnedLRUHandle* old = lru_.next; + ceph_assert(old->InCache()); + ceph_assert(old->refs == + 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + last_reference_list.push_back(old); + } + } + + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void BinnedLRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + if (thread_safe) { + mutex_.lock(); + } + table_.ApplyToAllCacheEntries( + [callback](BinnedLRUHandle* h) { callback(h->value, h->charge); }); + if (thread_safe) { + mutex_.unlock(); + } +} + +void BinnedLRUCacheShard::TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri) { + *lru = &lru_; + *lru_low_pri = lru_low_pri_; +} + +size_t BinnedLRUCacheShard::TEST_GetLRUSize() { + BinnedLRUHandle* lru_handle = lru_.next; + size_t lru_size = 0; + while (lru_handle != &lru_) { + lru_size++; + lru_handle = lru_handle->next; + } + return lru_size; +} + +double BinnedLRUCacheShard::GetHighPriPoolRatio() const { + std::lock_guard<std::mutex> l(mutex_); + return high_pri_pool_ratio_; +} + +size_t BinnedLRUCacheShard::GetHighPriPoolUsage() const { + std::lock_guard<std::mutex> l(mutex_); + return high_pri_pool_usage_; +} + +void BinnedLRUCacheShard::LRU_Remove(BinnedLRUHandle* e) { + ceph_assert(e->next != nullptr); + ceph_assert(e->prev != nullptr); + if (lru_low_pri_ == e) { + lru_low_pri_ = e->prev; + } + e->next->prev = e->prev; + e->prev->next = e->next; + e->prev = e->next = nullptr; + lru_usage_ -= e->charge; + if (e->InHighPriPool()) { + ceph_assert(high_pri_pool_usage_ >= e->charge); + high_pri_pool_usage_ -= e->charge; + } +} + +void BinnedLRUCacheShard::LRU_Insert(BinnedLRUHandle* e) { + ceph_assert(e->next == nullptr); + ceph_assert(e->prev == nullptr); + if (high_pri_pool_ratio_ > 0 && e->IsHighPri()) { + // Inset "e" to head of LRU list. + e->next = &lru_; + e->prev = lru_.prev; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(true); + high_pri_pool_usage_ += e->charge; + MaintainPoolSize(); + } else { + // Insert "e" to the head of low-pri pool. Note that when + // high_pri_pool_ratio is 0, head of low-pri pool is also head of LRU list. + e->next = lru_low_pri_->next; + e->prev = lru_low_pri_; + e->prev->next = e; + e->next->prev = e; + e->SetInHighPriPool(false); + lru_low_pri_ = e; + } + lru_usage_ += e->charge; +} + +void BinnedLRUCacheShard::MaintainPoolSize() { + while (high_pri_pool_usage_ > high_pri_pool_capacity_) { + // Overflow last entry in high-pri pool to low-pri pool. + lru_low_pri_ = lru_low_pri_->next; + ceph_assert(lru_low_pri_ != &lru_); + lru_low_pri_->SetInHighPriPool(false); + high_pri_pool_usage_ -= lru_low_pri_->charge; + } +} + +void BinnedLRUCacheShard::EvictFromLRU(size_t charge, + ceph::autovector<BinnedLRUHandle*>* deleted) { + while (usage_ + charge > capacity_ && lru_.next != &lru_) { + BinnedLRUHandle* old = lru_.next; + ceph_assert(old->InCache()); + ceph_assert(old->refs == 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->SetInCache(false); + Unref(old); + usage_ -= old->charge; + deleted->push_back(old); + } +} + +void BinnedLRUCacheShard::SetCapacity(size_t capacity) { + ceph::autovector<BinnedLRUHandle*> last_reference_list; + { + std::lock_guard<std::mutex> l(mutex_); + capacity_ = capacity; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + EvictFromLRU(0, &last_reference_list); + } + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } +} + +void BinnedLRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + std::lock_guard<std::mutex> l(mutex_); + strict_capacity_limit_ = strict_capacity_limit; +} + +rocksdb::Cache::Handle* BinnedLRUCacheShard::Lookup(const rocksdb::Slice& key, uint32_t hash) { + std::lock_guard<std::mutex> l(mutex_); + BinnedLRUHandle* e = table_.Lookup(key, hash); + if (e != nullptr) { + ceph_assert(e->InCache()); + if (e->refs == 1) { + LRU_Remove(e); + } + e->refs++; + e->SetHit(); + } + return reinterpret_cast<rocksdb::Cache::Handle*>(e); +} + +bool BinnedLRUCacheShard::Ref(rocksdb::Cache::Handle* h) { + BinnedLRUHandle* handle = reinterpret_cast<BinnedLRUHandle*>(h); + std::lock_guard<std::mutex> l(mutex_); + if (handle->InCache() && handle->refs == 1) { + LRU_Remove(handle); + } + handle->refs++; + return true; +} + +void BinnedLRUCacheShard::SetHighPriPoolRatio(double high_pri_pool_ratio) { + std::lock_guard<std::mutex> l(mutex_); + high_pri_pool_ratio_ = high_pri_pool_ratio; + high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; + MaintainPoolSize(); +} + +bool BinnedLRUCacheShard::Release(rocksdb::Cache::Handle* handle, bool force_erase) { + if (handle == nullptr) { + return false; + } + BinnedLRUHandle* e = reinterpret_cast<BinnedLRUHandle*>(handle); + bool last_reference = false; + { + std::lock_guard<std::mutex> l(mutex_); + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (e->refs == 1 && e->InCache()) { + // The item is still in cache, and nobody else holds a reference to it + if (usage_ > capacity_ || force_erase) { + // the cache is full + // The LRU list must be empty since the cache is full + ceph_assert(!(usage_ > capacity_) || lru_.next == &lru_); + // take this opportunity and remove the item + table_.Remove(e->key(), e->hash); + e->SetInCache(false); + Unref(e); + usage_ -= e->charge; + last_reference = true; + } else { + // put the item on the list to be potentially freed + LRU_Insert(e); + } + } + } + + // free outside of mutex + if (last_reference) { + e->Free(); + } + return last_reference; +} + +rocksdb::Status BinnedLRUCacheShard::Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) { + auto e = new BinnedLRUHandle(); + rocksdb::Status s; + ceph::autovector<BinnedLRUHandle*> last_reference_list; + + e->value = value; + e->deleter = deleter; + e->charge = charge; + e->key_length = key.size(); + e->key_data = new char[e->key_length]; + e->flags = 0; + e->hash = hash; + e->refs = (handle == nullptr + ? 1 + : 2); // One from BinnedLRUCache, one for the returned handle + e->next = e->prev = nullptr; + e->SetInCache(true); + e->SetPriority(priority); + std::copy_n(key.data(), e->key_length, e->key_data); + + { + std::lock_guard<std::mutex> l(mutex_); + // Free the space following strict LRU policy until enough space + // is freed or the lru list is empty + EvictFromLRU(charge, &last_reference_list); + + if (usage_ - lru_usage_ + charge > capacity_ && + (strict_capacity_limit_ || handle == nullptr)) { + if (handle == nullptr) { + // Don't insert the entry but still return ok, as if the entry inserted + // into cache and get evicted immediately. + last_reference_list.push_back(e); + } else { + delete e; + *handle = nullptr; + s = rocksdb::Status::Incomplete("Insert failed due to LRU cache being full."); + } + } else { + // insert into the cache + // note that the cache might get larger than its capacity if not enough + // space was freed + BinnedLRUHandle* old = table_.Insert(e); + usage_ += e->charge; + if (old != nullptr) { + old->SetInCache(false); + if (Unref(old)) { + usage_ -= old->charge; + // old is on LRU because it's in cache and its reference count + // was just 1 (Unref returned 0) + LRU_Remove(old); + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Insert(e); + } else { + *handle = reinterpret_cast<rocksdb::Cache::Handle*>(e); + } + s = rocksdb::Status::OK(); + } + } + + // we free the entries here outside of mutex for + // performance reasons + for (auto entry : last_reference_list) { + entry->Free(); + } + + return s; +} + +void BinnedLRUCacheShard::Erase(const rocksdb::Slice& key, uint32_t hash) { + BinnedLRUHandle* e; + bool last_reference = false; + { + std::lock_guard<std::mutex> l(mutex_); + e = table_.Remove(key, hash); + if (e != nullptr) { + last_reference = Unref(e); + if (last_reference) { + usage_ -= e->charge; + } + if (last_reference && e->InCache()) { + LRU_Remove(e); + } + e->SetInCache(false); + } + } + + // mutex not held here + // last_reference will only be true if e != nullptr + if (last_reference) { + e->Free(); + } +} + +size_t BinnedLRUCacheShard::GetUsage() const { + std::lock_guard<std::mutex> l(mutex_); + return usage_; +} + +size_t BinnedLRUCacheShard::GetPinnedUsage() const { + std::lock_guard<std::mutex> l(mutex_); + ceph_assert(usage_ >= lru_usage_); + return usage_ - lru_usage_; +} + +std::string BinnedLRUCacheShard::GetPrintableOptions() const { + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + std::lock_guard<std::mutex> l(mutex_); + snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n", + high_pri_pool_ratio_); + } + return std::string(buffer); +} + +BinnedLRUCache::BinnedLRUCache(CephContext *c, + size_t capacity, + int num_shard_bits, + bool strict_capacity_limit, + double high_pri_pool_ratio) + : ShardedCache(capacity, num_shard_bits, strict_capacity_limit), cct(c) { + num_shards_ = 1 << num_shard_bits; + // TODO: Switch over to use mempool + int rc = posix_memalign((void**) &shards_, + CACHE_LINE_SIZE, + sizeof(BinnedLRUCacheShard) * num_shards_); + if (rc != 0) { + throw std::bad_alloc(); + } + size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_; + for (int i = 0; i < num_shards_; i++) { + new (&shards_[i]) + BinnedLRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio); + } +} + +BinnedLRUCache::~BinnedLRUCache() { + for (int i = 0; i < num_shards_; i++) { + shards_[i].~BinnedLRUCacheShard(); + } + free(shards_); +} + +CacheShard* BinnedLRUCache::GetShard(int shard) { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +const CacheShard* BinnedLRUCache::GetShard(int shard) const { + return reinterpret_cast<CacheShard*>(&shards_[shard]); +} + +void* BinnedLRUCache::Value(Handle* handle) { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->value; +} + +size_t BinnedLRUCache::GetCharge(Handle* handle) const { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->charge; +} + +uint32_t BinnedLRUCache::GetHash(Handle* handle) const { + return reinterpret_cast<const BinnedLRUHandle*>(handle)->hash; +} + +void BinnedLRUCache::DisownData() { +// Do not drop data if compile with ASAN to suppress leak warning. +#ifndef __SANITIZE_ADDRESS__ + shards_ = nullptr; +#endif // !__SANITIZE_ADDRESS__ +} + +size_t BinnedLRUCache::TEST_GetLRUSize() { + size_t lru_size_of_all_shards = 0; + for (int i = 0; i < num_shards_; i++) { + lru_size_of_all_shards += shards_[i].TEST_GetLRUSize(); + } + return lru_size_of_all_shards; +} + +void BinnedLRUCache::SetHighPriPoolRatio(double high_pri_pool_ratio) { + for (int i = 0; i < num_shards_; i++) { + shards_[i].SetHighPriPoolRatio(high_pri_pool_ratio); + } +} + +double BinnedLRUCache::GetHighPriPoolRatio() const { + double result = 0.0; + if (num_shards_ > 0) { + result = shards_[0].GetHighPriPoolRatio(); + } + return result; +} + +size_t BinnedLRUCache::GetHighPriPoolUsage() const { + // We will not lock the cache when getting the usage from shards. + size_t usage = 0; + for (int s = 0; s < num_shards_; s++) { + usage += shards_[s].GetHighPriPoolUsage(); + } + return usage; +} + +// PriCache + +int64_t BinnedLRUCache::request_cache_bytes(PriorityCache::Priority pri, uint64_t total_cache) const +{ + int64_t assigned = get_cache_bytes(pri); + int64_t request = 0; + + switch (pri) { + // PRI0 is for rocksdb's high priority items (indexes/filters) + case PriorityCache::Priority::PRI0: + { + request = GetHighPriPoolUsage(); + break; + } + // All other cache items are currently shoved into the PRI1 priority. + case PriorityCache::Priority::PRI1: + { + request = GetUsage(); + request -= GetHighPriPoolUsage(); + break; + } + default: + break; + } + request = (request > assigned) ? request - assigned : 0; + ldout(cct, 10) << __func__ << " Priority: " << static_cast<uint32_t>(pri) + << " Request: " << request << dendl; + return request; +} + +int64_t BinnedLRUCache::commit_cache_size(uint64_t total_bytes) +{ + size_t old_bytes = GetCapacity(); + int64_t new_bytes = PriorityCache::get_chunk( + get_cache_bytes(), total_bytes); + ldout(cct, 10) << __func__ << " old: " << old_bytes + << " new: " << new_bytes << dendl; + SetCapacity((size_t) new_bytes); + + double ratio = 0; + if (new_bytes > 0) { + int64_t pri0_bytes = get_cache_bytes(PriorityCache::Priority::PRI0); + // Add 10% of the "reserved" bytes so the ratio can't get stuck at 0 + pri0_bytes += (new_bytes - get_cache_bytes()) / 10; + ratio = (double) pri0_bytes / new_bytes; + } + ldout(cct, 10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl; + SetHighPriPoolRatio(ratio); + return new_bytes; +} + +std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache( + CephContext *c, + size_t capacity, + int num_shard_bits, + bool strict_capacity_limit, + double high_pri_pool_ratio) { + if (num_shard_bits >= 20) { + return nullptr; // the cache cannot be sharded into too many fine pieces + } + if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) { + // invalid high_pri_pool_ratio + return nullptr; + } + if (num_shard_bits < 0) { + num_shard_bits = GetDefaultCacheShardBits(capacity); + } + return std::make_shared<BinnedLRUCache>( + c, capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio); +} + +} // namespace rocksdb_cache diff --git a/src/kv/rocksdb_cache/BinnedLRUCache.h b/src/kv/rocksdb_cache/BinnedLRUCache.h new file mode 100644 index 00000000..96023ce2 --- /dev/null +++ b/src/kv/rocksdb_cache/BinnedLRUCache.h @@ -0,0 +1,335 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_BINNED_LRU_CACHE +#define ROCKSDB_BINNED_LRU_CACHE + +#include <string> +#include <mutex> + +#include "ShardedCache.h" +#include "common/autovector.h" +#include "common/dout.h" +#include "include/ceph_assert.h" +#include "common/ceph_context.h" + +namespace rocksdb_cache { + +// LRU cache implementation + +// An entry is a variable length heap-allocated structure. +// Entries are referenced by cache and/or by any external entity. +// The cache keeps all its entries in table. Some elements +// are also stored on LRU list. +// +// BinnedLRUHandle can be in these states: +// 1. Referenced externally AND in hash table. +// In that case the entry is *not* in the LRU. (refs > 1 && in_cache == true) +// 2. Not referenced externally and in hash table. In that case the entry is +// in the LRU and can be freed. (refs == 1 && in_cache == true) +// 3. Referenced externally and not in hash table. In that case the entry is +// in not on LRU and not in table. (refs >= 1 && in_cache == false) +// +// All newly created BinnedLRUHandles are in state 1. If you call +// BinnedLRUCacheShard::Release +// on entry in state 1, it will go into state 2. To move from state 1 to +// state 3, either call BinnedLRUCacheShard::Erase or BinnedLRUCacheShard::Insert with the +// same key. +// To move from state 2 to state 1, use BinnedLRUCacheShard::Lookup. +// Before destruction, make sure that no handles are in state 1. This means +// that any successful BinnedLRUCacheShard::Lookup/BinnedLRUCacheShard::Insert have a +// matching +// RUCache::Release (to move into state 2) or BinnedLRUCacheShard::Erase (for state 3) + +std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache( + CephContext *c, + size_t capacity, + int num_shard_bits = -1, + bool strict_capacity_limit = false, + double high_pri_pool_ratio = 0.0); + +struct BinnedLRUHandle { + void* value; + void (*deleter)(const rocksdb::Slice&, void* value); + BinnedLRUHandle* next_hash; + BinnedLRUHandle* next; + BinnedLRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + size_t key_length; + uint32_t refs; // a number of refs to this entry + // cache itself is counted as 1 + + // Include the following flags: + // in_cache: whether this entry is referenced by the hash table. + // is_high_pri: whether this entry is high priority entry. + // in_high_pri_pool: whether this entry is in high-pri pool. + char flags; + + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + + char* key_data = nullptr; // Beginning of key + + rocksdb::Slice key() const { + // For cheaper lookups, we allow a temporary Handle object + // to store a pointer to a key in "value". + if (next == this) { + return *(reinterpret_cast<rocksdb::Slice*>(value)); + } else { + return rocksdb::Slice(key_data, key_length); + } + } + + bool InCache() { return flags & 1; } + bool IsHighPri() { return flags & 2; } + bool InHighPriPool() { return flags & 4; } + bool HasHit() { return flags & 8; } + + void SetInCache(bool in_cache) { + if (in_cache) { + flags |= 1; + } else { + flags &= ~1; + } + } + + void SetPriority(rocksdb::Cache::Priority priority) { + if (priority == rocksdb::Cache::Priority::HIGH) { + flags |= 2; + } else { + flags &= ~2; + } + } + + void SetInHighPriPool(bool in_high_pri_pool) { + if (in_high_pri_pool) { + flags |= 4; + } else { + flags &= ~4; + } + } + + void SetHit() { flags |= 8; } + + void Free() { + ceph_assert((refs == 1 && InCache()) || (refs == 0 && !InCache())); + if (deleter) { + (*deleter)(key(), value); + } + delete[] key_data; + delete this; + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. +class BinnedLRUHandleTable { + public: + BinnedLRUHandleTable(); + ~BinnedLRUHandleTable(); + + BinnedLRUHandle* Lookup(const rocksdb::Slice& key, uint32_t hash); + BinnedLRUHandle* Insert(BinnedLRUHandle* h); + BinnedLRUHandle* Remove(const rocksdb::Slice& key, uint32_t hash); + + template <typename T> + void ApplyToAllCacheEntries(T func) { + for (uint32_t i = 0; i < length_; i++) { + BinnedLRUHandle* h = list_[i]; + while (h != nullptr) { + auto n = h->next_hash; + ceph_assert(h->InCache()); + func(h); + h = n; + } + } + } + + private: + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. + BinnedLRUHandle** FindPointer(const rocksdb::Slice& key, uint32_t hash); + + void Resize(); + + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + BinnedLRUHandle** list_; + uint32_t length_; + uint32_t elems_; +}; + +// A single shard of sharded cache. +class alignas(CACHE_LINE_SIZE) BinnedLRUCacheShard : public CacheShard { + public: + BinnedLRUCacheShard(size_t capacity, bool strict_capacity_limit, + double high_pri_pool_ratio); + virtual ~BinnedLRUCacheShard(); + + // Separate from constructor so caller can easily make an array of BinnedLRUCache + // if current usage is more than new capacity, the function will attempt to + // free the needed space + virtual void SetCapacity(size_t capacity) override; + + // Set the flag to reject insertion if cache if full. + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + // Set percentage of capacity reserved for high-pri cache entries. + void SetHighPriPoolRatio(double high_pri_pool_ratio); + + // Like Cache methods, but with an extra "hash" parameter. + virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, + rocksdb::Cache::Priority priority) override; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) override; + virtual bool Ref(rocksdb::Cache::Handle* handle) override; + virtual bool Release(rocksdb::Cache::Handle* handle, + bool force_erase = false) override; + virtual void Erase(const rocksdb::Slice& key, uint32_t hash) override; + + // Although in some platforms the update of size_t is atomic, to make sure + // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll + // protect them with mutex_. + + virtual size_t GetUsage() const override; + virtual size_t GetPinnedUsage() const override; + + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + virtual void EraseUnRefEntries() override; + + virtual std::string GetPrintableOptions() const override; + + void TEST_GetLRUList(BinnedLRUHandle** lru, BinnedLRUHandle** lru_low_pri); + + // Retrieves number of elements in LRU, for unit test purpose only + // not threadsafe + size_t TEST_GetLRUSize(); + + // Retrieves high pri pool ratio + double GetHighPriPoolRatio() const; + + // Retrieves high pri pool usage + size_t GetHighPriPoolUsage() const; + + private: + void LRU_Remove(BinnedLRUHandle* e); + void LRU_Insert(BinnedLRUHandle* e); + + // Overflow the last entry in high-pri pool to low-pri pool until size of + // high-pri pool is no larger than the size specify by high_pri_pool_pct. + void MaintainPoolSize(); + + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(BinnedLRUHandle* e); + + // Free some space following strict LRU policy until enough space + // to hold (usage_ + charge) is freed or the lru list is empty + // This function is not thread safe - it needs to be executed while + // holding the mutex_ + void EvictFromLRU(size_t charge, ceph::autovector<BinnedLRUHandle*>* deleted); + + // Initialized before use. + size_t capacity_; + + // Memory size for entries in high-pri pool. + size_t high_pri_pool_usage_; + + // Whether to reject insertion if cache reaches its full capacity. + bool strict_capacity_limit_; + + // Ratio of capacity reserved for high priority cache entries. + double high_pri_pool_ratio_; + + // High-pri pool size, equals to capacity * high_pri_pool_ratio. + // Remember the value to avoid recomputing each time. + double high_pri_pool_capacity_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + // LRU contains items which can be evicted, ie reference only by cache + BinnedLRUHandle lru_; + + // Pointer to head of low-pri pool in LRU list. + BinnedLRUHandle* lru_low_pri_; + + // ------------^^^^^^^^^^^^^----------- + // Not frequently modified data members + // ------------------------------------ + // + // We separate data members that are updated frequently from the ones that + // are not frequently updated so that they don't share the same cache line + // which will lead into false cache sharing + // + // ------------------------------------ + // Frequently modified data members + // ------------vvvvvvvvvvvvv----------- + BinnedLRUHandleTable table_; + + // Memory size for entries residing in the cache + size_t usage_; + + // Memory size for entries residing only in the LRU list + size_t lru_usage_; + + // mutex_ protects the following state. + // We don't count mutex_ as the cache's internal state so semantically we + // don't mind mutex_ invoking the non-const actions. + mutable std::mutex mutex_; +}; + +class BinnedLRUCache : public ShardedCache { + public: + BinnedLRUCache(CephContext *c, size_t capacity, int num_shard_bits, + bool strict_capacity_limit, double high_pri_pool_ratio); + virtual ~BinnedLRUCache(); + virtual const char* Name() const override { return "BinnedLRUCache"; } + virtual CacheShard* GetShard(int shard) override; + virtual const CacheShard* GetShard(int shard) const override; + virtual void* Value(Handle* handle) override; + virtual size_t GetCharge(Handle* handle) const override; + virtual uint32_t GetHash(Handle* handle) const override; + virtual void DisownData() override; + + // Retrieves number of elements in LRU, for unit test purpose only + size_t TEST_GetLRUSize(); + // Sets the high pri pool ratio + void SetHighPriPoolRatio(double high_pri_pool_ratio); + // Retrieves high pri pool ratio + double GetHighPriPoolRatio() const; + // Retrieves high pri pool usage + size_t GetHighPriPoolUsage() const; + + // PriorityCache + virtual int64_t request_cache_bytes( + PriorityCache::Priority pri, uint64_t total_cache) const; + virtual int64_t commit_cache_size(uint64_t total_cache); + virtual int64_t get_committed_size() const { + return GetCapacity(); + } + virtual std::string get_cache_name() const { + return "RocksDB Binned LRU Cache"; + } + + private: + CephContext *cct; + BinnedLRUCacheShard* shards_; + int num_shards_ = 0; +}; + +} // namespace rocksdb_cache + +#endif // ROCKSDB_BINNED_LRU_CACHE diff --git a/src/kv/rocksdb_cache/ShardedCache.cc b/src/kv/rocksdb_cache/ShardedCache.cc new file mode 100644 index 00000000..367140a9 --- /dev/null +++ b/src/kv/rocksdb_cache/ShardedCache.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "ShardedCache.h" + +#include <string> + +namespace rocksdb_cache { + +ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) + : num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit), + last_id_(1) {} + +void ShardedCache::SetCapacity(size_t capacity) { + int num_shards = 1 << num_shard_bits_; + const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; + std::lock_guard<std::mutex> l(capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetCapacity(per_shard); + } + capacity_ = capacity; +} + +void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) { + int num_shards = 1 << num_shard_bits_; + std::lock_guard<std::mutex> l(capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; +} + +rocksdb::Status ShardedCache::Insert(const rocksdb::Slice& key, void* value, size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, Priority priority) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Insert(key, hash, value, charge, deleter, handle, priority); +} + +rocksdb::Cache::Handle* ShardedCache::Lookup(const rocksdb::Slice& key, rocksdb::Statistics* /*stats*/) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash))->Lookup(key, hash); +} + +bool ShardedCache::Ref(rocksdb::Cache::Handle* handle) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Ref(handle); +} + +bool ShardedCache::Release(rocksdb::Cache::Handle* handle, bool force_erase) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Release(handle, force_erase); +} + +void ShardedCache::Erase(const rocksdb::Slice& key) { + uint32_t hash = HashSlice(key); + GetShard(Shard(hash))->Erase(key, hash); +} + +uint64_t ShardedCache::NewId() { + return last_id_.fetch_add(1, std::memory_order_relaxed); +} + +size_t ShardedCache::GetCapacity() const { + std::lock_guard<std::mutex> l(capacity_mutex_); + return capacity_; +} + +bool ShardedCache::HasStrictCapacityLimit() const { + std::lock_guard<std::mutex> l(capacity_mutex_); + return strict_capacity_limit_; +} + +size_t ShardedCache::GetUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetUsage(); + } + return usage; +} + +size_t ShardedCache::GetUsage(rocksdb::Cache::Handle* handle) const { + return GetCharge(handle); +} + +size_t ShardedCache::GetPinnedUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetPinnedUsage(); + } + return usage; +} + +void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe); + } +} + +void ShardedCache::EraseUnRefEntries() { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->EraseUnRefEntries(); + } +} + +std::string ShardedCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + std::lock_guard<std::mutex> l(capacity_mutex_); + snprintf(buffer, kBufferSize, " capacity : %" ROCKSDB_PRIszt "\n", + capacity_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", num_shard_bits_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " strict_capacity_limit : %d\n", + strict_capacity_limit_); + ret.append(buffer); + } + ret.append(GetShard(0)->GetPrintableOptions()); + return ret; +} +int GetDefaultCacheShardBits(size_t capacity) { + int num_shard_bits = 0; + size_t min_shard_size = 512L * 1024L; // Every shard is at least 512KB. + size_t num_shards = capacity / min_shard_size; + while (num_shards >>= 1) { + if (++num_shard_bits >= 6) { + // No more than 6. + return num_shard_bits; + } + } + return num_shard_bits; +} + +} // namespace rocksdb_cache diff --git a/src/kv/rocksdb_cache/ShardedCache.h b/src/kv/rocksdb_cache/ShardedCache.h new file mode 100644 index 00000000..4d64893a --- /dev/null +++ b/src/kv/rocksdb_cache/ShardedCache.h @@ -0,0 +1,141 @@ +// Copyright (c) 2018-Present Red Hat Inc. All rights reserved. +// +// Copyright (c) 2011-2018, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 and Apache 2.0 License +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_SHARDED_CACHE +#define ROCKSDB_SHARDED_CACHE + +#include <atomic> +#include <string> +#include <mutex> + +#include "rocksdb/cache.h" +#include "include/ceph_hash.h" +#include "common/PriorityCache.h" +//#include "hash.h" + +#ifndef CACHE_LINE_SIZE +#define CACHE_LINE_SIZE 64 // XXX arch-specific define +#endif +#define ROCKSDB_PRIszt "zu" + +namespace rocksdb_cache { + +// Single cache shard interface. +class CacheShard { + public: + CacheShard() = default; + virtual ~CacheShard() = default; + + virtual rocksdb::Status Insert(const rocksdb::Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, rocksdb::Cache::Priority priority) = 0; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, uint32_t hash) = 0; + virtual bool Ref(rocksdb::Cache::Handle* handle) = 0; + virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) = 0; + virtual void Erase(const rocksdb::Slice& key, uint32_t hash) = 0; + virtual void SetCapacity(size_t capacity) = 0; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; + virtual size_t GetUsage() const = 0; + virtual size_t GetPinnedUsage() const = 0; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) = 0; + virtual void EraseUnRefEntries() = 0; + virtual std::string GetPrintableOptions() const { return ""; } +}; + +// Generic cache interface which shards cache by hash of keys. 2^num_shard_bits +// shards will be created, with capacity split evenly to each of the shards. +// Keys are sharded by the highest num_shard_bits bits of hash value. +class ShardedCache : public rocksdb::Cache, public PriorityCache::PriCache { + public: + ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit); + virtual ~ShardedCache() = default; + virtual const char* Name() const override = 0; + virtual CacheShard* GetShard(int shard) = 0; + virtual const CacheShard* GetShard(int shard) const = 0; + virtual void* Value(Handle* handle) override = 0; + virtual size_t GetCharge(Handle* handle) const = 0; + virtual uint32_t GetHash(Handle* handle) const = 0; + virtual void DisownData() override = 0; + + virtual void SetCapacity(size_t capacity) override; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + virtual rocksdb::Status Insert(const rocksdb::Slice& key, void* value, size_t charge, + void (*deleter)(const rocksdb::Slice& key, void* value), + rocksdb::Cache::Handle** handle, Priority priority) override; + virtual rocksdb::Cache::Handle* Lookup(const rocksdb::Slice& key, rocksdb::Statistics* stats) override; + virtual bool Ref(rocksdb::Cache::Handle* handle) override; + virtual bool Release(rocksdb::Cache::Handle* handle, bool force_erase = false) override; + virtual void Erase(const rocksdb::Slice& key) override; + virtual uint64_t NewId() override; + virtual size_t GetCapacity() const override; + virtual bool HasStrictCapacityLimit() const override; + virtual size_t GetUsage() const override; + virtual size_t GetUsage(rocksdb::Cache::Handle* handle) const override; + virtual size_t GetPinnedUsage() const override; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + virtual void EraseUnRefEntries() override; + virtual std::string GetPrintableOptions() const override; + + int GetNumShardBits() const { return num_shard_bits_; } + + // PriCache + virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const { + return cache_bytes[pri]; + } + virtual int64_t get_cache_bytes() const { + int64_t total = 0; + for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) { + PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i); + total += get_cache_bytes(pri); + } + return total; + } + virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) { + cache_bytes[pri] = bytes; + } + virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) { + cache_bytes[pri] += bytes; + } + virtual double get_cache_ratio() const { + return cache_ratio; + } + virtual void set_cache_ratio(double ratio) { + cache_ratio = ratio; + } + virtual std::string get_cache_name() const = 0; + + private: + static inline uint32_t HashSlice(const rocksdb::Slice& s) { + return ceph_str_hash(CEPH_STR_HASH_RJENKINS, s.data(), s.size()); +// return Hash(s.data(), s.size(), 0); + } + + uint32_t Shard(uint32_t hash) { + // Note, hash >> 32 yields hash in gcc, not the zero we expect! + return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0; + } + + int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0}; + double cache_ratio = 0; + + int num_shard_bits_; + mutable std::mutex capacity_mutex_; + size_t capacity_; + bool strict_capacity_limit_; + std::atomic<uint64_t> last_id_; +}; + +extern int GetDefaultCacheShardBits(size_t capacity); + +} // namespace rocksdb_cache +#endif // ROCKSDB_SHARDED_CACHE |