Diffstat (limited to 'src/kv/LevelDBStore.h')
-rw-r--r--   src/kv/LevelDBStore.h   412
1 file changed, 412 insertions, 0 deletions
diff --git a/src/kv/LevelDBStore.h b/src/kv/LevelDBStore.h
new file mode 100644
index 000000000..085193ee0
--- /dev/null
+++ b/src/kv/LevelDBStore.h
@@ -0,0 +1,412 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LEVEL_DB_STORE_H
+#define LEVEL_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer_fwd.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include "leveldb/db.h"
+#include "leveldb/env.h"
+#include "leveldb/write_batch.h"
+#include "leveldb/slice.h"
+#include "leveldb/cache.h"
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+#include "leveldb/filter_policy.h"
+#endif
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+#include "common/Formatter.h"
+#include "common/Cond.h"
+
+#include "common/ceph_context.h"
+#include "include/common_fwd.h"
+
+// reinclude our assert to clobber the system one
+# include "include/ceph_assert.h"
+
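+// Performance counter indices for this backend; l_leveldb_first and
+// l_leveldb_last bound the range. The counters themselves are registered
+// when the store is opened (see LevelDBStore.cc).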
+enum {
+ l_leveldb_first = 34300,
+ l_leveldb_gets,
+ l_leveldb_txns,
+ l_leveldb_get_latency,
+ l_leveldb_submit_latency,
+ l_leveldb_submit_sync_latency,
+ l_leveldb_compact,
+ l_leveldb_compact_range,
+ l_leveldb_compact_queue_merge,
+ l_leveldb_compact_queue_len,
+ l_leveldb_last,
+};
+
+extern leveldb::Logger *create_leveldb_ceph_logger();
+
+class CephLevelDBLogger;
+
+/**
+ * Uses LevelDB to implement the KeyValueDB interface
+ */
+class LevelDBStore : public KeyValueDB {
+ CephContext *cct;
+ PerfCounters *logger;
+ CephLevelDBLogger *ceph_logger;
+ std::string path;
+ boost::scoped_ptr<leveldb::Cache> db_cache;
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+ boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
+#endif
+ boost::scoped_ptr<leveldb::DB> db;
+
+ int load_leveldb_options(bool create_if_missing, leveldb::Options &opts);
+ int do_open(std::ostream &out, bool create_if_missing);
+
+ // manage async compactions
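+  // compact_range_async() queues a [start, end] range on compact_queue and
+  // signals compact_queue_cond; the background CompactThread drains the
+  // queue (possibly merging overlapping requests, cf.
+  // l_leveldb_compact_queue_merge) and performs the actual compactions.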
+  ceph::mutex compact_queue_lock =
+    ceph::make_mutex("LevelDBStore::compact_queue_lock");
+ ceph::condition_variable compact_queue_cond;
+ std::list<std::pair<std::string, std::string>> compact_queue;
+ bool compact_queue_stop;
+ class CompactThread : public Thread {
+ LevelDBStore *db;
+ public:
+ explicit CompactThread(LevelDBStore *d) : db(d) {}
+ void *entry() override {
+ db->compact_thread_entry();
+ return NULL;
+ }
+ friend class LevelDBStore;
+ } compact_thread;
+
+ void compact_thread_entry();
+
+ void compact_range(const std::string& start, const std::string& end) {
+ leveldb::Slice cstart(start);
+ leveldb::Slice cend(end);
+ db->CompactRange(&cstart, &cend);
+ }
+ void compact_range_async(const std::string& start, const std::string& end);
+
+public:
+ /// compact the underlying leveldb store
+ void compact() override;
+
+ void compact_async() override {
+ compact_range_async({}, {});
+ }
+
+ /// compact db for all keys with a given prefix
+ void compact_prefix(const std::string& prefix) override {
+ compact_range(prefix, past_prefix(prefix));
+ }
+ void compact_prefix_async(const std::string& prefix) override {
+ compact_range_async(prefix, past_prefix(prefix));
+ }
+ void compact_range(const std::string& prefix,
+ const std::string& start, const std::string& end) override {
+ compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
+ }
+ void compact_range_async(const std::string& prefix,
+ const std::string& start, const std::string& end) override {
+ compact_range_async(combine_strings(prefix, start),
+ combine_strings(prefix, end));
+ }
+
+
+ /**
+ * options_t: Holds options which are minimally interpreted
+ * on initialization and then passed through to LevelDB.
+ * We transform a couple of these into actual LevelDB
+ * structures, but the rest are simply passed through unchanged. See
+ * leveldb/options.h for more precise details on each.
+ *
+ * Set them after constructing the LevelDBStore, but before calling
+ * open() or create_and_open().
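+   *
+   * Illustrative usage only (cct and the db path are assumed to be
+   * provided by the caller):
+   *
+   *   LevelDBStore store(cct, "/path/to/db");
+   *   store.options.cache_size = 64 * 1024 * 1024;  // 64 MB block cache
+   *   store.options.bloom_size = 10;                // bits per key
+   *   store.create_and_open(std::cerr);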
+ */
+ struct options_t {
+ uint64_t write_buffer_size; /// in-memory write buffer size
+ int max_open_files; /// maximum number of files LevelDB can open at once
+ uint64_t cache_size; /// size of extra decompressed cache to use
+ uint64_t block_size; /// user data per block
+ int bloom_size; /// number of bits per entry to put in a bloom filter
+ bool compression_enabled; /// whether to use libsnappy compression or not
+
+ // don't change these ones. No, seriously
+ int block_restart_interval;
+ bool error_if_exists;
+ bool paranoid_checks;
+
+ std::string log_file;
+
+ options_t() :
+ write_buffer_size(0), //< 0 means default
+ max_open_files(0), //< 0 means default
+ cache_size(0), //< 0 means no cache (default)
+ block_size(0), //< 0 means default
+ bloom_size(0), //< 0 means no bloom filter (default)
+ compression_enabled(true), //< set to false for no compression
+ block_restart_interval(0), //< 0 means default
+ error_if_exists(false), //< set to true if you want to check nonexistence
+ paranoid_checks(false) //< set to true if you want paranoid checks
+ {}
+ } options;
+
+ LevelDBStore(CephContext *c, const std::string &path) :
+ cct(c),
+ logger(NULL),
+ ceph_logger(NULL),
+ path(path),
+ db_cache(NULL),
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+ filterpolicy(NULL),
+#endif
+ compact_queue_stop(false),
+ compact_thread(this),
+ options()
+ {}
+
+ ~LevelDBStore() override;
+
+ static int _test_init(const std::string& dir);
+ int init(std::string option_str="") override;
+
+ /// Opens underlying db
+ int open(std::ostream &out, const std::string& cfs="") override;
+ /// Creates underlying db if missing and opens it
+ int create_and_open(std::ostream &out, const std::string& cfs="") override;
+
+ void close() override;
+
+ PerfCounters *get_perf_counters() override
+ {
+ return logger;
+ }
+ int repair(std::ostream &out) override;
+
+ class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
+ public:
+ leveldb::WriteBatch bat;
+ LevelDBStore *db;
+ explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
+ void set(
+ const std::string &prefix,
+ const std::string &k,
+ const ceph::buffer::list &bl) override;
+ using KeyValueDB::TransactionImpl::set;
+ void rmkey(
+ const std::string &prefix,
+ const std::string &k) override;
+ void rmkeys_by_prefix(
+ const std::string &prefix
+ ) override;
+ virtual void rm_range_keys(
+ const std::string &prefix,
+ const std::string &start,
+ const std::string &end) override;
+
+ using KeyValueDB::TransactionImpl::rmkey;
+ };
+
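+  // Illustrative write path (store is an open LevelDBStore, bl a
+  // ceph::buffer::list prepared by the caller): mutations are buffered in
+  // the transaction's WriteBatch and applied atomically on submit, e.g.
+  //
+  //   KeyValueDB::Transaction t = store.get_transaction();
+  //   t->set("prefix", "key", bl);
+  //   t->rmkey("prefix", "old_key");
+  //   store.submit_transaction_sync(t);
+  //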
+ KeyValueDB::Transaction get_transaction() override {
+ return std::make_shared<LevelDBTransactionImpl>(this);
+ }
+
+ int submit_transaction(KeyValueDB::Transaction t) override;
+ int submit_transaction_sync(KeyValueDB::Transaction t) override;
+ int get(
+ const std::string &prefix,
+ const std::set<std::string> &key,
+ std::map<std::string, ceph::buffer::list> *out
+ ) override;
+
+ int get(const std::string &prefix,
+ const std::string &key,
+ ceph::buffer::list *value) override;
+
+ using KeyValueDB::get;
+
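+  // Thin wrapper around a raw leveldb::Iterator implementing the
+  // whole-space iterator interface; iterator errors are reported by
+  // mapping a non-ok leveldb status to -1.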
+ class LevelDBWholeSpaceIteratorImpl :
+ public KeyValueDB::WholeSpaceIteratorImpl {
+ protected:
+ boost::scoped_ptr<leveldb::Iterator> dbiter;
+ public:
+ explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
+ dbiter(iter) { }
+ ~LevelDBWholeSpaceIteratorImpl() override { }
+
+ int seek_to_first() override {
+ dbiter->SeekToFirst();
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int seek_to_first(const std::string &prefix) override {
+ leveldb::Slice slice_prefix(prefix);
+ dbiter->Seek(slice_prefix);
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int seek_to_last() override {
+ dbiter->SeekToLast();
+ return dbiter->status().ok() ? 0 : -1;
+ }
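+    // Seek to the first key past the prefix range, then step back one
+    // entry; if the seek ran off the end of the database, fall back to
+    // the overall last key.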
+ int seek_to_last(const std::string &prefix) override {
+ std::string limit = past_prefix(prefix);
+ leveldb::Slice slice_limit(limit);
+ dbiter->Seek(slice_limit);
+
+ if (!dbiter->Valid()) {
+ dbiter->SeekToLast();
+ } else {
+ dbiter->Prev();
+ }
+ return dbiter->status().ok() ? 0 : -1;
+ }
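+    // Position at the first key strictly greater than (prefix, after):
+    // lower_bound() finds the first key >= (prefix, after), and an exact
+    // match, if any, is skipped.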
+ int upper_bound(const std::string &prefix, const std::string &after) override {
+ lower_bound(prefix, after);
+ if (valid()) {
+ std::pair<std::string,std::string> key = raw_key();
+ if (key.first == prefix && key.second == after)
+ next();
+ }
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int lower_bound(const std::string &prefix, const std::string &to) override {
+ std::string bound = combine_strings(prefix, to);
+ leveldb::Slice slice_bound(bound);
+ dbiter->Seek(slice_bound);
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ bool valid() override {
+ return dbiter->Valid();
+ }
+ int next() override {
+ if (valid())
+ dbiter->Next();
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ int prev() override {
+ if (valid())
+ dbiter->Prev();
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ std::string key() override {
+ std::string out_key;
+ split_key(dbiter->key(), 0, &out_key);
+ return out_key;
+ }
+ std::pair<std::string,std::string> raw_key() override {
+ std::string prefix, key;
+ split_key(dbiter->key(), &prefix, &key);
+ return std::make_pair(prefix, key);
+ }
+ bool raw_key_is_prefixed(const std::string &prefix) override {
+ leveldb::Slice key = dbiter->key();
+ if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
+ return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
+ } else {
+ return false;
+ }
+ }
+ ceph::buffer::list value() override {
+ return to_bufferlist(dbiter->value());
+ }
+
+ ceph::bufferptr value_as_ptr() override {
+ leveldb::Slice data = dbiter->value();
+ return ceph::bufferptr(data.data(), data.size());
+ }
+
+ int status() override {
+ return dbiter->status().ok() ? 0 : -1;
+ }
+ };
+
+ /// Utility
+ static std::string combine_strings(const std::string &prefix, const std::string &value);
+ static int split_key(leveldb::Slice in, std::string *prefix, std::string *key);
+ static ceph::buffer::list to_bufferlist(leveldb::Slice in);
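+  // Keys are stored as prefix + '\0' + key (see raw_key_is_prefixed and
+  // split_key); past_prefix() returns prefix + '\x01', which sorts
+  // immediately after every key sharing the given prefix and therefore
+  // serves as an exclusive upper bound for prefix-wide compaction and
+  // iteration.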
+ static std::string past_prefix(const std::string &prefix) {
+ std::string limit = prefix;
+ limit.push_back(1);
+ return limit;
+ }
+
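+  // Estimate on-disk size by walking the store directory and summing file
+  // sizes, bucketed by extension: .sst table files, .log files, and
+  // everything else as misc; the per-bucket totals are returned via
+  // 'extra'.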
+ uint64_t get_estimated_size(std::map<std::string,std::uint64_t> &extra) override {
+ DIR *store_dir = opendir(path.c_str());
+ if (!store_dir) {
+ lderr(cct) << __func__ << " something happened opening the store: "
+ << cpp_strerror(errno) << dendl;
+ return 0;
+ }
+
+ uint64_t total_size = 0;
+ uint64_t sst_size = 0;
+ uint64_t log_size = 0;
+ uint64_t misc_size = 0;
+
+ struct dirent *entry = NULL;
+ while ((entry = readdir(store_dir)) != NULL) {
+ std::string n(entry->d_name);
+
+ if (n == "." || n == "..")
+ continue;
+
+ std::string fpath = path + '/' + n;
+ struct stat s;
+ int err = stat(fpath.c_str(), &s);
+ if (err < 0)
+ err = -errno;
+ // we may race against leveldb while reading files; this should only
+ // happen when those files are being updated, data is being shuffled
+ // and files get removed, in which case there's not much of a problem
+ // as we'll get to them next time around.
+ if (err == -ENOENT) {
+ continue;
+ }
+ if (err < 0) {
+ lderr(cct) << __func__ << " error obtaining stats for " << fpath
+ << ": " << cpp_strerror(err) << dendl;
+ goto err;
+ }
+
+ size_t pos = n.find_last_of('.');
+ if (pos == std::string::npos) {
+ misc_size += s.st_size;
+ continue;
+ }
+
+ std::string ext = n.substr(pos+1);
+ if (ext == "sst") {
+ sst_size += s.st_size;
+ } else if (ext == "log") {
+ log_size += s.st_size;
+ } else {
+ misc_size += s.st_size;
+ }
+ }
+
+ total_size = sst_size + log_size + misc_size;
+
+ extra["sst"] = sst_size;
+ extra["log"] = log_size;
+ extra["misc"] = misc_size;
+ extra["total"] = total_size;
+
+err:
+ closedir(store_dir);
+ return total_size;
+ }
+
+
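+  // The whole-space iterator wraps a fresh leveldb::Iterator created with
+  // default ReadOptions; the opts argument is not used by this backend.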
+ WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override {
+ return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
+ db->NewIterator(leveldb::ReadOptions()));
+ }
+
+};
+
+#endif