summaryrefslogtreecommitdiffstats
path: root/src/mon/MonitorDBStore.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mon/MonitorDBStore.h')
-rw-r--r--src/mon/MonitorDBStore.h814
1 files changed, 814 insertions, 0 deletions
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
new file mode 100644
index 000000000..c33d35e48
--- /dev/null
+++ b/src/mon/MonitorDBStore.h
@@ -0,0 +1,814 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+* Ceph - scalable distributed file system
+*
+* Copyright (C) 2012 Inktank, Inc.
+*
+* This is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License version 2.1, as published by the Free Software
+* Foundation. See file COPYING.
+*/
+#ifndef CEPH_MONITOR_DB_STORE_H
+#define CEPH_MONITOR_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <sstream>
+#include <fstream>
+#include "kv/KeyValueDB.h"
+
+#include "include/ceph_assert.h"
+#include "common/Formatter.h"
+#include "common/Finisher.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/safe_io.h"
+#include "common/blkdev.h"
+#include "common/PriorityCache.h"
+
+#define dout_context g_ceph_context
+
+class MonitorDBStore
+{
+ std::string path;
+ boost::scoped_ptr<KeyValueDB> db;
+ bool do_dump;
+ int dump_fd_binary;
+ std::ofstream dump_fd_json;
+ ceph::JSONFormatter dump_fmt;
+
+
+ Finisher io_work;
+
+ bool is_open;
+
+ public:
+
+ std::string get_devname() {
+ char devname[4096] = {0}, partition[4096];
+ get_device_by_path(path.c_str(), partition, devname,
+ sizeof(devname));
+ return devname;
+ }
+
+ std::string get_path() {
+ return path;
+ }
+
+ std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const {
+ return db->get_priority_cache();
+ }
+
+ struct Op {
+ uint8_t type;
+ std::string prefix;
+ std::string key, endkey;
+ ceph::buffer::list bl;
+
+ Op()
+ : type(0) { }
+ Op(int t, const std::string& p, const std::string& k)
+ : type(t), prefix(p), key(k) { }
+ Op(int t, const std::string& p, const std::string& k, const ceph::buffer::list& b)
+ : type(t), prefix(p), key(k), bl(b) { }
+ Op(int t, const std::string& p, const std::string& start, const std::string& end)
+ : type(t), prefix(p), key(start), endkey(end) { }
+
+ void encode(ceph::buffer::list& encode_bl) const {
+ ENCODE_START(2, 1, encode_bl);
+ encode(type, encode_bl);
+ encode(prefix, encode_bl);
+ encode(key, encode_bl);
+ encode(bl, encode_bl);
+ encode(endkey, encode_bl);
+ ENCODE_FINISH(encode_bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& decode_bl) {
+ DECODE_START(2, decode_bl);
+ decode(type, decode_bl);
+ decode(prefix, decode_bl);
+ decode(key, decode_bl);
+ decode(bl, decode_bl);
+ if (struct_v >= 2)
+ decode(endkey, decode_bl);
+ DECODE_FINISH(decode_bl);
+ }
+
+ void dump(ceph::Formatter *f) const {
+ f->dump_int("type", type);
+ f->dump_string("prefix", prefix);
+ f->dump_string("key", key);
+ if (endkey.length()) {
+ f->dump_string("endkey", endkey);
+ }
+ }
+
+ int approx_size() const {
+ return 6 + 1 +
+ 4 + prefix.size() +
+ 4 + key.size() +
+ 4 + endkey.size() +
+ 4 + bl.length();
+ }
+
+ static void generate_test_instances(std::list<Op*>& ls) {
+ ls.push_back(new Op);
+ // we get coverage here from the Transaction instances
+ }
+ };
+
+ struct Transaction;
+ typedef std::shared_ptr<Transaction> TransactionRef;
+ struct Transaction {
+ std::list<Op> ops;
+ uint64_t bytes, keys;
+
+ Transaction() : bytes(6 + 4 + 8*2), keys(0) {}
+
+ enum {
+ OP_PUT = 1,
+ OP_ERASE = 2,
+ OP_COMPACT = 3,
+ OP_ERASE_RANGE = 4,
+ };
+
+ void put(const std::string& prefix, const std::string& key, const ceph::buffer::list& bl) {
+ ops.push_back(Op(OP_PUT, prefix, key, bl));
+ ++keys;
+ bytes += ops.back().approx_size();
+ }
+
+ void put(const std::string& prefix, version_t ver, const ceph::buffer::list& bl) {
+ std::ostringstream os;
+ os << ver;
+ put(prefix, os.str(), bl);
+ }
+
+ void put(const std::string& prefix, const std::string& key, version_t ver) {
+ using ceph::encode;
+ ceph::buffer::list bl;
+ encode(ver, bl);
+ put(prefix, key, bl);
+ }
+
+ void erase(const std::string& prefix, const std::string& key) {
+ ops.push_back(Op(OP_ERASE, prefix, key));
+ ++keys;
+ bytes += ops.back().approx_size();
+ }
+
+ void erase(const std::string& prefix, version_t ver) {
+ std::ostringstream os;
+ os << ver;
+ erase(prefix, os.str());
+ }
+
+ void erase_range(const std::string& prefix, const std::string& begin,
+ const std::string& end) {
+ ops.push_back(Op(OP_ERASE_RANGE, prefix, begin, end));
+ ++keys;
+ bytes += ops.back().approx_size();
+ }
+
+ void compact_prefix(const std::string& prefix) {
+ ops.push_back(Op(OP_COMPACT, prefix, {}));
+ }
+
+ void compact_range(const std::string& prefix, const std::string& start,
+ const std::string& end) {
+ ops.push_back(Op(OP_COMPACT, prefix, start, end));
+ }
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(2, 1, bl);
+ encode(ops, bl);
+ encode(bytes, bl);
+ encode(keys, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(2, bl);
+ decode(ops, bl);
+ if (struct_v >= 2) {
+ decode(bytes, bl);
+ decode(keys, bl);
+ }
+ DECODE_FINISH(bl);
+ }
+
+ static void generate_test_instances(std::list<Transaction*>& ls) {
+ ls.push_back(new Transaction);
+ ls.push_back(new Transaction);
+ ceph::buffer::list bl;
+ bl.append("value");
+ ls.back()->put("prefix", "key", bl);
+ ls.back()->erase("prefix2", "key2");
+ ls.back()->erase_range("prefix3", "key3", "key4");
+ ls.back()->compact_prefix("prefix3");
+ ls.back()->compact_range("prefix4", "from", "to");
+ }
+
+ void append(TransactionRef other) {
+ ops.splice(ops.end(), other->ops);
+ keys += other->keys;
+ bytes += other->bytes;
+ }
+
+ void append_from_encoded(ceph::buffer::list& bl) {
+ auto other(std::make_shared<Transaction>());
+ auto it = bl.cbegin();
+ other->decode(it);
+ append(other);
+ }
+
+ bool empty() {
+ return (size() == 0);
+ }
+
+ size_t size() const {
+ return ops.size();
+ }
+ uint64_t get_keys() const {
+ return keys;
+ }
+ uint64_t get_bytes() const {
+ return bytes;
+ }
+
+ void dump(ceph::Formatter *f, bool dump_val=false) const {
+ f->open_object_section("transaction");
+ f->open_array_section("ops");
+ int op_num = 0;
+ for (auto it = ops.begin(); it != ops.end(); ++it) {
+ const Op& op = *it;
+ f->open_object_section("op");
+ f->dump_int("op_num", op_num++);
+ switch (op.type) {
+ case OP_PUT:
+ {
+ f->dump_string("type", "PUT");
+ f->dump_string("prefix", op.prefix);
+ f->dump_string("key", op.key);
+ f->dump_unsigned("length", op.bl.length());
+ if (dump_val) {
+ std::ostringstream os;
+ op.bl.hexdump(os);
+ f->dump_string("bl", os.str());
+ }
+ }
+ break;
+ case OP_ERASE:
+ {
+ f->dump_string("type", "ERASE");
+ f->dump_string("prefix", op.prefix);
+ f->dump_string("key", op.key);
+ }
+ break;
+ case OP_ERASE_RANGE:
+ {
+ f->dump_string("type", "ERASE_RANGE");
+ f->dump_string("prefix", op.prefix);
+ f->dump_string("start", op.key);
+ f->dump_string("end", op.endkey);
+ }
+ break;
+ case OP_COMPACT:
+ {
+ f->dump_string("type", "COMPACT");
+ f->dump_string("prefix", op.prefix);
+ f->dump_string("start", op.key);
+ f->dump_string("end", op.endkey);
+ }
+ break;
+ default:
+ {
+ f->dump_string("type", "unknown");
+ f->dump_unsigned("op_code", op.type);
+ break;
+ }
+ }
+ f->close_section();
+ }
+ f->close_section();
+ f->dump_unsigned("num_keys", keys);
+ f->dump_unsigned("num_bytes", bytes);
+ f->close_section();
+ }
+ };
+
+ int apply_transaction(MonitorDBStore::TransactionRef t) {
+ KeyValueDB::Transaction dbt = db->get_transaction();
+
+ if (do_dump) {
+ if (!g_conf()->mon_debug_dump_json) {
+ ceph::buffer::list bl;
+ t->encode(bl);
+ bl.write_fd(dump_fd_binary);
+ } else {
+ t->dump(&dump_fmt, true);
+ dump_fmt.flush(dump_fd_json);
+ dump_fd_json.flush();
+ }
+ }
+
+ std::list<std::pair<std::string, std::pair<std::string,std::string>>> compact;
+ for (auto it = t->ops.begin(); it != t->ops.end(); ++it) {
+ const Op& op = *it;
+ switch (op.type) {
+ case Transaction::OP_PUT:
+ dbt->set(op.prefix, op.key, op.bl);
+ break;
+ case Transaction::OP_ERASE:
+ dbt->rmkey(op.prefix, op.key);
+ break;
+ case Transaction::OP_ERASE_RANGE:
+ dbt->rm_range_keys(op.prefix, op.key, op.endkey);
+ break;
+ case Transaction::OP_COMPACT:
+ compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
+ break;
+ default:
+ derr << __func__ << " unknown op type " << op.type << dendl;
+ ceph_abort();
+ break;
+ }
+ }
+ int r = db->submit_transaction_sync(dbt);
+ if (r >= 0) {
+ while (!compact.empty()) {
+ if (compact.front().second.first == std::string() &&
+ compact.front().second.second == std::string())
+ db->compact_prefix_async(compact.front().first);
+ else
+ db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
+ compact.pop_front();
+ }
+ } else {
+ ceph_abort_msg("failed to write to db");
+ }
+ return r;
+ }
+
+ struct C_DoTransaction : public Context {
+ MonitorDBStore *store;
+ MonitorDBStore::TransactionRef t;
+ Context *oncommit;
+ C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
+ Context *f)
+ : store(s), t(t), oncommit(f)
+ {}
+ void finish(int r) override {
+ /* The store serializes writes. Each transaction is handled
+ * sequentially by the io_work Finisher. If a transaction takes longer
+ * to apply its state to permanent storage, then no other transaction
+ * will be handled meanwhile.
+ *
+ * We will now randomly inject random delays. We can safely sleep prior
+ * to applying the transaction as it won't break the model.
+ */
+ double delay_prob = g_conf()->mon_inject_transaction_delay_probability;
+ if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
+ utime_t delay;
+ double delay_max = g_conf()->mon_inject_transaction_delay_max;
+ delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
+ lsubdout(g_ceph_context, mon, 1)
+ << "apply_transaction will be delayed for " << delay
+ << " seconds" << dendl;
+ delay.sleep();
+ }
+ int ret = store->apply_transaction(t);
+ oncommit->complete(ret);
+ }
+ };
+
+ /**
+ * queue transaction
+ *
+ * Queue a transaction to commit asynchronously. Trigger a context
+ * on completion (without any locks held).
+ */
+ void queue_transaction(MonitorDBStore::TransactionRef t,
+ Context *oncommit) {
+ io_work.queue(new C_DoTransaction(this, t, oncommit));
+ }
+
+ /**
+ * block and flush all io activity
+ */
+ void flush() {
+ io_work.wait_for_empty();
+ }
+
+ class StoreIteratorImpl {
+ protected:
+ bool done;
+ std::pair<std::string,std::string> last_key;
+ ceph::buffer::list crc_bl;
+
+ StoreIteratorImpl() : done(false) { }
+ virtual ~StoreIteratorImpl() { }
+
+ virtual bool _is_valid() = 0;
+
+ public:
+ __u32 crc() {
+ if (g_conf()->mon_sync_debug)
+ return crc_bl.crc32c(0);
+ return 0;
+ }
+ std::pair<std::string,std::string> get_last_key() {
+ return last_key;
+ }
+ virtual bool has_next_chunk() {
+ return !done && _is_valid();
+ }
+ virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) = 0;
+ virtual std::pair<std::string,std::string> get_next_key() = 0;
+ };
+ typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
+
+ class WholeStoreIteratorImpl : public StoreIteratorImpl {
+ KeyValueDB::WholeSpaceIterator iter;
+ std::set<std::string> sync_prefixes;
+
+ public:
+ WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
+ std::set<std::string> &prefixes)
+ : StoreIteratorImpl(),
+ iter(iter),
+ sync_prefixes(prefixes)
+ { }
+
+ ~WholeStoreIteratorImpl() override { }
+
+ /**
+ * Obtain a chunk of the store
+ *
+ * @param bl Encoded transaction that will recreate the chunk
+ * @param first_key Pair containing the first key to obtain, and that
+ * will contain the first key in the chunk (that may
+ * differ from the one passed on to the function)
+ * @param last_key[out] Last key in the chunk
+ */
+ void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) override {
+ using ceph::encode;
+ ceph_assert(done == false);
+ ceph_assert(iter->valid() == true);
+
+ while (iter->valid()) {
+ std::string prefix(iter->raw_key().first);
+ std::string key(iter->raw_key().second);
+ if (sync_prefixes.count(prefix)) {
+ ceph::buffer::list value = iter->value();
+ if (tx->empty() ||
+ (tx->get_bytes() + value.length() + key.size() +
+ prefix.size() < max_bytes &&
+ tx->get_keys() < max_keys)) {
+ // NOTE: putting every key in a separate transaction is
+ // questionable as far as efficiency goes
+ auto tmp(std::make_shared<Transaction>());
+ tmp->put(prefix, key, value);
+ tx->append(tmp);
+ if (g_conf()->mon_sync_debug) {
+ encode(prefix, crc_bl);
+ encode(key, crc_bl);
+ encode(value, crc_bl);
+ }
+ } else {
+ last_key.first = prefix;
+ last_key.second = key;
+ return;
+ }
+ }
+ iter->next();
+ }
+ ceph_assert(iter->valid() == false);
+ done = true;
+ }
+
+ std::pair<std::string,std::string> get_next_key() override {
+ ceph_assert(iter->valid());
+
+ for (; iter->valid(); iter->next()) {
+ std::pair<std::string,std::string> r = iter->raw_key();
+ if (sync_prefixes.count(r.first) > 0) {
+ iter->next();
+ return r;
+ }
+ }
+ return std::pair<std::string,std::string>();
+ }
+
+ bool _is_valid() override {
+ return iter->valid();
+ }
+ };
+
+ Synchronizer get_synchronizer(std::pair<std::string,std::string> &key,
+ std::set<std::string> &prefixes) {
+ KeyValueDB::WholeSpaceIterator iter;
+ iter = db->get_wholespace_iterator();
+
+ if (!key.first.empty() && !key.second.empty())
+ iter->upper_bound(key.first, key.second);
+ else
+ iter->seek_to_first();
+
+ return std::shared_ptr<StoreIteratorImpl>(
+ new WholeStoreIteratorImpl(iter, prefixes)
+ );
+ }
+
+ KeyValueDB::Iterator get_iterator(const std::string &prefix) {
+ ceph_assert(!prefix.empty());
+ KeyValueDB::Iterator iter = db->get_iterator(prefix);
+ iter->seek_to_first();
+ return iter;
+ }
+
+ KeyValueDB::WholeSpaceIterator get_iterator() {
+ KeyValueDB::WholeSpaceIterator iter;
+ iter = db->get_wholespace_iterator();
+ iter->seek_to_first();
+ return iter;
+ }
+
+ int get(const std::string& prefix, const std::string& key, ceph::buffer::list& bl) {
+ ceph_assert(bl.length() == 0);
+ return db->get(prefix, key, &bl);
+ }
+
+ int get(const std::string& prefix, const version_t ver, ceph::buffer::list& bl) {
+ std::ostringstream os;
+ os << ver;
+ return get(prefix, os.str(), bl);
+ }
+
+ version_t get(const std::string& prefix, const std::string& key) {
+ using ceph::decode;
+ ceph::buffer::list bl;
+ int err = get(prefix, key, bl);
+ if (err < 0) {
+ if (err == -ENOENT) // if key doesn't exist, assume its value is 0
+ return 0;
+ // we're not expecting any other negative return value, and we can't
+ // just return a negative value if we're returning a version_t
+ generic_dout(0) << "MonitorDBStore::get() error obtaining"
+ << " (" << prefix << ":" << key << "): "
+ << cpp_strerror(err) << dendl;
+ ceph_abort_msg("error obtaining key");
+ }
+
+ ceph_assert(bl.length());
+ version_t ver;
+ auto p = bl.cbegin();
+ decode(ver, p);
+ return ver;
+ }
+
+ bool exists(const std::string& prefix, const std::string& key) {
+ KeyValueDB::Iterator it = db->get_iterator(prefix);
+ int err = it->lower_bound(key);
+ if (err < 0)
+ return false;
+
+ return (it->valid() && it->key() == key);
+ }
+
+ bool exists(const std::string& prefix, version_t ver) {
+ std::ostringstream os;
+ os << ver;
+ return exists(prefix, os.str());
+ }
+
+ std::string combine_strings(const std::string& prefix, const std::string& value) {
+ std::string out = prefix;
+ out.push_back('_');
+ out.append(value);
+ return out;
+ }
+
+ std::string combine_strings(const std::string& prefix, const version_t ver) {
+ std::ostringstream os;
+ os << ver;
+ return combine_strings(prefix, os.str());
+ }
+
+ void clear(std::set<std::string>& prefixes) {
+ KeyValueDB::Transaction dbt = db->get_transaction();
+
+ for (auto iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
+ dbt->rmkeys_by_prefix((*iter));
+ }
+ int r = db->submit_transaction_sync(dbt);
+ ceph_assert(r >= 0);
+ }
+
+ void _open(const std::string& kv_type) {
+ int pos = 0;
+ for (auto rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
+ if (*rit != '/')
+ break;
+ }
+ std::ostringstream os;
+ os << path.substr(0, path.size() - pos) << "/store.db";
+ std::string full_path = os.str();
+
+ KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
+ kv_type,
+ full_path);
+ if (!db_ptr) {
+ derr << __func__ << " error initializing "
+ << kv_type << " db back storage in "
+ << full_path << dendl;
+ ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
+ }
+ db.reset(db_ptr);
+
+ if (g_conf()->mon_debug_dump_transactions) {
+ if (!g_conf()->mon_debug_dump_json) {
+ dump_fd_binary = ::open(
+ g_conf()->mon_debug_dump_location.c_str(),
+ O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644);
+ if (dump_fd_binary < 0) {
+ dump_fd_binary = -errno;
+ derr << "Could not open log file, got "
+ << cpp_strerror(dump_fd_binary) << dendl;
+ }
+ } else {
+ dump_fmt.reset();
+ dump_fmt.open_array_section("dump");
+ dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str());
+ }
+ do_dump = true;
+ }
+ if (kv_type == "rocksdb")
+ db->init(g_conf()->mon_rocksdb_options);
+ else
+ db->init();
+
+
+ }
+
+ int open(std::ostream &out) {
+ std::string kv_type;
+ int r = read_meta("kv_backend", &kv_type);
+ if (r < 0 || kv_type.empty()) {
+ // assume old monitors that did not mark the type were leveldb.
+ kv_type = "leveldb";
+ r = write_meta("kv_backend", kv_type);
+ if (r < 0)
+ return r;
+ }
+ _open(kv_type);
+ r = db->open(out);
+ if (r < 0)
+ return r;
+
+ // Monitors are few in number, so the resource cost of exposing
+ // very detailed stats is low: ramp up the priority of all the
+ // KV store's perf counters. Do this after open, because backend may
+ // not have constructed PerfCounters earlier.
+ if (db->get_perf_counters()) {
+ db->get_perf_counters()->set_prio_adjust(
+ PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
+ }
+
+ io_work.start();
+ is_open = true;
+ return 0;
+ }
+
+ int create_and_open(std::ostream &out) {
+ // record the type before open
+ std::string kv_type;
+ int r = read_meta("kv_backend", &kv_type);
+ if (r < 0) {
+ kv_type = g_conf()->mon_keyvaluedb;
+ r = write_meta("kv_backend", kv_type);
+ if (r < 0)
+ return r;
+ }
+ _open(kv_type);
+ r = db->create_and_open(out);
+ if (r < 0)
+ return r;
+ io_work.start();
+ is_open = true;
+ return 0;
+ }
+
+ void close() {
+ // there should be no work queued!
+ io_work.stop();
+ is_open = false;
+ db.reset(NULL);
+ }
+
+ void compact() {
+ db->compact();
+ }
+
+ void compact_async() {
+ db->compact_async();
+ }
+
+ void compact_prefix(const std::string& prefix) {
+ db->compact_prefix(prefix);
+ }
+
+ uint64_t get_estimated_size(std::map<std::string, uint64_t> &extras) {
+ return db->get_estimated_size(extras);
+ }
+
+ /**
+ * write_meta - write a simple configuration key out-of-band
+ *
+ * Write a simple key/value pair for basic store configuration
+ * (e.g., a uuid or magic number) to an unopened/unmounted store.
+ * The default implementation writes this to a plaintext file in the
+ * path.
+ *
+ * A newline is appended.
+ *
+ * @param key key name (e.g., "fsid")
+ * @param value value (e.g., a uuid rendered as a string)
+ * @returns 0 for success, or an error code
+ */
+ int write_meta(const std::string& key,
+ const std::string& value) const {
+ std::string v = value;
+ v += "\n";
+ int r = safe_write_file(path.c_str(), key.c_str(),
+ v.c_str(), v.length(),
+ 0600);
+ if (r < 0)
+ return r;
+ return 0;
+ }
+
+ /**
+ * read_meta - read a simple configuration key out-of-band
+ *
+ * Read a simple key value to an unopened/mounted store.
+ *
+ * Trailing whitespace is stripped off.
+ *
+ * @param key key name
+ * @param value pointer to value string
+ * @returns 0 for success, or an error code
+ */
+ int read_meta(const std::string& key,
+ std::string *value) const {
+ char buf[4096];
+ int r = safe_read_file(path.c_str(), key.c_str(),
+ buf, sizeof(buf));
+ if (r <= 0)
+ return r;
+ // drop trailing newlines
+ while (r && isspace(buf[r-1])) {
+ --r;
+ }
+ *value = std::string(buf, r);
+ return 0;
+ }
+
+ explicit MonitorDBStore(const std::string& path)
+ : path(path),
+ db(0),
+ do_dump(false),
+ dump_fd_binary(-1),
+ dump_fmt(true),
+ io_work(g_ceph_context, "monstore", "fn_monstore"),
+ is_open(false) {
+ }
+ ~MonitorDBStore() {
+ ceph_assert(!is_open);
+ if (do_dump) {
+ if (!g_conf()->mon_debug_dump_json) {
+ ::close(dump_fd_binary);
+ } else {
+ dump_fmt.close_section();
+ dump_fmt.flush(dump_fd_json);
+ dump_fd_json.flush();
+ dump_fd_json.close();
+ }
+ }
+ }
+
+};
+
+WRITE_CLASS_ENCODER(MonitorDBStore::Op)
+WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
+
+#endif /* CEPH_MONITOR_DB_STORE_H */