summaryrefslogtreecommitdiffstats
path: root/src/os/kstore/KStore.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/os/kstore/KStore.h698
1 files changed, 698 insertions, 0 deletions
diff --git a/src/os/kstore/KStore.h b/src/os/kstore/KStore.h
new file mode 100644
index 000000000..b76b4dcfb
--- /dev/null
+++ b/src/os/kstore/KStore.h
@@ -0,0 +1,698 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_KSTORE_H
+#define CEPH_OSD_KSTORE_H
+
+#include "acconfig.h"
+
+#include <unistd.h>
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#include "include/ceph_assert.h"
+#include "include/unordered_map.h"
+#include "common/Finisher.h"
+#include "common/RWLock.h"
+#include "common/Throttle.h"
+#include "common/WorkQueue.h"
+#include "os/ObjectStore.h"
+#include "common/perf_counters.h"
+#include "os/fs/FS.h"
+#include "kv/KeyValueDB.h"
+
+#include "kstore_types.h"
+
+#include "boost/intrusive/list.hpp"
+
+enum {
+ l_kstore_first = 832430,
+ l_kstore_state_prepare_lat,
+ l_kstore_state_kv_queued_lat,
+ l_kstore_state_kv_done_lat,
+ l_kstore_state_finishing_lat,
+ l_kstore_state_done_lat,
+ l_kstore_last
+};
+
+class KStore : public ObjectStore {
+ // -----------------------------------------------------
+ // types
+public:
+
+ struct TransContext;
+
+ /// an in-memory object
+ struct Onode {
+ CephContext* cct;
+ std::atomic_int nref; ///< reference count
+
+ ghobject_t oid;
+ std::string key; ///< key under PREFIX_OBJ where we are stored
+ boost::intrusive::list_member_hook<> lru_item;
+
+ kstore_onode_t onode; ///< metadata stored as value in kv store
+ bool dirty; // ???
+ bool exists;
+
+ std::mutex flush_lock; ///< protect flush_txns
+ std::condition_variable flush_cond; ///< wait here for unapplied txns
+ std::set<TransContext*> flush_txns; ///< committing txns
+
+ uint64_t tail_offset;
+ ceph::buffer::list tail_bl;
+
+ std::map<uint64_t,ceph::buffer::list> pending_stripes; ///< unwritten stripes
+
+ Onode(CephContext* cct, const ghobject_t& o, const std::string& k)
+ : cct(cct),
+ nref(0),
+ oid(o),
+ key(k),
+ dirty(false),
+ exists(false),
+ tail_offset(0) {
+ }
+
+ void flush();
+ void get() {
+ ++nref;
+ }
+ void put() {
+ if (--nref == 0)
+ delete this;
+ }
+
+ void clear_tail() {
+ tail_offset = 0;
+ tail_bl.clear();
+ }
+ void clear_pending_stripes() {
+ pending_stripes.clear();
+ }
+ };
+ typedef boost::intrusive_ptr<Onode> OnodeRef;
+
+ struct OnodeHashLRU {
+ CephContext* cct;
+ typedef boost::intrusive::list<
+ Onode,
+ boost::intrusive::member_hook<
+ Onode,
+ boost::intrusive::list_member_hook<>,
+ &Onode::lru_item> > lru_list_t;
+
+ std::mutex lock;
+ ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
+ lru_list_t lru; ///< lru
+
+ OnodeHashLRU(CephContext* cct) : cct(cct) {}
+
+ void add(const ghobject_t& oid, OnodeRef o);
+ void _touch(OnodeRef o);
+ OnodeRef lookup(const ghobject_t& o);
+ void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
+ void clear();
+ bool get_next(const ghobject_t& after, std::pair<ghobject_t,OnodeRef> *next);
+ int trim(int max=-1);
+ };
+
+ class OpSequencer;
+ typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+
+ struct Collection : public CollectionImpl {
+ KStore *store;
+ kstore_cnode_t cnode;
+ ceph::shared_mutex lock =
+ ceph::make_shared_mutex("KStore::Collection::lock", true, false);
+
+ OpSequencerRef osr;
+
+ // cache onodes on a per-collection basis to avoid lock
+ // contention.
+ OnodeHashLRU onode_map;
+
+ OnodeRef get_onode(const ghobject_t& oid, bool create);
+
+ bool contains(const ghobject_t& oid) {
+ if (cid.is_meta())
+ return oid.hobj.pool == -1;
+ spg_t spgid;
+ if (cid.is_pg(&spgid))
+ return
+ spgid.pgid.contains(cnode.bits, oid) &&
+ oid.shard_id == spgid.shard;
+ return false;
+ }
+
+ void flush() override;
+ bool flush_commit(Context *c) override;
+
+ private:
+ FRIEND_MAKE_REF(Collection);
+ Collection(KStore *ns, coll_t c);
+ };
+ using CollectionRef = ceph::ref_t<Collection>;
+
+ class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+ CollectionRef c;
+ OnodeRef o;
+ KeyValueDB::Iterator it;
+ std::string head, tail;
+ public:
+ OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
+ int seek_to_first() override;
+ int upper_bound(const std::string &after) override;
+ int lower_bound(const std::string &to) override;
+ bool valid() override;
+ int next() override;
+ std::string key() override;
+ ceph::buffer::list value() override;
+ int status() override {
+ return 0;
+ }
+ };
+
+ struct TransContext {
+ typedef enum {
+ STATE_PREPARE,
+ STATE_AIO_WAIT,
+ STATE_IO_DONE,
+ STATE_KV_QUEUED,
+ STATE_KV_COMMITTING,
+ STATE_KV_DONE,
+ STATE_FINISHING,
+ STATE_DONE,
+ } state_t;
+
+ state_t state;
+
+ const char *get_state_name() {
+ switch (state) {
+ case STATE_PREPARE: return "prepare";
+ case STATE_AIO_WAIT: return "aio_wait";
+ case STATE_IO_DONE: return "io_done";
+ case STATE_KV_QUEUED: return "kv_queued";
+ case STATE_KV_COMMITTING: return "kv_committing";
+ case STATE_KV_DONE: return "kv_done";
+ case STATE_FINISHING: return "finishing";
+ case STATE_DONE: return "done";
+ }
+ return "???";
+ }
+
+ void log_state_latency(PerfCounters *logger, int state) {
+ utime_t lat, now = ceph_clock_now();
+ lat = now - start;
+ logger->tinc(state, lat);
+ start = now;
+ }
+
+ CollectionRef ch;
+ OpSequencerRef osr;
+ boost::intrusive::list_member_hook<> sequencer_item;
+
+ uint64_t ops, bytes;
+
+ std::set<OnodeRef> onodes; ///< these onodes need to be updated/written
+ KeyValueDB::Transaction t; ///< then we will commit this
+ Context *oncommit; ///< signal on commit
+ Context *onreadable; ///< signal on readable
+ Context *onreadable_sync; ///< signal on readable
+ std::list<Context*> oncommits; ///< more commit completions
+ std::list<CollectionRef> removed_collections; ///< colls we removed
+
+ CollectionRef first_collection; ///< first referenced collection
+ utime_t start;
+ explicit TransContext(OpSequencer *o)
+ : state(STATE_PREPARE),
+ osr(o),
+ ops(0),
+ bytes(0),
+ oncommit(NULL),
+ onreadable(NULL),
+ onreadable_sync(NULL),
+ start(ceph_clock_now()){
+ //cout << "txc new " << this << std::endl;
+ }
+ ~TransContext() {
+ //cout << "txc del " << this << std::endl;
+ }
+
+ void write_onode(OnodeRef &o) {
+ onodes.insert(o);
+ }
+ };
+
+ class OpSequencer : public RefCountedObject {
+ public:
+ std::mutex qlock;
+ std::condition_variable qcond;
+ typedef boost::intrusive::list<
+ TransContext,
+ boost::intrusive::member_hook<
+ TransContext,
+ boost::intrusive::list_member_hook<>,
+ &TransContext::sequencer_item> > q_list_t;
+ q_list_t q; ///< transactions
+
+ ~OpSequencer() {
+ ceph_assert(q.empty());
+ }
+
+ void queue_new(TransContext *txc) {
+ std::lock_guard<std::mutex> l(qlock);
+ q.push_back(*txc);
+ }
+
+ void flush() {
+ std::unique_lock<std::mutex> l(qlock);
+ while (!q.empty())
+ qcond.wait(l);
+ }
+
+ bool flush_commit(Context *c) {
+ std::lock_guard<std::mutex> l(qlock);
+ if (q.empty()) {
+ return true;
+ }
+ TransContext *txc = &q.back();
+ if (txc->state >= TransContext::STATE_KV_DONE) {
+ return true;
+ }
+ ceph_assert(txc->state < TransContext::STATE_KV_DONE);
+ txc->oncommits.push_back(c);
+ return false;
+ }
+ };
+
+ struct KVSyncThread : public Thread {
+ KStore *store;
+ explicit KVSyncThread(KStore *s) : store(s) {}
+ void *entry() override {
+ store->_kv_sync_thread();
+ return NULL;
+ }
+ };
+
+ // --------------------------------------------------------
+ // members
+private:
+ KeyValueDB *db;
+ uuid_d fsid;
+ std::string basedir;
+ int path_fd; ///< open handle to $path
+ int fsid_fd; ///< open handle (locked) to $path/fsid
+ bool mounted;
+
+ /// rwlock to protect coll_map
+ ceph::shared_mutex coll_lock = ceph::make_shared_mutex("KStore::coll_lock");
+ ceph::unordered_map<coll_t, CollectionRef> coll_map;
+ std::map<coll_t,CollectionRef> new_coll_map;
+
+ std::mutex nid_lock;
+ uint64_t nid_last;
+ uint64_t nid_max;
+
+ Throttle throttle_ops, throttle_bytes; ///< submit to commit
+
+ Finisher finisher;
+
+ KVSyncThread kv_sync_thread;
+ std::mutex kv_lock;
+ std::condition_variable kv_cond, kv_sync_cond;
+ bool kv_stop;
+ std::deque<TransContext*> kv_queue, kv_committing;
+
+ //Logger *logger;
+ PerfCounters *logger;
+ std::mutex reap_lock;
+ std::list<CollectionRef> removed_collections;
+
+
+ // --------------------------------------------------------
+ // private methods
+
+ void _init_logger();
+ void _shutdown_logger();
+
+ int _open_path();
+ void _close_path();
+ int _open_fsid(bool create);
+ int _lock_fsid();
+ int _read_fsid(uuid_d *f);
+ int _write_fsid();
+ void _close_fsid();
+ int _open_db(bool create);
+ void _close_db();
+ int _open_collections(int *errors=0);
+ void _close_collections();
+
+ int _open_super_meta();
+
+ CollectionRef _get_collection(coll_t cid);
+ void _queue_reap_collection(CollectionRef& c);
+ void _reap_collections();
+
+ void _assign_nid(TransContext *txc, OnodeRef o);
+
+ void _dump_onode(OnodeRef o);
+
+ TransContext *_txc_create(OpSequencer *osr);
+ void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
+ void _txc_add_transaction(TransContext *txc, Transaction *t);
+ void _txc_finalize(OpSequencer *osr, TransContext *txc);
+ void _txc_state_proc(TransContext *txc);
+ void _txc_finish_kv(TransContext *txc);
+ void _txc_finish(TransContext *txc);
+
+ void _osr_reap_done(OpSequencer *osr);
+
+ void _kv_sync_thread();
+ void _kv_stop() {
+ {
+ std::lock_guard<std::mutex> l(kv_lock);
+ kv_stop = true;
+ kv_cond.notify_all();
+ }
+ kv_sync_thread.join();
+ kv_stop = false;
+ }
+
+ void _do_read_stripe(OnodeRef o, uint64_t offset, ceph::buffer::list *pbl, bool do_cache);
+ void _do_write_stripe(TransContext *txc, OnodeRef o,
+ uint64_t offset, ceph::buffer::list& bl);
+ void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
+
+ int _collection_list(
+ Collection *c, const ghobject_t& start, const ghobject_t& end,
+ int max, std::vector<ghobject_t> *ls, ghobject_t *next);
+
+public:
+ KStore(CephContext *cct, const std::string& path);
+ ~KStore() override;
+
+ std::string get_type() override {
+ return "kstore";
+ }
+
+ bool needs_journal() override { return false; };
+ bool wants_journal() override { return false; };
+ bool allows_journal() override { return false; };
+
+ static int get_block_device_fsid(const std::string& path, uuid_d *fsid);
+
+ bool test_mount_in_use() override;
+
+ int mount() override;
+ int umount() override;
+ void _sync();
+
+ int fsck(bool deep) override;
+
+
+ int validate_hobject_key(const hobject_t &obj) const override {
+ return 0;
+ }
+ unsigned get_max_attr_name_length() override {
+ return 256; // arbitrary; there is no real limit internally
+ }
+
+ int mkfs() override;
+ int mkjournal() override {
+ return 0;
+ }
+ void dump_perf_counters(ceph::Formatter *f) override {
+ f->open_object_section("perf_counters");
+ logger->dump_formatted(f, false);
+ f->close_section();
+ }
+ void get_db_statistics(ceph::Formatter *f) override {
+ db->get_statistics(f);
+ }
+ int statfs(struct store_statfs_t *buf,
+ osd_alert_list_t* alerts = nullptr) override;
+ int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
+ bool *per_pool_omap) override;
+
+ CollectionHandle open_collection(const coll_t& c) override;
+ CollectionHandle create_new_collection(const coll_t& c) override;
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override {
+ }
+
+ using ObjectStore::exists;
+ bool exists(CollectionHandle& c, const ghobject_t& oid) override;
+ using ObjectStore::stat;
+ int stat(
+ CollectionHandle& c,
+ const ghobject_t& oid,
+ struct stat *st,
+ bool allow_eio = false) override; // struct stat?
+ int set_collection_opts(
+ CollectionHandle& c,
+ const pool_opts_t& opts) override;
+ using ObjectStore::read;
+ int read(
+ CollectionHandle& c,
+ const ghobject_t& oid,
+ uint64_t offset,
+ size_t len,
+ ceph::buffer::list& bl,
+ uint32_t op_flags = 0) override;
+ int _do_read(
+ OnodeRef o,
+ uint64_t offset,
+ size_t len,
+ ceph::buffer::list& bl,
+ bool do_cache,
+ uint32_t op_flags = 0);
+
+ using ObjectStore::fiemap;
+ int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, std::map<uint64_t, uint64_t>& destmap) override;
+ int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, ceph::buffer::list& outbl) override;
+ using ObjectStore::getattr;
+ int getattr(CollectionHandle& c, const ghobject_t& oid, const char *name, ceph::buffer::ptr& value) override;
+ using ObjectStore::getattrs;
+ int getattrs(CollectionHandle& c, const ghobject_t& oid, std::map<std::string,ceph::buffer::ptr>& aset) override;
+
+ int list_collections(std::vector<coll_t>& ls) override;
+ bool collection_exists(const coll_t& c) override;
+ int collection_empty(CollectionHandle& c, bool *empty) override;
+ int collection_bits(CollectionHandle& c) override;
+ int collection_list(
+ CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
+ int max,
+ std::vector<ghobject_t> *ls, ghobject_t *next) override;
+
+ using ObjectStore::omap_get;
+ int omap_get(
+ CollectionHandle& c, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ ceph::buffer::list *header, ///< [out] omap header
+ std::map<std::string, ceph::buffer::list> *out /// < [out] Key to value std::map
+ ) override;
+
+ using ObjectStore::omap_get_header;
+ /// Get omap header
+ int omap_get_header(
+ CollectionHandle& c, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ ceph::buffer::list *header, ///< [out] omap header
+ bool allow_eio = false ///< [in] don't assert on eio
+ ) override;
+
+ using ObjectStore::omap_get_keys;
+ /// Get keys defined on oid
+ int omap_get_keys(
+ CollectionHandle& c, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ std::set<std::string> *keys ///< [out] Keys defined on oid
+ ) override;
+
+ using ObjectStore::omap_get_values;
+ /// Get key values
+ int omap_get_values(
+ CollectionHandle& c, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ const std::set<std::string> &keys, ///< [in] Keys to get
+ std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
+ ) override;
+
+ using ObjectStore::omap_check_keys;
+ /// Filters keys into out which are defined on oid
+ int omap_check_keys(
+ CollectionHandle& c, ///< [in] Collection containing oid
+ const ghobject_t &oid, ///< [in] Object containing omap
+ const std::set<std::string> &keys, ///< [in] Keys to check
+ std::set<std::string> *out ///< [out] Subset of keys defined on oid
+ ) override;
+
+ using ObjectStore::get_omap_iterator;
+ ObjectMap::ObjectMapIterator get_omap_iterator(
+ CollectionHandle& c, ///< [in] collection
+ const ghobject_t &oid ///< [in] object
+ ) override;
+
+ void set_fsid(uuid_d u) override {
+ fsid = u;
+ }
+ uuid_d get_fsid() override {
+ return fsid;
+ }
+
+ uint64_t estimate_objects_overhead(uint64_t num_objects) override {
+ return num_objects * 300; //assuming per-object overhead is 300 bytes
+ }
+
+ objectstore_perf_stat_t get_cur_stats() override {
+ return objectstore_perf_stat_t();
+ }
+ const PerfCounters* get_perf_counters() const override {
+ return logger;
+ }
+
+
+ int queue_transactions(
+ CollectionHandle& ch,
+ std::vector<Transaction>& tls,
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL) override;
+
+ void compact () override {
+ ceph_assert(db);
+ db->compact();
+ }
+
+private:
+ // --------------------------------------------------------
+ // write ops
+
+ int _write(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ uint64_t offset, size_t len,
+ ceph::buffer::list& bl,
+ uint32_t fadvise_flags);
+ int _do_write(TransContext *txc,
+ OnodeRef o,
+ uint64_t offset, uint64_t length,
+ ceph::buffer::list& bl,
+ uint32_t fadvise_flags);
+ int _touch(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o);
+ int _zero(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ uint64_t offset, size_t len);
+ int _do_truncate(TransContext *txc,
+ OnodeRef o,
+ uint64_t offset);
+ int _truncate(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ uint64_t offset);
+ int _remove(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o);
+ int _do_remove(TransContext *txc,
+ OnodeRef o);
+ int _setattr(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ const std::string& name,
+ ceph::buffer::ptr& val);
+ int _setattrs(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ const std::map<std::string,ceph::buffer::ptr>& aset);
+ int _rmattr(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ const std::string& name);
+ int _rmattrs(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o);
+ void _do_omap_clear(TransContext *txc, uint64_t id);
+ int _omap_clear(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o);
+ int _omap_setkeys(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ ceph::buffer::list& bl);
+ int _omap_setheader(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ ceph::buffer::list& header);
+ int _omap_rmkeys(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ const ceph::buffer::list& bl);
+ int _omap_rmkey_range(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ const std::string& first, const std::string& last);
+ int _setallochint(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& o,
+ uint64_t expected_object_size,
+ uint64_t expected_write_size,
+ uint32_t flags);
+ int _clone(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& oldo,
+ OnodeRef& newo);
+ int _clone_range(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& oldo,
+ OnodeRef& newo,
+ uint64_t srcoff, uint64_t length, uint64_t dstoff);
+ int _rename(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef& oldo,
+ OnodeRef& newo,
+ const ghobject_t& new_oid);
+ int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
+ CollectionRef *c);
+ int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
+ int _split_collection(TransContext *txc,
+ CollectionRef& c,
+ CollectionRef& d,
+ unsigned bits, int rem);
+ int _merge_collection(TransContext *txc,
+ CollectionRef *c,
+ CollectionRef& d,
+ unsigned bits);
+
+};
+
+static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
+ o->get();
+}
+static inline void intrusive_ptr_release(KStore::Onode *o) {
+ o->put();
+}
+
+static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
+ o->get();
+}
+static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
+ o->put();
+}
+
+#endif