From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/os/kstore/KStore.cc | 3436 +++++++++++++++++++++++++++++++++++++++++ src/os/kstore/KStore.h | 692 +++++++++ src/os/kstore/kstore_types.cc | 102 ++ src/os/kstore/kstore_types.h | 68 + 4 files changed, 4298 insertions(+) create mode 100644 src/os/kstore/KStore.cc create mode 100644 src/os/kstore/KStore.h create mode 100644 src/os/kstore/kstore_types.cc create mode 100644 src/os/kstore/kstore_types.h (limited to 'src/os/kstore') diff --git a/src/os/kstore/KStore.cc b/src/os/kstore/KStore.cc new file mode 100644 index 00000000..bc352bb0 --- /dev/null +++ b/src/os/kstore/KStore.cc @@ -0,0 +1,3436 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include +#include +#if defined(__FreeBSD__) +#include +#include +#endif + +#include "KStore.h" +#include "osd/osd_types.h" +#include "os/kv.h" +#include "include/compat.h" +#include "include/stringify.h" +#include "common/errno.h" +#include "common/safe_io.h" +#include "common/Formatter.h" + + +#define dout_context cct +#define dout_subsys ceph_subsys_kstore + +/* + + TODO: + + * superblock, features + * refcounted extents (for efficient clone) + + */ + +const string PREFIX_SUPER = "S"; // field -> value +const string PREFIX_COLL = "C"; // collection name -> (nothing) +const string PREFIX_OBJ = "O"; // object name -> onode +const string PREFIX_DATA = "D"; // nid + offset -> data +const string PREFIX_OMAP = "M"; // u64 + keyname -> value + +/* + * object name key structure + * + * 2 chars: shard (-- for none, or hex digit, so that we sort properly) + * encoded u64: poolid + 2^63 (so that it sorts properly) + * encoded u32: hash (bit reversed) + * + * 1 char: '.' + * + * escaped string: namespace + * + * 1 char: '<', '=', or '>'. if =, then object key == object name, and + * we are followed just by the key. otherwise, we are followed by + * the key and then the object name. + * escaped string: key + * escaped string: object name (unless '=' above) + * + * encoded u64: snap + * encoded u64: generation + */ + +/* + * string encoding in the key + * + * The key string needs to lexicographically sort the same way that + * ghobject_t does. We do this by escaping anything <= to '#' with # + * plus a 2 digit hex string, and anything >= '~' with ~ plus the two + * hex digits. + * + * We use ! as a terminator for strings; this works because it is < # + * and will get escaped if it is present in the string. + * + */ + +static void append_escaped(const string &in, string *out) +{ + char hexbyte[8]; + for (string::const_iterator i = in.begin(); i != in.end(); ++i) { + if ((unsigned char)*i <= '#') { + snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i); + out->append(hexbyte); + } else if ((unsigned char)*i >= '~') { + snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i); + out->append(hexbyte); + } else { + out->push_back(*i); + } + } + out->push_back('!'); +} + +static int decode_escaped(const char *p, string *out) +{ + const char *orig_p = p; + while (*p && *p != '!') { + if (*p == '#' || *p == '~') { + unsigned hex; + int r = sscanf(++p, "%2x", &hex); + if (r < 1) + return -EINVAL; + out->push_back((char)hex); + p += 2; + } else { + out->push_back(*p++); + } + } + return p - orig_p; +} + +// some things we encode in binary (as le32 or le64); print the +// resulting key strings nicely +static string pretty_binary_string(const string& in) +{ + char buf[10]; + string out; + out.reserve(in.length() * 3); + enum { NONE, HEX, STRING } mode = NONE; + unsigned from = 0, i; + for (i=0; i < in.length(); ++i) { + if ((in[i] < 32 || (unsigned char)in[i] > 126) || + (mode == HEX && in.length() - i >= 4 && + ((in[i] < 32 || (unsigned char)in[i] > 126) || + (in[i+1] < 32 || (unsigned char)in[i+1] > 126) || + (in[i+2] < 32 || (unsigned char)in[i+2] > 126) || + (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) { + if (mode == STRING) { + out.append(in.substr(from, i - from)); + out.push_back('\''); + } + if (mode != HEX) { + out.append("0x"); + mode = HEX; + } + if (in.length() - i >= 4) { + // print a whole u32 at once + snprintf(buf, sizeof(buf), "%08x", + (uint32_t)(((unsigned char)in[i] << 24) | + ((unsigned char)in[i+1] << 16) | + ((unsigned char)in[i+2] << 8) | + ((unsigned char)in[i+3] << 0))); + i += 3; + } else { + snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]); + } + out.append(buf); + } else { + if (mode != STRING) { + out.push_back('\''); + mode = STRING; + from = i; + } + } + } + if (mode == STRING) { + out.append(in.substr(from, i - from)); + out.push_back('\''); + } + return out; +} + +static void _key_encode_shard(shard_id_t shard, string *key) +{ + // make field ordering match with ghobject_t compare operations + if (shard == shard_id_t::NO_SHARD) { + // otherwise ff will sort *after* 0, not before. + key->append("--"); + } else { + char buf[32]; + snprintf(buf, sizeof(buf), "%02x", (int)shard); + key->append(buf); + } +} +static const char *_key_decode_shard(const char *key, shard_id_t *pshard) +{ + if (key[0] == '-') { + *pshard = shard_id_t::NO_SHARD; + } else { + unsigned shard; + int r = sscanf(key, "%x", &shard); + if (r < 1) + return NULL; + *pshard = shard_id_t(shard); + } + return key + 2; +} + +static void get_coll_key_range(const coll_t& cid, int bits, + string *temp_start, string *temp_end, + string *start, string *end) +{ + temp_start->clear(); + temp_end->clear(); + start->clear(); + end->clear(); + + spg_t pgid; + if (cid.is_pg(&pgid)) { + _key_encode_shard(pgid.shard, start); + *end = *start; + *temp_start = *start; + *temp_end = *start; + + _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start); + _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start); + _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start); + _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start); + start->append("."); + temp_start->append("."); + + _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end); + _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end); + + uint64_t end_hash = + hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits)); + if (end_hash <= 0xffffffffull) { + _key_encode_u32(end_hash, end); + _key_encode_u32(end_hash, temp_end); + end->append("."); + temp_end->append("."); + } else { + _key_encode_u32(0xffffffff, end); + _key_encode_u32(0xffffffff, temp_end); + end->append(":"); + temp_end->append(":"); + } + } else { + _key_encode_shard(shard_id_t::NO_SHARD, start); + _key_encode_u64(-1ull + 0x8000000000000000ull, start); + *end = *start; + _key_encode_u32(0, start); + start->append("."); + _key_encode_u32(0xffffffff, end); + end->append(":"); + + // no separate temp section + *temp_start = *end; + *temp_end = *end; + } +} + +static int get_key_object(const string& key, ghobject_t *oid); + +static void get_object_key(CephContext* cct, const ghobject_t& oid, + string *key) +{ + key->clear(); + + _key_encode_shard(oid.shard_id, key); + _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key); + _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key); + key->append("."); + + append_escaped(oid.hobj.nspace, key); + + if (oid.hobj.get_key().length()) { + // is a key... could be < = or >. + // (ASCII chars < = and > sort in that order, yay) + if (oid.hobj.get_key() < oid.hobj.oid.name) { + key->append("<"); + append_escaped(oid.hobj.get_key(), key); + append_escaped(oid.hobj.oid.name, key); + } else if (oid.hobj.get_key() > oid.hobj.oid.name) { + key->append(">"); + append_escaped(oid.hobj.get_key(), key); + append_escaped(oid.hobj.oid.name, key); + } else { + // same as no key + key->append("="); + append_escaped(oid.hobj.oid.name, key); + } + } else { + // no key + key->append("="); + append_escaped(oid.hobj.oid.name, key); + } + + _key_encode_u64(oid.hobj.snap, key); + _key_encode_u64(oid.generation, key); + + // sanity check + if (true) { + ghobject_t t; + int r = get_key_object(*key, &t); + if (r || t != oid) { + derr << " r " << r << dendl; + derr << "key " << pretty_binary_string(*key) << dendl; + derr << "oid " << oid << dendl; + derr << " t " << t << dendl; + ceph_assert(t == oid); + } + } +} + +static int get_key_object(const string& key, ghobject_t *oid) +{ + int r; + const char *p = key.c_str(); + + p = _key_decode_shard(p, &oid->shard_id); + + uint64_t pool; + p = _key_decode_u64(p, &pool); + oid->hobj.pool = pool - 0x8000000000000000ull; + + unsigned hash; + p = _key_decode_u32(p, &hash); + oid->hobj.set_bitwise_key_u32(hash); + if (*p != '.') + return -5; + ++p; + + r = decode_escaped(p, &oid->hobj.nspace); + if (r < 0) + return -6; + p += r + 1; + + if (*p == '=') { + // no key + ++p; + r = decode_escaped(p, &oid->hobj.oid.name); + if (r < 0) + return -7; + p += r + 1; + } else if (*p == '<' || *p == '>') { + // key + name + ++p; + string okey; + r = decode_escaped(p, &okey); + if (r < 0) + return -8; + p += r + 1; + r = decode_escaped(p, &oid->hobj.oid.name); + if (r < 0) + return -9; + p += r + 1; + oid->hobj.set_key(okey); + } else { + // malformed + return -10; + } + + p = _key_decode_u64(p, &oid->hobj.snap.val); + p = _key_decode_u64(p, &oid->generation); + if (*p) { + // if we get something other than a null terminator here, + // something goes wrong. + return -12; + } + + return 0; +} + + +static void get_data_key(uint64_t nid, uint64_t offset, string *out) +{ + _key_encode_u64(nid, out); + _key_encode_u64(offset, out); +} + +// '-' < '.' < '~' +static void get_omap_header(uint64_t id, string *out) +{ + _key_encode_u64(id, out); + out->push_back('-'); +} + +// hmm, I don't think there's any need to escape the user key since we +// have a clean prefix. +static void get_omap_key(uint64_t id, const string& key, string *out) +{ + _key_encode_u64(id, out); + out->push_back('.'); + out->append(key); +} + +static void rewrite_omap_key(uint64_t id, string old, string *out) +{ + _key_encode_u64(id, out); + out->append(old.substr(out->length())); +} + +static void decode_omap_key(const string& key, string *user_key) +{ + *user_key = key.substr(sizeof(uint64_t) + 1); +} + +static void get_omap_tail(uint64_t id, string *out) +{ + _key_encode_u64(id, out); + out->push_back('~'); +} + + + +// Onode + +#undef dout_prefix +#define dout_prefix *_dout << "kstore.onode(" << this << ") " + +void KStore::Onode::flush() +{ + std::unique_lock l(flush_lock); + dout(20) << __func__ << " " << flush_txns << dendl; + while (!flush_txns.empty()) + flush_cond.wait(l); + dout(20) << __func__ << " done" << dendl; +} + +// OnodeHashLRU + +#undef dout_prefix +#define dout_prefix *_dout << "kstore.lru(" << this << ") " + +void KStore::OnodeHashLRU::_touch(OnodeRef o) +{ + lru_list_t::iterator p = lru.iterator_to(*o); + lru.erase(p); + lru.push_front(*o); +} + +void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o) +{ + std::lock_guard l(lock); + dout(30) << __func__ << " " << oid << " " << o << dendl; + ceph_assert(onode_map.count(oid) == 0); + onode_map[oid] = o; + lru.push_front(*o); +} + +KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid) +{ + std::lock_guard l(lock); + dout(30) << __func__ << dendl; + ceph::unordered_map::iterator p = onode_map.find(oid); + if (p == onode_map.end()) { + dout(30) << __func__ << " " << oid << " miss" << dendl; + return OnodeRef(); + } + dout(30) << __func__ << " " << oid << " hit " << p->second << dendl; + _touch(p->second); + return p->second; +} + +void KStore::OnodeHashLRU::clear() +{ + std::lock_guard l(lock); + dout(10) << __func__ << dendl; + lru.clear(); + onode_map.clear(); +} + +void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid, + const ghobject_t& new_oid) +{ + std::lock_guard l(lock); + dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl; + ceph::unordered_map::iterator po, pn; + po = onode_map.find(old_oid); + pn = onode_map.find(new_oid); + + ceph_assert(po != onode_map.end()); + if (pn != onode_map.end()) { + lru_list_t::iterator p = lru.iterator_to(*pn->second); + lru.erase(p); + onode_map.erase(pn); + } + OnodeRef o = po->second; + + // install a non-existent onode it its place + po->second.reset(new Onode(cct, old_oid, o->key)); + lru.push_back(*po->second); + + // fix oid, key + onode_map.insert(make_pair(new_oid, o)); + _touch(o); + o->oid = new_oid; + get_object_key(cct, new_oid, &o->key); +} + +bool KStore::OnodeHashLRU::get_next( + const ghobject_t& after, + pair *next) +{ + std::lock_guard l(lock); + dout(20) << __func__ << " after " << after << dendl; + + if (after == ghobject_t()) { + if (lru.empty()) { + return false; + } + ceph::unordered_map::iterator p = onode_map.begin(); + ceph_assert(p != onode_map.end()); + next->first = p->first; + next->second = p->second; + return true; + } + + ceph::unordered_map::iterator p = onode_map.find(after); + ceph_assert(p != onode_map.end()); // for now + lru_list_t::iterator pi = lru.iterator_to(*p->second); + ++pi; + if (pi == lru.end()) { + return false; + } + next->first = pi->oid; + next->second = onode_map[pi->oid]; + return true; +} + +int KStore::OnodeHashLRU::trim(int max) +{ + std::lock_guard l(lock); + dout(20) << __func__ << " max " << max + << " size " << onode_map.size() << dendl; + int trimmed = 0; + int num = onode_map.size() - max; + if (onode_map.size() == 0 || num <= 0) + return 0; // don't even try + + lru_list_t::iterator p = lru.end(); + if (num) + --p; + while (num > 0) { + Onode *o = &*p; + int refs = o->nref.load(); + if (refs > 1) { + dout(20) << __func__ << " " << o->oid << " has " << refs + << " refs; stopping with " << num << " left to trim" << dendl; + break; + } + dout(30) << __func__ << " trim " << o->oid << dendl; + if (p != lru.begin()) { + lru.erase(p--); + } else { + lru.erase(p); + ceph_assert(num == 1); + } + o->get(); // paranoia + onode_map.erase(o->oid); + o->put(); + --num; + ++trimmed; + } + return trimmed; +} + +// ======================================================= + +// Collection + +#undef dout_prefix +#define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") " + +KStore::Collection::Collection(KStore *ns, coll_t cid) + : CollectionImpl(cid), + store(ns), + lock("KStore::Collection::lock", true, false), + osr(new OpSequencer()), + onode_map(store->cct) +{ +} + +void KStore::Collection::flush() +{ + osr->flush(); +} + +bool KStore::Collection::flush_commit(Context *c) +{ + return osr->flush_commit(c); +} + + +KStore::OnodeRef KStore::Collection::get_onode( + const ghobject_t& oid, + bool create) +{ + ceph_assert(create ? lock.is_wlocked() : lock.is_locked()); + + spg_t pgid; + if (cid.is_pg(&pgid)) { + if (!oid.match(cnode.bits, pgid.ps())) { + lderr(store->cct) << __func__ << " oid " << oid << " not part of " + << pgid << " bits " << cnode.bits << dendl; + ceph_abort(); + } + } + + OnodeRef o = onode_map.lookup(oid); + if (o) + return o; + + string key; + get_object_key(store->cct, oid, &key); + + ldout(store->cct, 20) << __func__ << " oid " << oid << " key " + << pretty_binary_string(key) << dendl; + + bufferlist v; + int r = store->db->get(PREFIX_OBJ, key, &v); + ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl; + Onode *on; + if (v.length() == 0) { + ceph_assert(r == -ENOENT); + if (!create) + return OnodeRef(); + + // new + on = new Onode(store->cct, oid, key); + on->dirty = true; + } else { + // loaded + ceph_assert(r >=0); + on = new Onode(store->cct, oid, key); + on->exists = true; + auto p = v.cbegin(); + decode(on->onode, p); + } + o.reset(on); + onode_map.add(oid, o); + return o; +} + + + +// ======================================================= + +#undef dout_prefix +#define dout_prefix *_dout << "kstore(" << path << ") " + +KStore::KStore(CephContext *cct, const string& path) + : ObjectStore(cct, path), + db(NULL), + basedir(path), + path_fd(-1), + fsid_fd(-1), + mounted(false), + coll_lock("KStore::coll_lock"), + nid_last(0), + nid_max(0), + throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops), + throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes), + finisher(cct), + kv_sync_thread(this), + kv_stop(false), + logger(nullptr) +{ + _init_logger(); +} + +KStore::~KStore() +{ + _shutdown_logger(); + ceph_assert(!mounted); + ceph_assert(db == NULL); + ceph_assert(fsid_fd < 0); +} + +void KStore::_init_logger() +{ + // XXX + PerfCountersBuilder b(cct, "KStore", + l_kstore_first, l_kstore_last); + b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency"); + b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency"); + b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency"); + b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency"); + b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); +} + +void KStore::_shutdown_logger() +{ + // XXX + cct->get_perfcounters_collection()->remove(logger); + delete logger; +} + +int KStore::_open_path() +{ + ceph_assert(path_fd < 0); + path_fd = ::open(path.c_str(), O_DIRECTORY|O_CLOEXEC); + if (path_fd < 0) { + int r = -errno; + derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r) + << dendl; + return r; + } + return 0; +} + +void KStore::_close_path() +{ + VOID_TEMP_FAILURE_RETRY(::close(path_fd)); + path_fd = -1; +} + +int KStore::_open_fsid(bool create) +{ + ceph_assert(fsid_fd < 0); + int flags = O_RDWR; + if (create) + flags |= O_CREAT; + fsid_fd = ::openat(path_fd, "fsid", flags, 0644); + if (fsid_fd < 0) { + int err = -errno; + derr << __func__ << " " << cpp_strerror(err) << dendl; + return err; + } + return 0; +} + +int KStore::_read_fsid(uuid_d *uuid) +{ + char fsid_str[40]; + memset(fsid_str, 0, sizeof(fsid_str)); + int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str)); + if (ret < 0) { + derr << __func__ << " failed: " << cpp_strerror(ret) << dendl; + return ret; + } + if (ret > 36) + fsid_str[36] = 0; + else + fsid_str[ret] = 0; + if (!uuid->parse(fsid_str)) { + derr << __func__ << " unparsable uuid " << fsid_str << dendl; + return -EINVAL; + } + return 0; +} + +int KStore::_write_fsid() +{ + int r = ::ftruncate(fsid_fd, 0); + if (r < 0) { + r = -errno; + derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl; + return r; + } + string str = stringify(fsid) + "\n"; + r = safe_write(fsid_fd, str.c_str(), str.length()); + if (r < 0) { + derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl; + return r; + } + r = ::fsync(fsid_fd); + if (r < 0) { + r = -errno; + derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl; + return r; + } + return 0; +} + +void KStore::_close_fsid() +{ + VOID_TEMP_FAILURE_RETRY(::close(fsid_fd)); + fsid_fd = -1; +} + +int KStore::_lock_fsid() +{ + struct flock l; + memset(&l, 0, sizeof(l)); + l.l_type = F_WRLCK; + l.l_whence = SEEK_SET; + l.l_start = 0; + l.l_len = 0; + int r = ::fcntl(fsid_fd, F_SETLK, &l); + if (r < 0) { + int err = errno; + derr << __func__ << " failed to lock " << path << "/fsid" + << " (is another ceph-osd still running?)" + << cpp_strerror(err) << dendl; + return -err; + } + return 0; +} + +bool KStore::test_mount_in_use() +{ + // most error conditions mean the mount is not in use (e.g., because + // it doesn't exist). only if we fail to lock do we conclude it is + // in use. + bool ret = false; + int r = _open_path(); + if (r < 0) + return false; + r = _open_fsid(false); + if (r < 0) + goto out_path; + r = _lock_fsid(); + if (r < 0) + ret = true; // if we can't lock, it is in use + _close_fsid(); + out_path: + _close_path(); + return ret; +} + +int KStore::_open_db(bool create) +{ + int r; + ceph_assert(!db); + char fn[PATH_MAX]; + snprintf(fn, sizeof(fn), "%s/db", path.c_str()); + + string kv_backend; + if (create) { + kv_backend = cct->_conf->kstore_backend; + } else { + r = read_meta("kv_backend", &kv_backend); + if (r < 0) { + derr << __func__ << " uanble to read 'kv_backend' meta" << dendl; + return -EIO; + } + } + dout(10) << __func__ << " kv_backend = " << kv_backend << dendl; + + if (create) { + int r = ::mkdir(fn, 0755); + if (r < 0) + r = -errno; + if (r < 0 && r != -EEXIST) { + derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r) + << dendl; + return r; + } + + // wal_dir, too! + char walfn[PATH_MAX]; + snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str()); + r = ::mkdir(walfn, 0755); + if (r < 0) + r = -errno; + if (r < 0 && r != -EEXIST) { + derr << __func__ << " failed to create " << walfn + << ": " << cpp_strerror(r) + << dendl; + return r; + } + } + + db = KeyValueDB::create(cct, kv_backend, fn); + if (!db) { + derr << __func__ << " error creating db" << dendl; + return -EIO; + } + string options; + if (kv_backend == "rocksdb") + options = cct->_conf->kstore_rocksdb_options; + db->init(options); + stringstream err; + if (create) + r = db->create_and_open(err); + else + r = db->open(err); + if (r) { + derr << __func__ << " erroring opening db: " << err.str() << dendl; + delete db; + db = NULL; + return -EIO; + } + dout(1) << __func__ << " opened " << kv_backend + << " path " << fn << " options " << options << dendl; + return 0; +} + +void KStore::_close_db() +{ + ceph_assert(db); + delete db; + db = NULL; +} + +int KStore::_open_collections(int *errors) +{ + ceph_assert(coll_map.empty()); + KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL); + for (it->upper_bound(string()); + it->valid(); + it->next()) { + coll_t cid; + if (cid.parse(it->key())) { + CollectionRef c(new Collection(this, cid)); + bufferlist bl = it->value(); + auto p = bl.cbegin(); + try { + decode(c->cnode, p); + } catch (buffer::error& e) { + derr << __func__ << " failed to decode cnode, key:" + << pretty_binary_string(it->key()) << dendl; + return -EIO; + } + dout(20) << __func__ << " opened " << cid << dendl; + coll_map[cid] = c; + } else { + derr << __func__ << " unrecognized collection " << it->key() << dendl; + if (errors) + (*errors)++; + } + } + return 0; +} + +int KStore::mkfs() +{ + dout(1) << __func__ << " path " << path << dendl; + int r; + uuid_d old_fsid; + + r = _open_path(); + if (r < 0) + return r; + + r = _open_fsid(true); + if (r < 0) + goto out_path_fd; + + r = _lock_fsid(); + if (r < 0) + goto out_close_fsid; + + r = _read_fsid(&old_fsid); + if (r < 0 || old_fsid.is_zero()) { + if (fsid.is_zero()) { + fsid.generate_random(); + dout(1) << __func__ << " generated fsid " << fsid << dendl; + } else { + dout(1) << __func__ << " using provided fsid " << fsid << dendl; + } + // we'll write it last. + } else { + if (!fsid.is_zero() && fsid != old_fsid) { + derr << __func__ << " on-disk fsid " << old_fsid + << " != provided " << fsid << dendl; + r = -EINVAL; + goto out_close_fsid; + } + fsid = old_fsid; + dout(1) << __func__ << " already created, fsid is " << fsid << dendl; + goto out_close_fsid; + } + + r = _open_db(true); + if (r < 0) + goto out_close_fsid; + + r = write_meta("kv_backend", cct->_conf->kstore_backend); + if (r < 0) + goto out_close_db; + + r = write_meta("type", "kstore"); + if (r < 0) + goto out_close_db; + + // indicate mkfs completion/success by writing the fsid file + r = _write_fsid(); + if (r == 0) + dout(10) << __func__ << " success" << dendl; + else + derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl; + + out_close_db: + _close_db(); + out_close_fsid: + _close_fsid(); + out_path_fd: + _close_path(); + return r; +} + +int KStore::mount() +{ + dout(1) << __func__ << " path " << path << dendl; + + if (cct->_conf->kstore_fsck_on_mount) { + int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep); + if (rc < 0) + return rc; + } + + int r = _open_path(); + if (r < 0) + return r; + r = _open_fsid(false); + if (r < 0) + goto out_path; + + r = _read_fsid(&fsid); + if (r < 0) + goto out_fsid; + + r = _lock_fsid(); + if (r < 0) + goto out_fsid; + + r = _open_db(false); + if (r < 0) + goto out_fsid; + + r = _open_super_meta(); + if (r < 0) + goto out_db; + + r = _open_collections(); + if (r < 0) + goto out_db; + + finisher.start(); + kv_sync_thread.create("kstore_kv_sync"); + + mounted = true; + return 0; + + out_db: + _close_db(); + out_fsid: + _close_fsid(); + out_path: + _close_path(); + return r; +} + +int KStore::umount() +{ + ceph_assert(mounted); + dout(1) << __func__ << dendl; + + _sync(); + _reap_collections(); + coll_map.clear(); + + dout(20) << __func__ << " stopping kv thread" << dendl; + _kv_stop(); + dout(20) << __func__ << " draining finisher" << dendl; + finisher.wait_for_empty(); + dout(20) << __func__ << " stopping finisher" << dendl; + finisher.stop(); + dout(20) << __func__ << " closing" << dendl; + + mounted = false; + _close_db(); + _close_fsid(); + _close_path(); + return 0; +} + +int KStore::fsck(bool deep) +{ + dout(1) << __func__ << dendl; + int errors = 0; + dout(1) << __func__ << " finish with " << errors << " errors" << dendl; + return errors; +} + +void KStore::_sync() +{ + dout(10) << __func__ << dendl; + + std::unique_lock l(kv_lock); + while (!kv_committing.empty() || + !kv_queue.empty()) { + dout(20) << " waiting for kv to commit" << dendl; + kv_sync_cond.wait(l); + } + + dout(10) << __func__ << " done" << dendl; +} + +int KStore::statfs(struct store_statfs_t* buf0, osd_alert_list_t* alerts) +{ + struct statfs buf; + buf0->reset(); + if (alerts) { + alerts->clear(); // returns nothing for now + } + if (::statfs(basedir.c_str(), &buf) < 0) { + int r = -errno; + ceph_assert(r != -ENOENT); + return r; + } + + buf0->total = buf.f_blocks * buf.f_bsize; + buf0->available = buf.f_bavail * buf.f_bsize; + + return 0; +} + +ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid) +{ + return _get_collection(cid); +} + +ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid) +{ + auto *c = new Collection(this, cid); + RWLock::WLocker l(coll_lock); + new_coll_map[cid] = c; + return c; +} + +int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) +{ + return -ENOTSUP; +} + +// --------------- +// cache + +KStore::CollectionRef KStore::_get_collection(coll_t cid) +{ + RWLock::RLocker l(coll_lock); + ceph::unordered_map::iterator cp = coll_map.find(cid); + if (cp == coll_map.end()) + return CollectionRef(); + return cp->second; +} + +void KStore::_queue_reap_collection(CollectionRef& c) +{ + dout(10) << __func__ << " " << c->cid << dendl; + std::lock_guard l(reap_lock); + removed_collections.push_back(c); +} + +void KStore::_reap_collections() +{ + list removed_colls; + std::lock_guard l(reap_lock); + removed_colls.swap(removed_collections); + + for (list::iterator p = removed_colls.begin(); + p != removed_colls.end(); + ++p) { + CollectionRef c = *p; + dout(10) << __func__ << " " << c->cid << dendl; + { + pair next; + while (c->onode_map.get_next(next.first, &next)) { + ceph_assert(!next.second->exists); + if (!next.second->flush_txns.empty()) { + dout(10) << __func__ << " " << c->cid << " " << next.second->oid + << " flush_txns " << next.second->flush_txns << dendl; + return; + } + } + } + c->onode_map.clear(); + dout(10) << __func__ << " " << c->cid << " done" << dendl; + } + + dout(10) << __func__ << " all reaped" << dendl; +} + +// --------------- +// read operations + +bool KStore::exists(CollectionHandle& ch, const ghobject_t& oid) +{ + dout(10) << __func__ << " " << ch->cid << " " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) + return false; + return true; +} + +int KStore::stat( + CollectionHandle& ch, + const ghobject_t& oid, + struct stat *st, + bool allow_eio) +{ + dout(10) << __func__ << " " << ch->cid << " " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) + return -ENOENT; + st->st_size = o->onode.size; + st->st_blksize = 4096; + st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize; + st->st_nlink = 1; + return 0; +} + +int KStore::set_collection_opts( + CollectionHandle& ch, + const pool_opts_t& opts) +{ + return -EOPNOTSUPP; +} + +int KStore::read( + CollectionHandle& ch, + const ghobject_t& oid, + uint64_t offset, + size_t length, + bufferlist& bl, + uint32_t op_flags) +{ + dout(15) << __func__ << " " << ch->cid << " " << oid + << " " << offset << "~" << length + << dendl; + bl.clear(); + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + + int r; + + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + + if (offset == length && offset == 0) + length = o->onode.size; + + r = _do_read(o, offset, length, bl, op_flags); + + out: + dout(10) << __func__ << " " << ch->cid << " " << oid + << " " << offset << "~" << length + << " = " << r << dendl; + return r; +} + +int KStore::_do_read( + OnodeRef o, + uint64_t offset, + size_t length, + bufferlist& bl, + uint32_t op_flags) +{ + int r = 0; + uint64_t stripe_size = o->onode.stripe_size; + uint64_t stripe_off; + + dout(20) << __func__ << " " << offset << "~" << length << " size " + << o->onode.size << " nid " << o->onode.nid << dendl; + bl.clear(); + + if (offset > o->onode.size) { + goto out; + } + if (offset + length > o->onode.size) { + length = o->onode.size - offset; + } + if (stripe_size == 0) { + bl.append_zero(length); + r = length; + goto out; + } + + o->flush(); + + stripe_off = offset % stripe_size; + while (length > 0) { + bufferlist stripe; + _do_read_stripe(o, offset - stripe_off, &stripe); + dout(30) << __func__ << " stripe " << offset - stripe_off << " got " + << stripe.length() << dendl; + unsigned swant = std::min(stripe_size - stripe_off, length); + if (stripe.length()) { + if (swant == stripe.length()) { + bl.claim_append(stripe); + dout(30) << __func__ << " taking full stripe" << dendl; + } else { + unsigned l = 0; + if (stripe_off < stripe.length()) { + l = std::min(stripe.length() - stripe_off, swant); + bufferlist t; + t.substr_of(stripe, stripe_off, l); + bl.claim_append(t); + dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl; + } + if (l < swant) { + bl.append_zero(swant - l); + dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl; + } + } + } else { + dout(30) << __func__ << " generating " << swant << " zeros" << dendl; + bl.append_zero(swant); + } + offset += swant; + length -= swant; + stripe_off = 0; + } + r = bl.length(); + dout(30) << " result:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r; +} + +int KStore::fiemap( + CollectionHandle& ch, + const ghobject_t& oid, + uint64_t offset, + size_t len, + bufferlist& bl) +{ + map m; + int r = fiemap(ch, oid, offset, len, m); + if (r >= 0) { + encode(m, bl); + } + return r; +} + +int KStore::fiemap( + CollectionHandle& ch, + const ghobject_t& oid, + uint64_t offset, + size_t len, + map& destmap) +{ + CollectionRef c = static_cast(ch.get()); + if (!c) + return -ENOENT; + RWLock::RLocker l(c->lock); + + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + return -ENOENT; + } + + if (offset > o->onode.size) + goto out; + + if (offset + len > o->onode.size) { + len = o->onode.size - offset; + } + + dout(20) << __func__ << " " << offset << "~" << len << " size " + << o->onode.size << dendl; + + // FIXME: do something smarter here + destmap[0] = o->onode.size; + + out: + dout(20) << __func__ << " " << offset << "~" << len + << " size = 0 (" << destmap << ")" << dendl; + return 0; +} + +int KStore::getattr( + CollectionHandle& ch, + const ghobject_t& oid, + const char *name, + bufferptr& value) +{ + dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r; + string k(name); + + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + + if (!o->onode.attrs.count(k)) { + r = -ENODATA; + goto out; + } + value = o->onode.attrs[k]; + r = 0; + out: + dout(10) << __func__ << " " << ch->cid << " " << oid << " " << name + << " = " << r << dendl; + return r; +} + +int KStore::getattrs( + CollectionHandle& ch, + const ghobject_t& oid, + map& aset) +{ + dout(15) << __func__ << " " << ch->cid << " " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r; + + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + aset = o->onode.attrs; + r = 0; + out: + dout(10) << __func__ << " " << ch->cid << " " << oid + << " = " << r << dendl; + return r; +} + +int KStore::list_collections(vector& ls) +{ + RWLock::RLocker l(coll_lock); + for (ceph::unordered_map::iterator p = coll_map.begin(); + p != coll_map.end(); + ++p) + ls.push_back(p->first); + return 0; +} + +bool KStore::collection_exists(const coll_t& c) +{ + RWLock::RLocker l(coll_lock); + return coll_map.count(c); +} + +int KStore::collection_empty(CollectionHandle& ch, bool *empty) +{ + dout(15) << __func__ << " " << ch->cid << dendl; + vector ls; + ghobject_t next; + int r = collection_list(ch, ghobject_t(), ghobject_t::get_max(), 1, + &ls, &next); + if (r < 0) { + derr << __func__ << " collection_list returned: " << cpp_strerror(r) + << dendl; + return r; + } + *empty = ls.empty(); + dout(10) << __func__ << " " << ch->cid << " = " << (int)(*empty) << dendl; + return 0; +} + +int KStore::collection_bits(CollectionHandle& ch) +{ + dout(15) << __func__ << " " << ch->cid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl; + return c->cnode.bits; +} + +int KStore::collection_list( + CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max, + vector *ls, ghobject_t *pnext) + +{ + Collection *c = static_cast(c_.get()); + c->flush(); + dout(15) << __func__ << " " << c->cid + << " start " << start << " end " << end << " max " << max << dendl; + int r; + { + RWLock::RLocker l(c->lock); + r = _collection_list(c, start, end, max, ls, pnext); + } + + dout(10) << __func__ << " " << c->cid + << " start " << start << " end " << end << " max " << max + << " = " << r << ", ls.size() = " << ls->size() + << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl; + return r; +} + +int KStore::_collection_list( + Collection* c, const ghobject_t& start, const ghobject_t& end, int max, + vector *ls, ghobject_t *pnext) +{ + int r = 0; + KeyValueDB::Iterator it; + string temp_start_key, temp_end_key; + string start_key, end_key; + bool set_next = false; + string pend; + bool temp; + + ghobject_t static_next; + if (!pnext) + pnext = &static_next; + + if (start == ghobject_t::get_max() || + start.hobj.is_max()) { + goto out; + } + get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key, + &start_key, &end_key); + dout(20) << __func__ + << " range " << pretty_binary_string(temp_start_key) + << " to " << pretty_binary_string(temp_end_key) + << " and " << pretty_binary_string(start_key) + << " to " << pretty_binary_string(end_key) + << " start " << start << dendl; + it = db->get_iterator(PREFIX_OBJ); + if (start == ghobject_t() || start == c->cid.get_min_hobj()) { + it->upper_bound(temp_start_key); + temp = true; + } else { + string k; + get_object_key(cct, start, &k); + if (start.hobj.is_temp()) { + temp = true; + ceph_assert(k >= temp_start_key && k < temp_end_key); + } else { + temp = false; + ceph_assert(k >= start_key && k < end_key); + } + dout(20) << " start from " << pretty_binary_string(k) + << " temp=" << (int)temp << dendl; + it->lower_bound(k); + } + if (end.hobj.is_max()) { + pend = temp ? temp_end_key : end_key; + } else { + if (end.hobj.is_temp()) { + if (temp) + get_object_key(cct, end, &pend); + else + goto out; + } else { + if (temp) + pend = temp_end_key; + else + get_object_key(cct, end, &pend); + } + } + dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl; + while (true) { + if (!it->valid() || it->key() >= pend) { + if (!it->valid()) + dout(20) << __func__ << " iterator not valid (end of db?)" << dendl; + else + dout(20) << __func__ << " key " << pretty_binary_string(it->key()) + << " > " << end << dendl; + if (temp) { + if (end.hobj.is_temp()) { + if (it->valid() && it->key() < temp_end_key) { + int r = get_key_object(it->key(), pnext); + ceph_assert(r == 0); + set_next = true; + } + break; + } + dout(30) << __func__ << " switch to non-temp namespace" << dendl; + temp = false; + it->upper_bound(start_key); + if (end.hobj.is_max()) + pend = end_key; + else + get_object_key(cct, end, &pend); + dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl; + continue; + } + if (it->valid() && it->key() < end_key) { + int r = get_key_object(it->key(), pnext); + ceph_assert(r == 0); + set_next = true; + } + break; + } + dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl; + ghobject_t oid; + int r = get_key_object(it->key(), &oid); + ceph_assert(r == 0); + if (ls->size() >= (unsigned)max) { + dout(20) << __func__ << " reached max " << max << dendl; + *pnext = oid; + set_next = true; + break; + } + ls->push_back(oid); + it->next(); + } +out: + if (!set_next) { + *pnext = ghobject_t::get_max(); + } + return r; +} + +// omap reads + +KStore::OmapIteratorImpl::OmapIteratorImpl( + CollectionRef c, OnodeRef o, KeyValueDB::Iterator it) + : c(c), o(o), it(it) +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head) { + get_omap_key(o->onode.omap_head, string(), &head); + get_omap_tail(o->onode.omap_head, &tail); + it->lower_bound(head); + } +} + +int KStore::OmapIteratorImpl::seek_to_first() +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head) { + it->lower_bound(head); + } else { + it = KeyValueDB::Iterator(); + } + return 0; +} + +int KStore::OmapIteratorImpl::upper_bound(const string& after) +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head) { + string key; + get_omap_key(o->onode.omap_head, after, &key); + it->upper_bound(key); + } else { + it = KeyValueDB::Iterator(); + } + return 0; +} + +int KStore::OmapIteratorImpl::lower_bound(const string& to) +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head) { + string key; + get_omap_key(o->onode.omap_head, to, &key); + it->lower_bound(key); + } else { + it = KeyValueDB::Iterator(); + } + return 0; +} + +bool KStore::OmapIteratorImpl::valid() +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) { + return true; + } else { + return false; + } +} + +int KStore::OmapIteratorImpl::next() +{ + RWLock::RLocker l(c->lock); + if (o->onode.omap_head) { + it->next(); + return 0; + } else { + return -1; + } +} + +string KStore::OmapIteratorImpl::key() +{ + RWLock::RLocker l(c->lock); + ceph_assert(it->valid()); + string db_key = it->raw_key().second; + string user_key; + decode_omap_key(db_key, &user_key); + return user_key; +} + +bufferlist KStore::OmapIteratorImpl::value() +{ + RWLock::RLocker l(c->lock); + ceph_assert(it->valid()); + return it->value(); +} + +int KStore::omap_get( + CollectionHandle& ch, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + bufferlist *header, ///< [out] omap header + map *out /// < [out] Key to value map + ) +{ + dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r = 0; + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + if (!o->onode.omap_head) + goto out; + o->flush(); + { + KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP); + string head, tail; + get_omap_header(o->onode.omap_head, &head); + get_omap_tail(o->onode.omap_head, &tail); + it->lower_bound(head); + while (it->valid()) { + if (it->key() == head) { + dout(30) << __func__ << " got header" << dendl; + *header = it->value(); + } else if (it->key() >= tail) { + dout(30) << __func__ << " reached tail" << dendl; + break; + } else { + string user_key; + decode_omap_key(it->key(), &user_key); + dout(30) << __func__ << " got " << pretty_binary_string(it->key()) + << " -> " << user_key << dendl; + ceph_assert(it->key() < tail); + (*out)[user_key] = it->value(); + } + it->next(); + } + } + out: + dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl; + return r; +} + +int KStore::omap_get_header( + CollectionHandle& ch, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + bufferlist *header, ///< [out] omap header + bool allow_eio ///< [in] don't assert on eio + ) +{ + dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r = 0; + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + if (!o->onode.omap_head) + goto out; + o->flush(); + { + string head; + get_omap_header(o->onode.omap_head, &head); + if (db->get(PREFIX_OMAP, head, header) >= 0) { + dout(30) << __func__ << " got header" << dendl; + } else { + dout(30) << __func__ << " no header" << dendl; + } + } + out: + dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl; + return r; +} + +int KStore::omap_get_keys( + CollectionHandle& ch, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + set *keys ///< [out] Keys defined on oid + ) +{ + dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r = 0; + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + if (!o->onode.omap_head) + goto out; + o->flush(); + { + KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP); + string head, tail; + get_omap_key(o->onode.omap_head, string(), &head); + get_omap_tail(o->onode.omap_head, &tail); + it->lower_bound(head); + while (it->valid()) { + if (it->key() >= tail) { + dout(30) << __func__ << " reached tail" << dendl; + break; + } + string user_key; + decode_omap_key(it->key(), &user_key); + dout(30) << __func__ << " got " << pretty_binary_string(it->key()) + << " -> " << user_key << dendl; + ceph_assert(it->key() < tail); + keys->insert(user_key); + it->next(); + } + } + out: + dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl; + return r; +} + +int KStore::omap_get_values( + CollectionHandle& ch, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + const set &keys, ///< [in] Keys to get + map *out ///< [out] Returned keys and values + ) +{ + dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r = 0; + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + if (!o->onode.omap_head) + goto out; + o->flush(); + for (set::const_iterator p = keys.begin(); p != keys.end(); ++p) { + string key; + get_omap_key(o->onode.omap_head, *p, &key); + bufferlist val; + if (db->get(PREFIX_OMAP, key, &val) >= 0) { + dout(30) << __func__ << " got " << pretty_binary_string(key) + << " -> " << *p << dendl; + out->insert(make_pair(*p, val)); + } + } + out: + dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl; + return r; +} + +int KStore::omap_check_keys( + CollectionHandle& ch, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + const set &keys, ///< [in] Keys to check + set *out ///< [out] Subset of keys defined on oid + ) +{ + dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + int r = 0; + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + r = -ENOENT; + goto out; + } + if (!o->onode.omap_head) + goto out; + o->flush(); + for (set::const_iterator p = keys.begin(); p != keys.end(); ++p) { + string key; + get_omap_key(o->onode.omap_head, *p, &key); + bufferlist val; + if (db->get(PREFIX_OMAP, key, &val) >= 0) { + dout(30) << __func__ << " have " << pretty_binary_string(key) + << " -> " << *p << dendl; + out->insert(*p); + } else { + dout(30) << __func__ << " miss " << pretty_binary_string(key) + << " -> " << *p << dendl; + } + } + out: + dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl; + return r; +} + +ObjectMap::ObjectMapIterator KStore::get_omap_iterator( + CollectionHandle& ch, ///< [in] collection + const ghobject_t &oid ///< [in] object + ) +{ + + dout(10) << __func__ << " " << ch->cid << " " << oid << dendl; + Collection *c = static_cast(ch.get()); + RWLock::RLocker l(c->lock); + OnodeRef o = c->get_onode(oid, false); + if (!o || !o->exists) { + dout(10) << __func__ << " " << oid << "doesn't exist" <flush(); + dout(10) << __func__ << " header = " << o->onode.omap_head <get_iterator(PREFIX_OMAP); + return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it)); +} + + +// ----------------- +// write helpers + +int KStore::_open_super_meta() +{ + // nid + { + nid_max = 0; + bufferlist bl; + db->get(PREFIX_SUPER, "nid_max", &bl); + auto p = bl.cbegin(); + try { + decode(nid_max, p); + } catch (buffer::error& e) { + } + dout(10) << __func__ << " old nid_max " << nid_max << dendl; + nid_last = nid_max; + } + return 0; +} + +void KStore::_assign_nid(TransContext *txc, OnodeRef o) +{ + if (o->onode.nid) + return; + std::lock_guard l(nid_lock); + o->onode.nid = ++nid_last; + dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl; + if (nid_last > nid_max) { + nid_max += cct->_conf->kstore_nid_prealloc; + bufferlist bl; + encode(nid_max, bl); + txc->t->set(PREFIX_SUPER, "nid_max", bl); + dout(10) << __func__ << " nid_max now " << nid_max << dendl; + } +} + +KStore::TransContext *KStore::_txc_create(OpSequencer *osr) +{ + TransContext *txc = new TransContext(osr); + txc->t = db->get_transaction(); + osr->queue_new(txc); + dout(20) << __func__ << " osr " << osr << " = " << txc << dendl; + return txc; +} + +void KStore::_txc_state_proc(TransContext *txc) +{ + while (true) { + dout(10) << __func__ << " txc " << txc + << " " << txc->get_state_name() << dendl; + switch (txc->state) { + case TransContext::STATE_PREPARE: + txc->log_state_latency(logger, l_kstore_state_prepare_lat); + txc->state = TransContext::STATE_KV_QUEUED; + if (!cct->_conf->kstore_sync_transaction) { + std::lock_guard l(kv_lock); + if (cct->_conf->kstore_sync_submit_transaction) { + int r = db->submit_transaction(txc->t); + ceph_assert(r == 0); + } + kv_queue.push_back(txc); + kv_cond.notify_one(); + return; + } + { + int r = db->submit_transaction_sync(txc->t); + ceph_assert(r == 0); + } + break; + + case TransContext::STATE_KV_QUEUED: + txc->log_state_latency(logger, l_kstore_state_kv_queued_lat); + txc->state = TransContext::STATE_KV_DONE; + _txc_finish_kv(txc); + // ** fall-thru ** + + case TransContext::STATE_KV_DONE: + txc->log_state_latency(logger, l_kstore_state_kv_done_lat); + txc->state = TransContext::STATE_FINISHING; + // ** fall-thru ** + + case TransContext::TransContext::STATE_FINISHING: + txc->log_state_latency(logger, l_kstore_state_finishing_lat); + _txc_finish(txc); + return; + + default: + derr << __func__ << " unexpected txc " << txc + << " state " << txc->get_state_name() << dendl; + ceph_abort_msg("unexpected txc state"); + return; + } + } +} + +void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc) +{ + dout(20) << __func__ << " osr " << osr << " txc " << txc + << " onodes " << txc->onodes << dendl; + + // finalize onodes + for (set::iterator p = txc->onodes.begin(); + p != txc->onodes.end(); + ++p) { + bufferlist bl; + encode((*p)->onode, bl); + dout(20) << " onode size is " << bl.length() << dendl; + txc->t->set(PREFIX_OBJ, (*p)->key, bl); + + std::lock_guard l((*p)->flush_lock); + (*p)->flush_txns.insert(txc); + } +} + +void KStore::_txc_finish_kv(TransContext *txc) +{ + dout(20) << __func__ << " txc " << txc << dendl; + + // warning: we're calling onreadable_sync inside the sequencer lock + if (txc->onreadable_sync) { + txc->onreadable_sync->complete(0); + txc->onreadable_sync = NULL; + } + if (txc->onreadable) { + finisher.queue(txc->onreadable); + txc->onreadable = NULL; + } + if (txc->oncommit) { + finisher.queue(txc->oncommit); + txc->oncommit = NULL; + } + if (!txc->oncommits.empty()) { + finisher.queue(txc->oncommits); + } + + throttle_ops.put(txc->ops); + throttle_bytes.put(txc->bytes); +} + +void KStore::_txc_finish(TransContext *txc) +{ + dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl; + ceph_assert(txc->state == TransContext::STATE_FINISHING); + + for (set::iterator p = txc->onodes.begin(); + p != txc->onodes.end(); + ++p) { + std::lock_guard l((*p)->flush_lock); + dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns + << dendl; + ceph_assert((*p)->flush_txns.count(txc)); + (*p)->flush_txns.erase(txc); + if ((*p)->flush_txns.empty()) { + (*p)->flush_cond.notify_all(); + (*p)->clear_pending_stripes(); + } + } + + // clear out refs + txc->onodes.clear(); + + while (!txc->removed_collections.empty()) { + _queue_reap_collection(txc->removed_collections.front()); + txc->removed_collections.pop_front(); + } + + OpSequencerRef osr = txc->osr; + { + std::lock_guard l(osr->qlock); + txc->state = TransContext::STATE_DONE; + } + + _osr_reap_done(osr.get()); +} + +void KStore::_osr_reap_done(OpSequencer *osr) +{ + std::lock_guard l(osr->qlock); + dout(20) << __func__ << " osr " << osr << dendl; + while (!osr->q.empty()) { + TransContext *txc = &osr->q.front(); + dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name() + << dendl; + if (txc->state != TransContext::STATE_DONE) { + break; + } + + if (txc->first_collection) { + txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size); + } + + osr->q.pop_front(); + txc->log_state_latency(logger, l_kstore_state_done_lat); + delete txc; + osr->qcond.notify_all(); + if (osr->q.empty()) + dout(20) << __func__ << " osr " << osr << " q now empty" << dendl; + } +} + +void KStore::_kv_sync_thread() +{ + dout(10) << __func__ << " start" << dendl; + std::unique_lock l(kv_lock); + while (true) { + ceph_assert(kv_committing.empty()); + if (kv_queue.empty()) { + if (kv_stop) + break; + dout(20) << __func__ << " sleep" << dendl; + kv_sync_cond.notify_all(); + kv_cond.wait(l); + dout(20) << __func__ << " wake" << dendl; + } else { + dout(20) << __func__ << " committing " << kv_queue.size() << dendl; + kv_committing.swap(kv_queue); + utime_t start = ceph_clock_now(); + l.unlock(); + + dout(30) << __func__ << " committing txc " << kv_committing << dendl; + + // one transaction to force a sync + KeyValueDB::Transaction t = db->get_transaction(); + if (!cct->_conf->kstore_sync_submit_transaction) { + for (std::deque::iterator it = kv_committing.begin(); + it != kv_committing.end(); + ++it) { + int r = db->submit_transaction((*it)->t); + ceph_assert(r == 0); + } + } + int r = db->submit_transaction_sync(t); + ceph_assert(r == 0); + utime_t finish = ceph_clock_now(); + utime_t dur = finish - start; + dout(20) << __func__ << " committed " << kv_committing.size() + << " in " << dur << dendl; + while (!kv_committing.empty()) { + TransContext *txc = kv_committing.front(); + _txc_state_proc(txc); + kv_committing.pop_front(); + } + + // this is as good a place as any ... + _reap_collections(); + + l.lock(); + } + } + dout(10) << __func__ << " finish" << dendl; +} + + +// --------------------------- +// transactions + +int KStore::queue_transactions( + CollectionHandle& ch, + vector& tls, + TrackedOpRef op, + ThreadPool::TPHandle *handle) +{ + Context *onreadable; + Context *ondisk; + Context *onreadable_sync; + ObjectStore::Transaction::collect_contexts( + tls, &onreadable, &ondisk, &onreadable_sync); + + // set up the sequencer + Collection *c = static_cast(ch.get()); + OpSequencer *osr = c->osr.get(); + dout(10) << __func__ << " ch " << ch.get() << " " << c->cid << dendl; + + // prepare + TransContext *txc = _txc_create(osr); + txc->onreadable = onreadable; + txc->onreadable_sync = onreadable_sync; + txc->oncommit = ondisk; + + for (vector::iterator p = tls.begin(); p != tls.end(); ++p) { + txc->ops += (*p).get_num_ops(); + txc->bytes += (*p).get_num_bytes(); + _txc_add_transaction(txc, &(*p)); + } + + _txc_finalize(osr, txc); + + throttle_ops.get(txc->ops); + throttle_bytes.get(txc->bytes); + + // execute (start) + _txc_state_proc(txc); + return 0; +} + +void KStore::_txc_add_transaction(TransContext *txc, Transaction *t) +{ + Transaction::iterator i = t->begin(); + + dout(30) << __func__ << " transaction dump:\n"; + JSONFormatter f(true); + f.open_object_section("transaction"); + t->dump(&f); + f.close_section(); + f.flush(*_dout); + *_dout << dendl; + + vector cvec(i.colls.size()); + unsigned j = 0; + for (vector::iterator p = i.colls.begin(); p != i.colls.end(); + ++p, ++j) { + cvec[j] = _get_collection(*p); + + // note first collection we reference + if (!j && !txc->first_collection) + txc->first_collection = cvec[j]; + } + vector ovec(i.objects.size()); + + for (int pos = 0; i.have_op(); ++pos) { + Transaction::Op *op = i.decode_op(); + int r = 0; + + // no coll or obj + if (op->op == Transaction::OP_NOP) + continue; + + // collection operations + CollectionRef &c = cvec[op->cid]; + switch (op->op) { + case Transaction::OP_RMCOLL: + { + coll_t cid = i.get_cid(op->cid); + r = _remove_collection(txc, cid, &c); + if (!r) + continue; + } + break; + + case Transaction::OP_MKCOLL: + { + ceph_assert(!c); + coll_t cid = i.get_cid(op->cid); + r = _create_collection(txc, cid, op->split_bits, &c); + if (!r) + continue; + } + break; + + case Transaction::OP_SPLIT_COLLECTION: + ceph_abort_msg("deprecated"); + break; + + case Transaction::OP_SPLIT_COLLECTION2: + { + uint32_t bits = op->split_bits; + uint32_t rem = op->split_rem; + r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem); + if (!r) + continue; + } + break; + + case Transaction::OP_MERGE_COLLECTION: + { + uint32_t bits = op->split_bits; + r = _merge_collection(txc, &c, cvec[op->dest_cid], bits); + if (!r) + continue; + } + break; + + case Transaction::OP_COLL_HINT: + { + uint32_t type = op->hint_type; + bufferlist hint; + i.decode_bl(hint); + auto hiter = hint.cbegin(); + if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { + uint32_t pg_num; + uint64_t num_objs; + decode(pg_num, hiter); + decode(num_objs, hiter); + dout(10) << __func__ << " collection hint objects is a no-op, " + << " pg_num " << pg_num << " num_objects " << num_objs + << dendl; + } else { + // Ignore the hint + dout(10) << __func__ << " unknown collection hint " << type << dendl; + } + continue; + } + break; + + case Transaction::OP_COLL_SETATTR: + r = -EOPNOTSUPP; + break; + + case Transaction::OP_COLL_RMATTR: + r = -EOPNOTSUPP; + break; + + case Transaction::OP_COLL_RENAME: + ceph_abort_msg("not implemented"); + break; + } + if (r < 0) { + derr << " error " << cpp_strerror(r) + << " not handled on operation " << op->op + << " (op " << pos << ", counting from 0)" << dendl; + dout(0) << " transaction dump:\n"; + JSONFormatter f(true); + f.open_object_section("transaction"); + t->dump(&f); + f.close_section(); + f.flush(*_dout); + *_dout << dendl; + ceph_abort_msg("unexpected error"); + } + + // object operations + RWLock::WLocker l(c->lock); + OnodeRef &o = ovec[op->oid]; + if (!o) { + // these operations implicity create the object + bool create = false; + if (op->op == Transaction::OP_TOUCH || + op->op == Transaction::OP_WRITE || + op->op == Transaction::OP_ZERO) { + create = true; + } + ghobject_t oid = i.get_oid(op->oid); + o = c->get_onode(oid, create); + if (!create) { + if (!o || !o->exists) { + dout(10) << __func__ << " op " << op->op << " got ENOENT on " + << oid << dendl; + r = -ENOENT; + goto endop; + } + } + } + + switch (op->op) { + case Transaction::OP_TOUCH: + r = _touch(txc, c, o); + break; + + case Transaction::OP_WRITE: + { + uint64_t off = op->off; + uint64_t len = op->len; + uint32_t fadvise_flags = i.get_fadvise_flags(); + bufferlist bl; + i.decode_bl(bl); + r = _write(txc, c, o, off, len, bl, fadvise_flags); + } + break; + + case Transaction::OP_ZERO: + { + uint64_t off = op->off; + uint64_t len = op->len; + r = _zero(txc, c, o, off, len); + } + break; + + case Transaction::OP_TRIMCACHE: + { + // deprecated, no-op + } + break; + + case Transaction::OP_TRUNCATE: + { + uint64_t off = op->off; + r = _truncate(txc, c, o, off); + } + break; + + case Transaction::OP_REMOVE: + r = _remove(txc, c, o); + break; + + case Transaction::OP_SETATTR: + { + string name = i.decode_string(); + bufferlist bl; + i.decode_bl(bl); + map to_set; + to_set[name] = bufferptr(bl.c_str(), bl.length()); + r = _setattrs(txc, c, o, to_set); + } + break; + + case Transaction::OP_SETATTRS: + { + map aset; + i.decode_attrset(aset); + r = _setattrs(txc, c, o, aset); + } + break; + + case Transaction::OP_RMATTR: + { + string name = i.decode_string(); + r = _rmattr(txc, c, o, name); + } + break; + + case Transaction::OP_RMATTRS: + { + r = _rmattrs(txc, c, o); + } + break; + + case Transaction::OP_CLONE: + { + const ghobject_t& noid = i.get_oid(op->dest_oid); + OnodeRef no = c->get_onode(noid, true); + r = _clone(txc, c, o, no); + } + break; + + case Transaction::OP_CLONERANGE: + ceph_abort_msg("deprecated"); + break; + + case Transaction::OP_CLONERANGE2: + { + const ghobject_t& noid = i.get_oid(op->dest_oid); + OnodeRef no = c->get_onode(noid, true); + uint64_t srcoff = op->off; + uint64_t len = op->len; + uint64_t dstoff = op->dest_off; + r = _clone_range(txc, c, o, no, srcoff, len, dstoff); + } + break; + + case Transaction::OP_COLL_ADD: + ceph_abort_msg("not implemented"); + break; + + case Transaction::OP_COLL_REMOVE: + ceph_abort_msg("not implemented"); + break; + + case Transaction::OP_COLL_MOVE: + ceph_abort_msg("deprecated"); + break; + + case Transaction::OP_COLL_MOVE_RENAME: + { + ceph_assert(op->cid == op->dest_cid); + const ghobject_t& noid = i.get_oid(op->dest_oid); + OnodeRef no = c->get_onode(noid, true); + r = _rename(txc, c, o, no, noid); + o.reset(); + } + break; + + case Transaction::OP_TRY_RENAME: + { + const ghobject_t& noid = i.get_oid(op->dest_oid); + OnodeRef no = c->get_onode(noid, true); + r = _rename(txc, c, o, no, noid); + if (r == -ENOENT) + r = 0; + o.reset(); + } + break; + + case Transaction::OP_OMAP_CLEAR: + { + r = _omap_clear(txc, c, o); + } + break; + case Transaction::OP_OMAP_SETKEYS: + { + bufferlist aset_bl; + i.decode_attrset_bl(&aset_bl); + r = _omap_setkeys(txc, c, o, aset_bl); + } + break; + case Transaction::OP_OMAP_RMKEYS: + { + bufferlist keys_bl; + i.decode_keyset_bl(&keys_bl); + r = _omap_rmkeys(txc, c, o, keys_bl); + } + break; + case Transaction::OP_OMAP_RMKEYRANGE: + { + string first, last; + first = i.decode_string(); + last = i.decode_string(); + r = _omap_rmkey_range(txc, c, o, first, last); + } + break; + case Transaction::OP_OMAP_SETHEADER: + { + bufferlist bl; + i.decode_bl(bl); + r = _omap_setheader(txc, c, o, bl); + } + break; + + case Transaction::OP_SETALLOCHINT: + { + uint64_t expected_object_size = op->expected_object_size; + uint64_t expected_write_size = op->expected_write_size; + uint32_t flags = op->alloc_hint_flags; + r = _setallochint(txc, c, o, + expected_object_size, + expected_write_size, + flags); + } + break; + + default: + derr << "bad op " << op->op << dendl; + ceph_abort(); + } + + endop: + if (r < 0) { + bool ok = false; + + if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE || + op->op == Transaction::OP_CLONE || + op->op == Transaction::OP_CLONERANGE2 || + op->op == Transaction::OP_COLL_ADD)) + // -ENOENT is usually okay + ok = true; + if (r == -ENODATA) + ok = true; + + if (!ok) { + const char *msg = "unexpected error code"; + + if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE || + op->op == Transaction::OP_CLONE || + op->op == Transaction::OP_CLONERANGE2)) + msg = "ENOENT on clone suggests osd bug"; + + if (r == -ENOSPC) + // For now, if we hit _any_ ENOSPC, crash, before we do any damage + // by partially applying transactions. + msg = "ENOSPC from key value store, misconfigured cluster"; + + if (r == -ENOTEMPTY) { + msg = "ENOTEMPTY suggests garbage data in osd data dir"; + } + + dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op + << " (op " << pos << ", counting from 0)" << dendl; + dout(0) << msg << dendl; + dout(0) << " transaction dump:\n"; + JSONFormatter f(true); + f.open_object_section("transaction"); + t->dump(&f); + f.close_section(); + f.flush(*_dout); + *_dout << dendl; + ceph_abort_msg("unexpected error"); + } + } + } +} + + + +// ----------------- +// write operations + +int KStore::_touch(TransContext *txc, + CollectionRef& c, + OnodeRef &o) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r = 0; + o->exists = true; + _assign_nid(txc, o); + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +void KStore::_dump_onode(OnodeRef o) +{ + dout(30) << __func__ << " " << o + << " nid " << o->onode.nid + << " size " << o->onode.size + << " expected_object_size " << o->onode.expected_object_size + << " expected_write_size " << o->onode.expected_write_size + << dendl; + for (map::iterator p = o->onode.attrs.begin(); + p != o->onode.attrs.end(); + ++p) { + dout(30) << __func__ << " attr " << p->first + << " len " << p->second.length() << dendl; + } +} + +void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl) +{ + map::iterator p = o->pending_stripes.find(offset); + if (p == o->pending_stripes.end()) { + string key; + get_data_key(o->onode.nid, offset, &key); + db->get(PREFIX_DATA, key, pbl); + o->pending_stripes[offset] = *pbl; + } else { + *pbl = p->second; + } +} + +void KStore::_do_write_stripe(TransContext *txc, OnodeRef o, + uint64_t offset, bufferlist& bl) +{ + o->pending_stripes[offset] = bl; + string key; + get_data_key(o->onode.nid, offset, &key); + txc->t->set(PREFIX_DATA, key, bl); +} + +void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset) +{ + o->pending_stripes.erase(offset); + string key; + get_data_key(o->onode.nid, offset, &key); + txc->t->rmkey(PREFIX_DATA, key); +} + +int KStore::_do_write(TransContext *txc, + OnodeRef o, + uint64_t offset, uint64_t length, + bufferlist& orig_bl, + uint32_t fadvise_flags) +{ + int r = 0; + + dout(20) << __func__ + << " " << o->oid << " " << offset << "~" << length + << " - have " << o->onode.size + << " bytes, nid " << o->onode.nid << dendl; + _dump_onode(o); + o->exists = true; + + if (length == 0) { + return 0; + } + + uint64_t stripe_size = o->onode.stripe_size; + if (!stripe_size) { + o->onode.stripe_size = cct->_conf->kstore_default_stripe_size; + stripe_size = o->onode.stripe_size; + } + + unsigned bl_off = 0; + while (length > 0) { + uint64_t offset_rem = offset % stripe_size; + uint64_t end_rem = (offset + length) % stripe_size; + if (offset_rem == 0 && end_rem == 0) { + bufferlist bl; + bl.substr_of(orig_bl, bl_off, stripe_size); + dout(30) << __func__ << " full stripe " << offset << dendl; + _do_write_stripe(txc, o, offset, bl); + offset += stripe_size; + length -= stripe_size; + bl_off += stripe_size; + continue; + } + uint64_t stripe_off = offset - offset_rem; + bufferlist prev; + _do_read_stripe(o, stripe_off, &prev); + dout(20) << __func__ << " read previous stripe " << stripe_off + << ", got " << prev.length() << dendl; + bufferlist bl; + if (offset_rem) { + unsigned p = std::min(prev.length(), offset_rem); + if (p) { + dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl; + bl.substr_of(prev, 0, p); + } + if (p < offset_rem) { + dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl; + bl.append_zero(offset_rem - p); + } + } + unsigned use = stripe_size - offset_rem; + if (use > length) + use -= stripe_size - end_rem; + dout(20) << __func__ << " using " << use << " for this stripe" << dendl; + bufferlist t; + t.substr_of(orig_bl, bl_off, use); + bl.claim_append(t); + bl_off += use; + if (end_rem) { + if (end_rem < prev.length()) { + unsigned l = prev.length() - end_rem; + dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl; + bufferlist t; + t.substr_of(prev, end_rem, l); + bl.claim_append(t); + } + } + dout(30) << " writing:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + _do_write_stripe(txc, o, stripe_off, bl); + offset += use; + length -= use; + } + + if (offset > o->onode.size) { + dout(20) << __func__ << " extending size to " << offset + length + << dendl; + o->onode.size = offset; + } + + return r; +} + +int KStore::_write(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset, size_t length, + bufferlist& bl, + uint32_t fadvise_flags) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << offset << "~" << length + << dendl; + _assign_nid(txc, o); + int r = _do_write(txc, o, offset, length, bl, fadvise_flags); + txc->write_onode(o); + + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << offset << "~" << length + << " = " << r << dendl; + return r; +} + +int KStore::_zero(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset, size_t length) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << offset << "~" << length + << dendl; + int r = 0; + o->exists = true; + + _dump_onode(o); + _assign_nid(txc, o); + + uint64_t stripe_size = o->onode.stripe_size; + if (stripe_size) { + uint64_t end = offset + length; + uint64_t pos = offset; + uint64_t stripe_off = pos % stripe_size; + while (pos < offset + length) { + if (stripe_off || end - pos < stripe_size) { + bufferlist stripe; + _do_read_stripe(o, pos - stripe_off, &stripe); + dout(30) << __func__ << " stripe " << pos - stripe_off << " got " + << stripe.length() << dendl; + bufferlist bl; + bl.substr_of(stripe, 0, std::min(stripe.length(), stripe_off)); + if (end >= pos - stripe_off + stripe_size || + end >= o->onode.size) { + dout(20) << __func__ << " truncated stripe " << pos - stripe_off + << " to " << bl.length() << dendl; + } else { + auto len = end - (pos - stripe_off + bl.length()); + bl.append_zero(len); + dout(20) << __func__ << " adding " << len << " of zeros" << dendl; + if (stripe.length() > bl.length()) { + unsigned l = stripe.length() - bl.length(); + bufferlist t; + t.substr_of(stripe, stripe.length() - l, l); + dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl; + bl.claim_append(t); + } + } + _do_write_stripe(txc, o, pos - stripe_off, bl); + pos += stripe_size - stripe_off; + stripe_off = 0; + } else { + dout(20) << __func__ << " rm stripe " << pos << dendl; + _do_remove_stripe(txc, o, pos - stripe_off); + pos += stripe_size; + } + } + } + if (offset + length > o->onode.size) { + o->onode.size = offset + length; + dout(20) << __func__ << " extending size to " << offset + length + << dendl; + } + txc->write_onode(o); + + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << offset << "~" << length + << " = " << r << dendl; + return r; +} + +int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset) +{ + uint64_t stripe_size = o->onode.stripe_size; + + o->flush(); + + // trim down stripes + if (stripe_size) { + uint64_t pos = offset; + uint64_t stripe_off = pos % stripe_size; + while (pos < o->onode.size) { + if (stripe_off) { + bufferlist stripe; + _do_read_stripe(o, pos - stripe_off, &stripe); + dout(30) << __func__ << " stripe " << pos - stripe_off << " got " + << stripe.length() << dendl; + bufferlist t; + t.substr_of(stripe, 0, std::min(stripe_off, stripe.length())); + _do_write_stripe(txc, o, pos - stripe_off, t); + dout(20) << __func__ << " truncated stripe " << pos - stripe_off + << " to " << t.length() << dendl; + pos += stripe_size - stripe_off; + stripe_off = 0; + } else { + dout(20) << __func__ << " rm stripe " << pos << dendl; + _do_remove_stripe(txc, o, pos - stripe_off); + pos += stripe_size; + } + } + + // trim down cached tail + if (o->tail_bl.length()) { + if (offset / stripe_size != o->onode.size / stripe_size) { + dout(20) << __func__ << " clear cached tail" << dendl; + o->clear_tail(); + } + } + } + + o->onode.size = offset; + dout(10) << __func__ << " truncate size to " << offset << dendl; + + txc->write_onode(o); + return 0; +} + +int KStore::_truncate(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << offset + << dendl; + int r = _do_truncate(txc, o, offset); + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << offset + << " = " << r << dendl; + return r; +} + +int KStore::_do_remove(TransContext *txc, + OnodeRef o) +{ + string key; + + _do_truncate(txc, o, 0); + + o->onode.size = 0; + if (o->onode.omap_head) { + _do_omap_clear(txc, o->onode.omap_head); + } + o->exists = false; + o->onode = kstore_onode_t(); + txc->onodes.erase(o); + get_object_key(cct, o->oid, &key); + txc->t->rmkey(PREFIX_OBJ, key); + return 0; +} + +int KStore::_remove(TransContext *txc, + CollectionRef& c, + OnodeRef &o) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r = _do_remove(txc, o); + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_setattr(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& name, + bufferptr& val) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << name << " (" << val.length() << " bytes)" + << dendl; + int r = 0; + o->onode.attrs[name] = val; + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << name << " (" << val.length() << " bytes)" + << " = " << r << dendl; + return r; +} + +int KStore::_setattrs(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const map& aset) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << aset.size() << " keys" + << dendl; + int r = 0; + for (map::const_iterator p = aset.begin(); + p != aset.end(); ++p) { + if (p->second.is_partial()) + o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length()); + else + o->onode.attrs[p->first] = p->second; + } + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << aset.size() << " keys" + << " = " << r << dendl; + return r; +} + + +int KStore::_rmattr(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& name) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " " << name << dendl; + int r = 0; + o->onode.attrs.erase(name); + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " " << name << " = " << r << dendl; + return r; +} + +int KStore::_rmattrs(TransContext *txc, + CollectionRef& c, + OnodeRef& o) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r = 0; + o->onode.attrs.clear(); + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +void KStore::_do_omap_clear(TransContext *txc, uint64_t id) +{ + KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP); + string prefix, tail; + get_omap_header(id, &prefix); + get_omap_tail(id, &tail); + it->lower_bound(prefix); + while (it->valid()) { + if (it->key() >= tail) { + dout(30) << __func__ << " stop at " << tail << dendl; + break; + } + txc->t->rmkey(PREFIX_OMAP, it->key()); + dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl; + it->next(); + } +} + +int KStore::_omap_clear(TransContext *txc, + CollectionRef& c, + OnodeRef& o) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r = 0; + if (o->onode.omap_head != 0) { + _do_omap_clear(txc, o->onode.omap_head); + } + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_omap_setkeys(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + bufferlist &bl) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r; + auto p = bl.cbegin(); + __u32 num; + if (!o->onode.omap_head) { + o->onode.omap_head = o->onode.nid; + txc->write_onode(o); + } + decode(num, p); + while (num--) { + string key; + bufferlist value; + decode(key, p); + decode(value, p); + string final_key; + get_omap_key(o->onode.omap_head, key, &final_key); + dout(30) << __func__ << " " << pretty_binary_string(final_key) + << " <- " << key << dendl; + txc->t->set(PREFIX_OMAP, final_key, value); + } + r = 0; + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_omap_setheader(TransContext *txc, + CollectionRef& c, + OnodeRef &o, + bufferlist& bl) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r; + string key; + if (!o->onode.omap_head) { + o->onode.omap_head = o->onode.nid; + txc->write_onode(o); + } + get_omap_header(o->onode.omap_head, &key); + txc->t->set(PREFIX_OMAP, key, bl); + r = 0; + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_omap_rmkeys(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const bufferlist& bl) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + int r = 0; + auto p = bl.cbegin(); + __u32 num; + + if (!o->onode.omap_head) { + r = 0; + goto out; + } + decode(num, p); + while (num--) { + string key; + decode(key, p); + string final_key; + get_omap_key(o->onode.omap_head, key, &final_key); + dout(30) << __func__ << " rm " << pretty_binary_string(final_key) + << " <- " << key << dendl; + txc->t->rmkey(PREFIX_OMAP, final_key); + } + r = 0; + + out: + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_omap_rmkey_range(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& first, const string& last) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl; + KeyValueDB::Iterator it; + string key_first, key_last; + int r = 0; + + if (!o->onode.omap_head) { + goto out; + } + it = db->get_iterator(PREFIX_OMAP); + get_omap_key(o->onode.omap_head, first, &key_first); + get_omap_key(o->onode.omap_head, last, &key_last); + it->lower_bound(key_first); + while (it->valid()) { + if (it->key() >= key_last) { + dout(30) << __func__ << " stop at " << pretty_binary_string(key_last) + << dendl; + break; + } + txc->t->rmkey(PREFIX_OMAP, it->key()); + dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl; + it->next(); + } + r = 0; + + out: + dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl; + return r; +} + +int KStore::_setallochint(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags) +{ + dout(15) << __func__ << " " << c->cid << " " << o->oid + << " object_size " << expected_object_size + << " write_size " << expected_write_size + << " flags " << flags + << dendl; + int r = 0; + o->onode.expected_object_size = expected_object_size; + o->onode.expected_write_size = expected_write_size; + o->onode.alloc_hint_flags = flags; + + txc->write_onode(o); + dout(10) << __func__ << " " << c->cid << " " << o->oid + << " object_size " << expected_object_size + << " write_size " << expected_write_size + << " = " << r << dendl; + return r; +} + +int KStore::_clone(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo) +{ + dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> " + << newo->oid << dendl; + int r = 0; + if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) { + derr << __func__ << " mismatched hash on " << oldo->oid + << " and " << newo->oid << dendl; + return -EINVAL; + } + + bufferlist bl; + newo->exists = true; + _assign_nid(txc, newo); + + // data + oldo->flush(); + + r = _do_read(oldo, 0, oldo->onode.size, bl, 0); + if (r < 0) + goto out; + + // truncate any old data + r = _do_truncate(txc, newo, 0); + if (r < 0) + goto out; + + r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0); + if (r < 0) + goto out; + + newo->onode.attrs = oldo->onode.attrs; + + // clone omap + if (newo->onode.omap_head) { + dout(20) << __func__ << " clearing old omap data" << dendl; + _do_omap_clear(txc, newo->onode.omap_head); + } + if (oldo->onode.omap_head) { + dout(20) << __func__ << " copying omap data" << dendl; + if (!newo->onode.omap_head) { + newo->onode.omap_head = newo->onode.nid; + } + KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP); + string head, tail; + get_omap_header(oldo->onode.omap_head, &head); + get_omap_tail(oldo->onode.omap_head, &tail); + it->lower_bound(head); + while (it->valid()) { + string key; + if (it->key() >= tail) { + dout(30) << __func__ << " reached tail" << dendl; + break; + } else { + dout(30) << __func__ << " got header/data " + << pretty_binary_string(it->key()) << dendl; + ceph_assert(it->key() < tail); + rewrite_omap_key(newo->onode.omap_head, it->key(), &key); + txc->t->set(PREFIX_OMAP, key, it->value()); + } + it->next(); + } + } + + txc->write_onode(newo); + r = 0; + + out: + dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> " + << newo->oid << " = " << r << dendl; + return r; +} + +int KStore::_clone_range(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo, + uint64_t srcoff, uint64_t length, uint64_t dstoff) +{ + dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> " + << newo->oid << " from " << srcoff << "~" << length + << " to offset " << dstoff << dendl; + int r = 0; + + bufferlist bl; + newo->exists = true; + _assign_nid(txc, newo); + + r = _do_read(oldo, srcoff, length, bl, 0); + if (r < 0) + goto out; + + r = _do_write(txc, newo, dstoff, bl.length(), bl, 0); + if (r < 0) + goto out; + + txc->write_onode(newo); + + r = 0; + + out: + dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> " + << newo->oid << " from " << srcoff << "~" << length + << " to offset " << dstoff + << " = " << r << dendl; + return r; +} + +int KStore::_rename(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo, + const ghobject_t& new_oid) +{ + dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> " + << new_oid << dendl; + int r; + ghobject_t old_oid = oldo->oid; + bufferlist bl; + string old_key, new_key; + + if (newo && newo->exists) { + // destination object already exists, remove it first + r = _do_remove(txc, newo); + if (r < 0) + goto out; + } + + txc->t->rmkey(PREFIX_OBJ, oldo->key); + txc->write_onode(oldo); + c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key} + r = 0; + + out: + dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> " + << new_oid << " = " << r << dendl; + return r; +} + +// collections + +int KStore::_create_collection( + TransContext *txc, + coll_t cid, + unsigned bits, + CollectionRef *c) +{ + dout(15) << __func__ << " " << cid << " bits " << bits << dendl; + int r; + bufferlist bl; + + { + RWLock::WLocker l(coll_lock); + if (*c) { + r = -EEXIST; + goto out; + } + auto p = new_coll_map.find(cid); + ceph_assert(p != new_coll_map.end()); + *c = p->second; + ceph_assert((*c)->cid == cid); + (*c)->cnode.bits = bits; + coll_map[cid] = *c; + new_coll_map.erase(p); + } + encode((*c)->cnode, bl); + txc->t->set(PREFIX_COLL, stringify(cid), bl); + r = 0; + + out: + dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl; + return r; +} + +int KStore::_remove_collection(TransContext *txc, coll_t cid, + CollectionRef *c) +{ + dout(15) << __func__ << " " << cid << dendl; + int r; + + { + RWLock::WLocker l(coll_lock); + if (!*c) { + r = -ENOENT; + goto out; + } + size_t nonexistent_count = 0; + pair next_onode; + while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) { + if (next_onode.second->exists) { + r = -ENOTEMPTY; + goto out; + } + ++nonexistent_count; + } + vector ls; + ghobject_t next; + // Enumerate onodes in db, up to nonexistent_count + 1 + // then check if all of them are marked as non-existent. + // Bypass the check if returned number is greater than nonexistent_count + r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), + nonexistent_count + 1, &ls, &next); + if (r >= 0) { + bool exists = false; //ls.size() > nonexistent_count; + for (auto it = ls.begin(); !exists && it < ls.end(); ++it) { + dout(10) << __func__ << " oid " << *it << dendl; + auto onode = (*c)->onode_map.lookup(*it); + exists = !onode || onode->exists; + if (exists) { + dout(10) << __func__ << " " << *it + << " exists in db" << dendl; + } + } + if (!exists) { + coll_map.erase(cid); + txc->removed_collections.push_back(*c); + c->reset(); + txc->t->rmkey(PREFIX_COLL, stringify(cid)); + r = 0; + } else { + dout(10) << __func__ << " " << cid + << " is non-empty" << dendl; + r = -ENOTEMPTY; + } + } + } + + out: + dout(10) << __func__ << " " << cid << " = " << r << dendl; + return r; +} + +int KStore::_split_collection(TransContext *txc, + CollectionRef& c, + CollectionRef& d, + unsigned bits, int rem) +{ + dout(15) << __func__ << " " << c->cid << " to " << d->cid << " " + << " bits " << bits << dendl; + int r; + RWLock::WLocker l(c->lock); + RWLock::WLocker l2(d->lock); + c->onode_map.clear(); + d->onode_map.clear(); + c->cnode.bits = bits; + ceph_assert(d->cnode.bits == bits); + r = 0; + + bufferlist bl; + encode(c->cnode, bl); + txc->t->set(PREFIX_COLL, stringify(c->cid), bl); + + dout(10) << __func__ << " " << c->cid << " to " << d->cid << " " + << " bits " << bits << " = " << r << dendl; + return r; +} + +int KStore::_merge_collection(TransContext *txc, + CollectionRef *c, + CollectionRef& d, + unsigned bits) +{ + dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " " + << " bits " << bits << dendl; + int r; + RWLock::WLocker l((*c)->lock); + RWLock::WLocker l2(d->lock); + (*c)->onode_map.clear(); + d->onode_map.clear(); + d->cnode.bits = bits; + r = 0; + + coll_t cid = (*c)->cid; + + bufferlist bl; + encode(d->cnode, bl); + txc->t->set(PREFIX_COLL, stringify(d->cid), bl); + + coll_map.erase((*c)->cid); + txc->removed_collections.push_back(*c); + c->reset(); + txc->t->rmkey(PREFIX_COLL, stringify(cid)); + + dout(10) << __func__ << " " << cid << " to " << d->cid << " " + << " bits " << bits << " = " << r << dendl; + return r; +} + +// =========================================== diff --git a/src/os/kstore/KStore.h b/src/os/kstore/KStore.h new file mode 100644 index 00000000..227227fb --- /dev/null +++ b/src/os/kstore/KStore.h @@ -0,0 +1,692 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_OSD_KSTORE_H +#define CEPH_OSD_KSTORE_H + +#include "acconfig.h" + +#include + +#include +#include +#include + +#include "include/ceph_assert.h" +#include "include/unordered_map.h" +#include "common/Finisher.h" +#include "common/RWLock.h" +#include "common/Throttle.h" +#include "common/WorkQueue.h" +#include "os/ObjectStore.h" +#include "common/perf_counters.h" +#include "os/fs/FS.h" +#include "kv/KeyValueDB.h" + +#include "kstore_types.h" + +#include "boost/intrusive/list.hpp" + +enum { + l_kstore_first = 832430, + l_kstore_state_prepare_lat, + l_kstore_state_kv_queued_lat, + l_kstore_state_kv_done_lat, + l_kstore_state_finishing_lat, + l_kstore_state_done_lat, + l_kstore_last +}; + +class KStore : public ObjectStore { + // ----------------------------------------------------- + // types +public: + + class TransContext; + + /// an in-memory object + struct Onode { + CephContext* cct; + std::atomic_int nref; ///< reference count + + ghobject_t oid; + string key; ///< key under PREFIX_OBJ where we are stored + boost::intrusive::list_member_hook<> lru_item; + + kstore_onode_t onode; ///< metadata stored as value in kv store + bool dirty; // ??? + bool exists; + + std::mutex flush_lock; ///< protect flush_txns + std::condition_variable flush_cond; ///< wait here for unapplied txns + set flush_txns; ///< committing txns + + uint64_t tail_offset; + bufferlist tail_bl; + + map pending_stripes; ///< unwritten stripes + + Onode(CephContext* cct, const ghobject_t& o, const string& k) + : cct(cct), + nref(0), + oid(o), + key(k), + dirty(false), + exists(false), + tail_offset(0) { + } + + void flush(); + void get() { + ++nref; + } + void put() { + if (--nref == 0) + delete this; + } + + void clear_tail() { + tail_offset = 0; + tail_bl.clear(); + } + void clear_pending_stripes() { + pending_stripes.clear(); + } + }; + typedef boost::intrusive_ptr OnodeRef; + + struct OnodeHashLRU { + CephContext* cct; + typedef boost::intrusive::list< + Onode, + boost::intrusive::member_hook< + Onode, + boost::intrusive::list_member_hook<>, + &Onode::lru_item> > lru_list_t; + + std::mutex lock; + ceph::unordered_map onode_map; ///< forward lookups + lru_list_t lru; ///< lru + + OnodeHashLRU(CephContext* cct) : cct(cct) {} + + void add(const ghobject_t& oid, OnodeRef o); + void _touch(OnodeRef o); + OnodeRef lookup(const ghobject_t& o); + void rename(const ghobject_t& old_oid, const ghobject_t& new_oid); + void clear(); + bool get_next(const ghobject_t& after, pair *next); + int trim(int max=-1); + }; + + class OpSequencer; + typedef boost::intrusive_ptr OpSequencerRef; + + struct Collection : public CollectionImpl { + KStore *store; + kstore_cnode_t cnode; + RWLock lock; + + OpSequencerRef osr; + + // cache onodes on a per-collection basis to avoid lock + // contention. + OnodeHashLRU onode_map; + + OnodeRef get_onode(const ghobject_t& oid, bool create); + + bool contains(const ghobject_t& oid) { + if (cid.is_meta()) + return oid.hobj.pool == -1; + spg_t spgid; + if (cid.is_pg(&spgid)) + return + spgid.pgid.contains(cnode.bits, oid) && + oid.shard_id == spgid.shard; + return false; + } + + void flush() override; + bool flush_commit(Context *c) override; + + Collection(KStore *ns, coll_t c); + }; + typedef boost::intrusive_ptr CollectionRef; + + class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl { + CollectionRef c; + OnodeRef o; + KeyValueDB::Iterator it; + string head, tail; + public: + OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it); + int seek_to_first() override; + int upper_bound(const string &after) override; + int lower_bound(const string &to) override; + bool valid() override; + int next() override; + string key() override; + bufferlist value() override; + int status() override { + return 0; + } + }; + + struct TransContext { + typedef enum { + STATE_PREPARE, + STATE_AIO_WAIT, + STATE_IO_DONE, + STATE_KV_QUEUED, + STATE_KV_COMMITTING, + STATE_KV_DONE, + STATE_FINISHING, + STATE_DONE, + } state_t; + + state_t state; + + const char *get_state_name() { + switch (state) { + case STATE_PREPARE: return "prepare"; + case STATE_AIO_WAIT: return "aio_wait"; + case STATE_IO_DONE: return "io_done"; + case STATE_KV_QUEUED: return "kv_queued"; + case STATE_KV_COMMITTING: return "kv_committing"; + case STATE_KV_DONE: return "kv_done"; + case STATE_FINISHING: return "finishing"; + case STATE_DONE: return "done"; + } + return "???"; + } + + void log_state_latency(PerfCounters *logger, int state) { + utime_t lat, now = ceph_clock_now(); + lat = now - start; + logger->tinc(state, lat); + start = now; + } + + CollectionRef ch; + OpSequencerRef osr; + boost::intrusive::list_member_hook<> sequencer_item; + + uint64_t ops, bytes; + + set onodes; ///< these onodes need to be updated/written + KeyValueDB::Transaction t; ///< then we will commit this + Context *oncommit; ///< signal on commit + Context *onreadable; ///< signal on readable + Context *onreadable_sync; ///< signal on readable + list oncommits; ///< more commit completions + list removed_collections; ///< colls we removed + + CollectionRef first_collection; ///< first referenced collection + utime_t start; + explicit TransContext(OpSequencer *o) + : state(STATE_PREPARE), + osr(o), + ops(0), + bytes(0), + oncommit(NULL), + onreadable(NULL), + onreadable_sync(NULL), + start(ceph_clock_now()){ + //cout << "txc new " << this << std::endl; + } + ~TransContext() { + //cout << "txc del " << this << std::endl; + } + + void write_onode(OnodeRef &o) { + onodes.insert(o); + } + }; + + class OpSequencer : public RefCountedObject { + public: + std::mutex qlock; + std::condition_variable qcond; + typedef boost::intrusive::list< + TransContext, + boost::intrusive::member_hook< + TransContext, + boost::intrusive::list_member_hook<>, + &TransContext::sequencer_item> > q_list_t; + q_list_t q; ///< transactions + + ~OpSequencer() { + ceph_assert(q.empty()); + } + + void queue_new(TransContext *txc) { + std::lock_guard l(qlock); + q.push_back(*txc); + } + + void flush() { + std::unique_lock l(qlock); + while (!q.empty()) + qcond.wait(l); + } + + bool flush_commit(Context *c) { + std::lock_guard l(qlock); + if (q.empty()) { + return true; + } + TransContext *txc = &q.back(); + if (txc->state >= TransContext::STATE_KV_DONE) { + return true; + } + ceph_assert(txc->state < TransContext::STATE_KV_DONE); + txc->oncommits.push_back(c); + return false; + } + }; + + struct KVSyncThread : public Thread { + KStore *store; + explicit KVSyncThread(KStore *s) : store(s) {} + void *entry() override { + store->_kv_sync_thread(); + return NULL; + } + }; + + // -------------------------------------------------------- + // members +private: + KeyValueDB *db; + uuid_d fsid; + string basedir; + int path_fd; ///< open handle to $path + int fsid_fd; ///< open handle (locked) to $path/fsid + bool mounted; + + RWLock coll_lock; ///< rwlock to protect coll_map + ceph::unordered_map coll_map; + map new_coll_map; + + std::mutex nid_lock; + uint64_t nid_last; + uint64_t nid_max; + + Throttle throttle_ops, throttle_bytes; ///< submit to commit + + Finisher finisher; + + KVSyncThread kv_sync_thread; + std::mutex kv_lock; + std::condition_variable kv_cond, kv_sync_cond; + bool kv_stop; + deque kv_queue, kv_committing; + + //Logger *logger; + PerfCounters *logger; + std::mutex reap_lock; + list removed_collections; + + + // -------------------------------------------------------- + // private methods + + void _init_logger(); + void _shutdown_logger(); + + int _open_path(); + void _close_path(); + int _open_fsid(bool create); + int _lock_fsid(); + int _read_fsid(uuid_d *f); + int _write_fsid(); + void _close_fsid(); + int _open_db(bool create); + void _close_db(); + int _open_collections(int *errors=0); + void _close_collections(); + + int _open_super_meta(); + + CollectionRef _get_collection(coll_t cid); + void _queue_reap_collection(CollectionRef& c); + void _reap_collections(); + + void _assign_nid(TransContext *txc, OnodeRef o); + + void _dump_onode(OnodeRef o); + + TransContext *_txc_create(OpSequencer *osr); + void _txc_release(TransContext *txc, uint64_t offset, uint64_t length); + void _txc_add_transaction(TransContext *txc, Transaction *t); + void _txc_finalize(OpSequencer *osr, TransContext *txc); + void _txc_state_proc(TransContext *txc); + void _txc_finish_kv(TransContext *txc); + void _txc_finish(TransContext *txc); + + void _osr_reap_done(OpSequencer *osr); + + void _kv_sync_thread(); + void _kv_stop() { + { + std::lock_guard l(kv_lock); + kv_stop = true; + kv_cond.notify_all(); + } + kv_sync_thread.join(); + kv_stop = false; + } + + void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl); + void _do_write_stripe(TransContext *txc, OnodeRef o, + uint64_t offset, bufferlist& bl); + void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset); + + int _collection_list( + Collection *c, const ghobject_t& start, const ghobject_t& end, + int max, vector *ls, ghobject_t *next); + +public: + KStore(CephContext *cct, const string& path); + ~KStore() override; + + string get_type() override { + return "kstore"; + } + + bool needs_journal() override { return false; }; + bool wants_journal() override { return false; }; + bool allows_journal() override { return false; }; + + static int get_block_device_fsid(const string& path, uuid_d *fsid); + + bool test_mount_in_use() override; + + int mount() override; + int umount() override; + void _sync(); + + int fsck(bool deep) override; + + + int validate_hobject_key(const hobject_t &obj) const override { + return 0; + } + unsigned get_max_attr_name_length() override { + return 256; // arbitrary; there is no real limit internally + } + + int mkfs() override; + int mkjournal() override { + return 0; + } + void dump_perf_counters(Formatter *f) override { + f->open_object_section("perf_counters"); + logger->dump_formatted(f, false); + f->close_section(); + } + void get_db_statistics(Formatter *f) override { + db->get_statistics(f); + } + int statfs(struct store_statfs_t *buf, + osd_alert_list_t* alerts = nullptr) override; + int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) override; + + CollectionHandle open_collection(const coll_t& c) override; + CollectionHandle create_new_collection(const coll_t& c) override; + void set_collection_commit_queue(const coll_t& cid, + ContextQueue *commit_queue) override { + } + + using ObjectStore::exists; + bool exists(CollectionHandle& c, const ghobject_t& oid) override; + using ObjectStore::stat; + int stat( + CollectionHandle& c, + const ghobject_t& oid, + struct stat *st, + bool allow_eio = false) override; // struct stat? + int set_collection_opts( + CollectionHandle& c, + const pool_opts_t& opts) override; + using ObjectStore::read; + int read( + CollectionHandle& c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + bufferlist& bl, + uint32_t op_flags = 0) override; + int _do_read( + OnodeRef o, + uint64_t offset, + size_t len, + bufferlist& bl, + uint32_t op_flags = 0); + + using ObjectStore::fiemap; + int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, map& destmap) override; + int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& outbl) override; + using ObjectStore::getattr; + int getattr(CollectionHandle& c, const ghobject_t& oid, const char *name, bufferptr& value) override; + using ObjectStore::getattrs; + int getattrs(CollectionHandle& c, const ghobject_t& oid, map& aset) override; + + int list_collections(vector& ls) override; + bool collection_exists(const coll_t& c) override; + int collection_empty(CollectionHandle& c, bool *empty) override; + int collection_bits(CollectionHandle& c) override; + int collection_list( + CollectionHandle &c, const ghobject_t& start, const ghobject_t& end, + int max, + vector *ls, ghobject_t *next) override; + + using ObjectStore::omap_get; + int omap_get( + CollectionHandle& c, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + bufferlist *header, ///< [out] omap header + map *out /// < [out] Key to value map + ) override; + + using ObjectStore::omap_get_header; + /// Get omap header + int omap_get_header( + CollectionHandle& c, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + bufferlist *header, ///< [out] omap header + bool allow_eio = false ///< [in] don't assert on eio + ) override; + + using ObjectStore::omap_get_keys; + /// Get keys defined on oid + int omap_get_keys( + CollectionHandle& c, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + set *keys ///< [out] Keys defined on oid + ) override; + + using ObjectStore::omap_get_values; + /// Get key values + int omap_get_values( + CollectionHandle& c, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + const set &keys, ///< [in] Keys to get + map *out ///< [out] Returned keys and values + ) override; + + using ObjectStore::omap_check_keys; + /// Filters keys into out which are defined on oid + int omap_check_keys( + CollectionHandle& c, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object containing omap + const set &keys, ///< [in] Keys to check + set *out ///< [out] Subset of keys defined on oid + ) override; + + using ObjectStore::get_omap_iterator; + ObjectMap::ObjectMapIterator get_omap_iterator( + CollectionHandle& c, ///< [in] collection + const ghobject_t &oid ///< [in] object + ) override; + + void set_fsid(uuid_d u) override { + fsid = u; + } + uuid_d get_fsid() override { + return fsid; + } + + uint64_t estimate_objects_overhead(uint64_t num_objects) override { + return num_objects * 300; //assuming per-object overhead is 300 bytes + } + + objectstore_perf_stat_t get_cur_stats() override { + return objectstore_perf_stat_t(); + } + const PerfCounters* get_perf_counters() const override { + return logger; + } + + + int queue_transactions( + CollectionHandle& ch, + vector& tls, + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) override; + + void compact () override { + ceph_assert(db); + db->compact(); + } + +private: + // -------------------------------------------------------- + // write ops + + int _write(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset, size_t len, + bufferlist& bl, + uint32_t fadvise_flags); + int _do_write(TransContext *txc, + OnodeRef o, + uint64_t offset, uint64_t length, + bufferlist& bl, + uint32_t fadvise_flags); + int _touch(TransContext *txc, + CollectionRef& c, + OnodeRef& o); + int _zero(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset, size_t len); + int _do_truncate(TransContext *txc, + OnodeRef o, + uint64_t offset); + int _truncate(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t offset); + int _remove(TransContext *txc, + CollectionRef& c, + OnodeRef& o); + int _do_remove(TransContext *txc, + OnodeRef o); + int _setattr(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& name, + bufferptr& val); + int _setattrs(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const map& aset); + int _rmattr(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& name); + int _rmattrs(TransContext *txc, + CollectionRef& c, + OnodeRef& o); + void _do_omap_clear(TransContext *txc, uint64_t id); + int _omap_clear(TransContext *txc, + CollectionRef& c, + OnodeRef& o); + int _omap_setkeys(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + bufferlist& bl); + int _omap_setheader(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + bufferlist& header); + int _omap_rmkeys(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const bufferlist& bl); + int _omap_rmkey_range(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + const string& first, const string& last); + int _setallochint(TransContext *txc, + CollectionRef& c, + OnodeRef& o, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags); + int _clone(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo); + int _clone_range(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo, + uint64_t srcoff, uint64_t length, uint64_t dstoff); + int _rename(TransContext *txc, + CollectionRef& c, + OnodeRef& oldo, + OnodeRef& newo, + const ghobject_t& new_oid); + int _create_collection(TransContext *txc, coll_t cid, unsigned bits, + CollectionRef *c); + int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c); + int _split_collection(TransContext *txc, + CollectionRef& c, + CollectionRef& d, + unsigned bits, int rem); + int _merge_collection(TransContext *txc, + CollectionRef *c, + CollectionRef& d, + unsigned bits); + +}; + +static inline void intrusive_ptr_add_ref(KStore::Onode *o) { + o->get(); +} +static inline void intrusive_ptr_release(KStore::Onode *o) { + o->put(); +} + +static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) { + o->get(); +} +static inline void intrusive_ptr_release(KStore::OpSequencer *o) { + o->put(); +} + +#endif diff --git a/src/os/kstore/kstore_types.cc b/src/os/kstore/kstore_types.cc new file mode 100644 index 00000000..07270374 --- /dev/null +++ b/src/os/kstore/kstore_types.cc @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "kstore_types.h" +#include "common/Formatter.h" +#include "include/stringify.h" + +// cnode_t + +void kstore_cnode_t::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(bits, bl); + ENCODE_FINISH(bl); +} + +void kstore_cnode_t::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(bits, p); + DECODE_FINISH(p); +} + +void kstore_cnode_t::dump(Formatter *f) const +{ + f->dump_unsigned("bits", bits); +} + +void kstore_cnode_t::generate_test_instances(list& o) +{ + o.push_back(new kstore_cnode_t()); + o.push_back(new kstore_cnode_t(0)); + o.push_back(new kstore_cnode_t(123)); +} + + +// kstore_onode_t + +void kstore_onode_t::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(nid, bl); + encode(size, bl); + encode(attrs, bl); + encode(omap_head, bl); + encode(stripe_size, bl); + encode(expected_object_size, bl); + encode(expected_write_size, bl); + encode(alloc_hint_flags, bl); + ENCODE_FINISH(bl); +} + +void kstore_onode_t::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(nid, p); + decode(size, p); + decode(attrs, p); + decode(omap_head, p); + decode(stripe_size, p); + decode(expected_object_size, p); + decode(expected_write_size, p); + decode(alloc_hint_flags, p); + DECODE_FINISH(p); +} + +void kstore_onode_t::dump(Formatter *f) const +{ + f->dump_unsigned("nid", nid); + f->dump_unsigned("size", size); + f->open_object_section("attrs"); + for (map::const_iterator p = attrs.begin(); + p != attrs.end(); ++p) { + f->open_object_section("attr"); + f->dump_string("name", p->first); + f->dump_unsigned("len", p->second.length()); + f->close_section(); + } + f->close_section(); + f->dump_unsigned("omap_head", omap_head); + f->dump_unsigned("stripe_size", stripe_size); + f->dump_unsigned("expected_object_size", expected_object_size); + f->dump_unsigned("expected_write_size", expected_write_size); + f->dump_unsigned("alloc_hint_flags", alloc_hint_flags); +} + +void kstore_onode_t::generate_test_instances(list& o) +{ + o.push_back(new kstore_onode_t()); + // FIXME +} diff --git a/src/os/kstore/kstore_types.h b/src/os/kstore/kstore_types.h new file mode 100644 index 00000000..13c33fb6 --- /dev/null +++ b/src/os/kstore/kstore_types.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_OSD_KSTORE_TYPES_H +#define CEPH_OSD_KSTORE_TYPES_H + +#include +#include "include/types.h" +#include "include/interval_set.h" +#include "include/utime.h" +#include "common/hobject.h" + +namespace ceph { + class Formatter; +} +/// collection metadata +struct kstore_cnode_t { + uint32_t bits; ///< how many bits of coll pgid are significant + + explicit kstore_cnode_t(int b=0) : bits(b) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; +WRITE_CLASS_ENCODER(kstore_cnode_t) + +/// onode: per-object metadata +struct kstore_onode_t { + uint64_t nid; ///< numeric id (locally unique) + uint64_t size; ///< object size + map attrs; ///< attrs + uint64_t omap_head; ///< id for omap root node + uint32_t stripe_size; ///< stripe size + + uint32_t expected_object_size; + uint32_t expected_write_size; + uint32_t alloc_hint_flags; + + kstore_onode_t() + : nid(0), + size(0), + omap_head(0), + stripe_size(0), + expected_object_size(0), + expected_write_size(0), + alloc_hint_flags(0) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; +WRITE_CLASS_ENCODER(kstore_onode_t) + +#endif -- cgit v1.2.3