author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-27 18:24:20 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-27 18:24:20 +0000
commit    483eb2f56657e8e7f419ab1a4fab8dce9ade8609
tree      e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf  /src/crimson/os
parent    Initial commit.
Adding upstream version 14.2.21.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
25 files changed, 4243 insertions, 0 deletions
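
Before the diff itself, a brief orientation that is not part of the commit: the sketch below shows how the ceph::os::Transaction class added in src/crimson/os/Transaction.h is typically built up with its mutation helpers and then replayed through its iterator, much as CyanStore::do_transaction() does near the end of this patch. The wrapper function and its arguments are hypothetical; only the Transaction calls themselves come from the header introduced below.

    // Illustration only -- not part of this commit. Builds a Transaction with the
    // helpers declared in Transaction.h, then replays the encoded ops with the
    // iterator, as CyanStore::do_transaction() does.
    #include "crimson/os/Transaction.h"

    void example_replay(const coll_t& cid, const ghobject_t& oid,
                        const ceph::bufferlist& payload)   // hypothetical helper
    {
      ceph::os::Transaction t;
      t.create_collection(cid, 0);                         // OP_MKCOLL
      t.touch(cid, oid);                                   // OP_TOUCH
      t.write(cid, oid, 0, payload.length(), payload);     // OP_WRITE; payload lands in data_bl

      auto i = t.begin();
      while (i.have_op()) {
        ceph::os::Transaction::Op* op = i.decode_op();     // fixed-size Op record from op_bl
        switch (op->op) {
        case ceph::os::Transaction::OP_WRITE: {
          ceph::bufferlist bl;
          i.decode_bl(bl);                                 // side data is decoded from data_bl
          // i.get_cid(op->cid) and i.get_oid(op->oid) map the per-transaction
          // indices back to coll_t / ghobject_t.
          break;
        }
        default:
          break;
        }
      }
    }
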
diff --git a/src/crimson/os/CMakeLists.txt b/src/crimson/os/CMakeLists.txt new file mode 100644 index 00000000..264e3525 --- /dev/null +++ b/src/crimson/os/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(crimson-os + cyan_store.cc + cyan_collection.cc + cyan_object.cc + Transaction.cc) +target_link_libraries(crimson-os + crimson) diff --git a/src/crimson/os/Transaction.cc b/src/crimson/os/Transaction.cc new file mode 100644 index 00000000..acf99c59 --- /dev/null +++ b/src/crimson/os/Transaction.cc @@ -0,0 +1,507 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Transaction.h" +#include "include/denc.h" + +namespace ceph::os +{ + +void Transaction::iterator::decode_attrset_bl(bufferlist *pbl) +{ + using ceph::decode; + auto start = data_bl_p; + __u32 n; + decode(n, data_bl_p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + } + start.copy(len, *pbl); +} + +void Transaction::iterator::decode_keyset_bl(bufferlist *pbl) +{ + using ceph::decode; + auto start = data_bl_p; + __u32 n; + decode(n, data_bl_p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + } + start.copy(len, *pbl); +} + +void Transaction::dump(ceph::Formatter *f) +{ + f->open_array_section("ops"); + iterator i = begin(); + int op_num = 0; + bool stop_looping = false; + while (i.have_op() && !stop_looping) { + Transaction::Op *op = i.decode_op(); + f->open_object_section("op"); + f->dump_int("op_num", op_num); + + switch (op->op) { + case Transaction::OP_NOP: + f->dump_string("op_name", "nop"); + break; + case Transaction::OP_TOUCH: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "touch"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_WRITE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "write"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("length", len); + f->dump_unsigned("offset", off); + f->dump_unsigned("bufferlist length", bl.length()); + } + break; + + case Transaction::OP_ZERO: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + f->dump_string("op_name", "zero"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("offset", off); + f->dump_unsigned("length", len); + } + break; + + case Transaction::OP_TRIMCACHE: + { + // deprecated, no-op + f->dump_string("op_name", "trim_cache"); + } + break; + + case Transaction::OP_TRUNCATE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + f->dump_string("op_name", "truncate"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("offset", off); + } + break; + + case Transaction::OP_REMOVE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "remove"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_SETATTR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string name = 
i.decode_string(); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "setattr"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("name", name); + f->dump_unsigned("length", bl.length()); + } + break; + + case Transaction::OP_SETATTRS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + map<string, bufferptr> aset; + i.decode_attrset(aset); + f->dump_string("op_name", "setattrs"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_object_section("attr_lens"); + for (map<string,bufferptr>::iterator p = aset.begin(); + p != aset.end(); ++p) { + f->dump_unsigned(p->first.c_str(), p->second.length()); + } + f->close_section(); + } + break; + + case Transaction::OP_RMATTR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string name = i.decode_string(); + f->dump_string("op_name", "rmattr"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("name", name); + } + break; + + case Transaction::OP_RMATTRS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "rmattrs"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_CLONE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "clone"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + } + break; + + case Transaction::OP_CLONERANGE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + uint64_t off = op->off; + uint64_t len = op->len; + f->dump_string("op_name", "clonerange"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + f->dump_unsigned("offset", off); + f->dump_unsigned("len", len); + } + break; + + case Transaction::OP_CLONERANGE2: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + uint64_t srcoff = op->off; + uint64_t len = op->len; + uint64_t dstoff = op->dest_off; + f->dump_string("op_name", "clonerange2"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + f->dump_unsigned("src_offset", srcoff); + f->dump_unsigned("len", len); + f->dump_unsigned("dst_offset", dstoff); + } + break; + + case Transaction::OP_MKCOLL: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "mkcoll"); + f->dump_stream("collection") << cid; + } + break; + + case Transaction::OP_COLL_HINT: + { + using ceph::decode; + coll_t cid = i.get_cid(op->cid); + uint32_t type = op->hint_type; + f->dump_string("op_name", "coll_hint"); + f->dump_stream("collection") << cid; + f->dump_unsigned("type", type); + bufferlist hint; + i.decode_bl(hint); + auto hiter = hint.cbegin(); + if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { + uint32_t pg_num; + uint64_t num_objs; + decode(pg_num, hiter); + decode(num_objs, hiter); + f->dump_unsigned("pg_num", pg_num); + f->dump_unsigned("expected_num_objects", num_objs); + } + } + break; + + case Transaction::OP_COLL_SET_BITS: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "coll_set_bits"); + f->dump_stream("collection") << cid; + f->dump_unsigned("bits", op->split_bits); + } + break; + 
+ case Transaction::OP_RMCOLL: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "rmcoll"); + f->dump_stream("collection") << cid; + } + break; + + case Transaction::OP_COLL_ADD: + { + coll_t ocid = i.get_cid(op->cid); + coll_t ncid = i.get_cid(op->dest_cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "collection_add"); + f->dump_stream("src_collection") << ocid; + f->dump_stream("dst_collection") << ncid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_COLL_REMOVE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "collection_remove"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_COLL_MOVE: + { + coll_t ocid = i.get_cid(op->cid); + coll_t ncid = i.get_cid(op->dest_cid); + ghobject_t oid = i.get_oid(op->oid); + f->open_object_section("collection_move"); + f->dump_stream("src_collection") << ocid; + f->dump_stream("dst_collection") << ncid; + f->dump_stream("oid") << oid; + f->close_section(); + } + break; + + case Transaction::OP_COLL_SETATTR: + { + coll_t cid = i.get_cid(op->cid); + string name = i.decode_string(); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "collection_setattr"); + f->dump_stream("collection") << cid; + f->dump_string("name", name); + f->dump_unsigned("length", bl.length()); + } + break; + + case Transaction::OP_COLL_RMATTR: + { + coll_t cid = i.get_cid(op->cid); + string name = i.decode_string(); + f->dump_string("op_name", "collection_rmattr"); + f->dump_stream("collection") << cid; + f->dump_string("name", name); + } + break; + + case Transaction::OP_COLL_RENAME: + { + f->dump_string("op_name", "collection_rename"); + } + break; + + case Transaction::OP_OMAP_CLEAR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "omap_clear"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_OMAP_SETKEYS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + map<string, bufferlist> aset; + i.decode_attrset(aset); + f->dump_string("op_name", "omap_setkeys"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_object_section("attr_lens"); + for (map<string, bufferlist>::iterator p = aset.begin(); + p != aset.end(); ++p) { + f->dump_unsigned(p->first.c_str(), p->second.length()); + } + f->close_section(); + } + break; + + case Transaction::OP_OMAP_RMKEYS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + set<string> keys; + i.decode_keyset(keys); + f->dump_string("op_name", "omap_rmkeys"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_array_section("attrs"); + for (auto& k : keys) { + f->dump_string("", k.c_str()); + } + f->close_section(); + } + break; + + case Transaction::OP_OMAP_SETHEADER: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "omap_setheader"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_stream("header_length") << bl.length(); + } + break; + + case Transaction::OP_SPLIT_COLLECTION: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + uint32_t rem = op->split_rem; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_split_collection_create"); + 
f->dump_stream("collection") << cid; + f->dump_stream("bits") << bits; + f->dump_stream("rem") << rem; + f->dump_stream("dest") << dest; + } + break; + + case Transaction::OP_SPLIT_COLLECTION2: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + uint32_t rem = op->split_rem; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_split_collection"); + f->dump_stream("collection") << cid; + f->dump_stream("bits") << bits; + f->dump_stream("rem") << rem; + f->dump_stream("dest") << dest; + } + break; + + case Transaction::OP_MERGE_COLLECTION: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_merge_collection"); + f->dump_stream("collection") << cid; + f->dump_stream("dest") << dest; + f->dump_stream("bits") << bits; + } + break; + + case Transaction::OP_OMAP_RMKEYRANGE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string first, last; + first = i.decode_string(); + last = i.decode_string(); + f->dump_string("op_name", "op_omap_rmkeyrange"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("first", first); + f->dump_string("last", last); + } + break; + + case Transaction::OP_COLL_MOVE_RENAME: + { + coll_t old_cid = i.get_cid(op->cid); + ghobject_t old_oid = i.get_oid(op->oid); + coll_t new_cid = i.get_cid(op->dest_cid); + ghobject_t new_oid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "op_coll_move_rename"); + f->dump_stream("old_collection") << old_cid; + f->dump_stream("old_oid") << old_oid; + f->dump_stream("new_collection") << new_cid; + f->dump_stream("new_oid") << new_oid; + } + break; + + case Transaction::OP_TRY_RENAME: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t old_oid = i.get_oid(op->oid); + ghobject_t new_oid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "op_coll_move_rename"); + f->dump_stream("collection") << cid; + f->dump_stream("old_oid") << old_oid; + f->dump_stream("new_oid") << new_oid; + } + break; + + case Transaction::OP_SETALLOCHINT: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t expected_object_size = op->expected_object_size; + uint64_t expected_write_size = op->expected_write_size; + f->dump_string("op_name", "op_setallochint"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_stream("expected_object_size") << expected_object_size; + f->dump_stream("expected_write_size") << expected_write_size; + } + break; + + default: + f->dump_string("op_name", "unknown"); + f->dump_unsigned("op_code", op->op); + stop_looping = true; + break; + } + f->close_section(); + op_num++; + } + f->close_section(); +} + +} diff --git a/src/crimson/os/Transaction.h b/src/crimson/os/Transaction.h new file mode 100644 index 00000000..c6f660fe --- /dev/null +++ b/src/crimson/os/Transaction.h @@ -0,0 +1,1234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <map> + +#include "include/int_types.h" +#include "include/buffer.h" +#include "osd/osd_types.h" + +/********************************* + * transaction + * + * A Transaction represents a sequence of primitive mutation + * operations. + * + * Three events in the life of a Transaction result in + * callbacks. 
Any Transaction can contain any number of callback + * objects (Context) for any combination of the three classes of + * callbacks: + * + * on_applied_sync, on_applied, and on_commit. + * + * The "on_applied" and "on_applied_sync" callbacks are invoked when + * the modifications requested by the Transaction are visible to + * subsequent ObjectStore operations, i.e., the results are + * readable. The only conceptual difference between on_applied and + * on_applied_sync is the specific thread and locking environment in + * which the callbacks operate. "on_applied_sync" is called + * directly by an ObjectStore execution thread. It is expected to + * execute quickly and must not acquire any locks of the calling + * environment. Conversely, "on_applied" is called from the separate + * Finisher thread, meaning that it can contend for calling + * environment locks. NB, on_applied and on_applied_sync are + * sometimes called on_readable and on_readable_sync. + * + * The "on_commit" callback is also called from the Finisher thread + * and indicates that all of the mutations have been durably + * committed to stable storage (i.e., are now software/hardware + * crashproof). + * + * At the implementation level, each mutation primitive (and its + * associated data) can be serialized to a single buffer. That + * serialization, however, does not copy any data, but (using the + * bufferlist library) will reference the original buffers. This + * implies that the buffer that contains the data being submitted + * must remain stable until the on_commit callback completes. In + * practice, bufferlist handles all of this for you and this + * subtlety is only relevant if you are referencing an existing + * buffer via buffer::raw_static. + * + * Some implementations of ObjectStore choose to implement their own + * form of journaling that uses the serialized form of a + * Transaction. This requires that the encode/decode logic properly + * version itself and handle version upgrades that might change the + * format of the encoded Transaction. This has already happened a + * couple of times and the Transaction object contains some helper + * variables that aid in this legacy decoding: + * + * sobject_encoding detects an older/simpler version of oid + * present in pre-bobtail versions of ceph. use_pool_override + * also detects a situation where the pool of an oid can be + * overridden for legacy operations/buffers. For non-legacy + * implementations of ObjectStore, neither of these fields are + * relevant. + * + * + * TRANSACTION ISOLATION + * + * Except as noted above, isolation is the responsibility of the + * caller. In other words, if any storage element (storage element + * == any of the four portions of an object as described above) is + * altered by a transaction (including deletion), the caller + * promises not to attempt to read that element while the + * transaction is pending (here pending means from the time of + * issuance until the "on_applied_sync" callback has been + * received). Violations of isolation need not be detected by + * ObjectStore and there is no corresponding error mechanism for + * reporting an isolation violation (crashing would be the + * appropriate way to report an isolation violation if detected). + * + * Enumeration operations may violate transaction isolation as + * described above when a storage element is being created or + * deleted as part of a transaction. 
In this case, ObjectStore is + * allowed to consider the enumeration operation to either precede + * or follow the violating transaction element. In other words, the + * presence/absence of the mutated element in the enumeration is + * entirely at the discretion of ObjectStore. The arbitrary ordering + * applies independently to each transaction element. For example, + * if a transaction contains two mutating elements "create A" and + * "delete B". And an enumeration operation is performed while this + * transaction is pending. It is permissible for ObjectStore to + * report any of the four possible combinations of the existence of + * A and B. + * + */ +namespace ceph::os { +class Transaction { +public: + enum { + OP_NOP = 0, + OP_TOUCH = 9, // cid, oid + OP_WRITE = 10, // cid, oid, offset, len, bl + OP_ZERO = 11, // cid, oid, offset, len + OP_TRUNCATE = 12, // cid, oid, len + OP_REMOVE = 13, // cid, oid + OP_SETATTR = 14, // cid, oid, attrname, bl + OP_SETATTRS = 15, // cid, oid, attrset + OP_RMATTR = 16, // cid, oid, attrname + OP_CLONE = 17, // cid, oid, newoid + OP_CLONERANGE = 18, // cid, oid, newoid, offset, len + OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff + + OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** + + OP_MKCOLL = 20, // cid + OP_RMCOLL = 21, // cid + OP_COLL_ADD = 22, // cid, oldcid, oid + OP_COLL_REMOVE = 23, // cid, oid + OP_COLL_SETATTR = 24, // cid, attrname, bl + OP_COLL_RMATTR = 25, // cid, attrname + OP_COLL_SETATTRS = 26, // cid, attrset + OP_COLL_MOVE = 8, // newcid, oldcid, oid + + OP_RMATTRS = 28, // cid, oid + OP_COLL_RENAME = 29, // cid, newcid + + OP_OMAP_CLEAR = 31, // cid + OP_OMAP_SETKEYS = 32, // cid, attrset + OP_OMAP_RMKEYS = 33, // cid, keyset + OP_OMAP_SETHEADER = 34, // cid, header + OP_SPLIT_COLLECTION = 35, // cid, bits, destination + OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination + doesn't create the destination */ + OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey + OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid + OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size + OP_COLL_HINT = 40, // cid, type, bl + + OP_TRY_RENAME = 41, // oldcid, oldoid, newoid + + OP_COLL_SET_BITS = 42, // cid, bits + + OP_MERGE_COLLECTION = 43, // cid, destination + }; + + // Transaction hint type + enum { + COLL_HINT_EXPECTED_NUM_OBJECTS = 1, + }; + + struct Op { + ceph_le32 op; + ceph_le32 cid; + ceph_le32 oid; + ceph_le64 off; + ceph_le64 len; + ceph_le32 dest_cid; + ceph_le32 dest_oid; //OP_CLONE, OP_CLONERANGE + ceph_le64 dest_off; //OP_CLONERANGE + union { + struct { + ceph_le32 hint_type; //OP_COLL_HINT + }; + struct { + ceph_le32 alloc_hint_flags; //OP_SETALLOCHINT + }; + }; + ceph_le64 expected_object_size; //OP_SETALLOCHINT + ceph_le64 expected_write_size; //OP_SETALLOCHINT + ceph_le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, + //OP_MKCOLL + ceph_le32 split_rem; //OP_SPLIT_COLLECTION2 + } __attribute__ ((packed)) ; + + struct TransactionData { + ceph_le64 ops; + ceph_le32 largest_data_len; + ceph_le32 largest_data_off; + ceph_le32 largest_data_off_in_data_bl; + ceph_le32 fadvise_flags; + + TransactionData() noexcept : + ops(init_le64(0)), + largest_data_len(init_le32(0)), + largest_data_off(init_le32(0)), + largest_data_off_in_data_bl(init_le32(0)), + fadvise_flags(init_le32(0)) { } + + // override default move operations to reset default values + TransactionData(TransactionData&& other) noexcept : + ops(other.ops), + largest_data_len(other.largest_data_len), + 
largest_data_off(other.largest_data_off), + largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), + fadvise_flags(other.fadvise_flags) { + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + } + TransactionData& operator=(TransactionData&& other) noexcept { + ops = other.ops; + largest_data_len = other.largest_data_len; + largest_data_off = other.largest_data_off; + largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; + fadvise_flags = other.fadvise_flags; + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + return *this; + } + + TransactionData(const TransactionData& other) = default; + TransactionData& operator=(const TransactionData& other) = default; + + void encode(bufferlist& bl) const { + bl.append((char*)this, sizeof(TransactionData)); + } + void decode(bufferlist::const_iterator &bl) { + bl.copy(sizeof(TransactionData), (char*)this); + } + } __attribute__ ((packed)) ; + +private: + TransactionData data; + + std::map<coll_t, __le32> coll_index; + std::map<ghobject_t, __le32> object_index; + + __le32 coll_id {0}; + __le32 object_id {0}; + + bufferlist data_bl; + bufferlist op_bl; + + std::list<Context *> on_applied; + std::list<Context *> on_commit; + std::list<Context *> on_applied_sync; + +public: + Transaction() = default; + + explicit Transaction(bufferlist::const_iterator &dp) { + decode(dp); + } + explicit Transaction(bufferlist &nbl) { + auto dp = nbl.cbegin(); + decode(dp); + } + + // override default move operations to reset default values + Transaction(Transaction&& other) noexcept : + data(std::move(other.data)), + coll_index(std::move(other.coll_index)), + object_index(std::move(other.object_index)), + coll_id(other.coll_id), + object_id(other.object_id), + data_bl(std::move(other.data_bl)), + op_bl(std::move(other.op_bl)), + on_applied(std::move(other.on_applied)), + on_commit(std::move(other.on_commit)), + on_applied_sync(std::move(other.on_applied_sync)) { + other.coll_id = 0; + other.object_id = 0; + } + + Transaction& operator=(Transaction&& other) noexcept { + data = std::move(other.data); + coll_index = std::move(other.coll_index); + object_index = std::move(other.object_index); + coll_id = other.coll_id; + object_id = other.object_id; + data_bl = std::move(other.data_bl); + op_bl = std::move(other.op_bl); + on_applied = std::move(other.on_applied); + on_commit = std::move(other.on_commit); + on_applied_sync = std::move(other.on_applied_sync); + other.coll_id = 0; + other.object_id = 0; + return *this; + } + + Transaction(const Transaction& other) = default; + Transaction& operator=(const Transaction& other) = default; + + // expose object_index for FileStore::Op's benefit + const map<ghobject_t, __le32>& get_object_index() const { + return object_index; + } + + /* Operations on callback contexts */ + void register_on_applied(Context *c) { + if (!c) return; + on_applied.push_back(c); + } + void register_on_commit(Context *c) { + if (!c) return; + on_commit.push_back(c); + } + void register_on_applied_sync(Context *c) { + if (!c) return; + on_applied_sync.push_back(c); + } + void register_on_complete(Context *c) { + if (!c) return; + RunOnDeleteRef _complete (std::make_shared<RunOnDelete>(c)); + register_on_applied(new ContainerContext<RunOnDeleteRef>(_complete)); + register_on_commit(new ContainerContext<RunOnDeleteRef>(_complete)); + } + bool has_contexts() 
const { + return + !on_commit.empty() || + !on_applied.empty() || + !on_applied_sync.empty(); + } + + static void collect_contexts(vector<Transaction>& t, + Context **out_on_applied, + Context **out_on_commit, + Context **out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + std::list<Context *> on_applied, on_commit, on_applied_sync; + for (auto& i : t) { + on_applied.splice(on_applied.end(), i.on_applied); + on_commit.splice(on_commit.end(), i.on_commit); + on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); + } + *out_on_applied = C_Contexts::list_to_context(on_applied); + *out_on_commit = C_Contexts::list_to_context(on_commit); + *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); + } + static void collect_contexts(vector<Transaction>& t, + std::list<Context*> *out_on_applied, + std::list<Context*> *out_on_commit, + std::list<Context*> *out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + for (auto& i : t) { + out_on_applied->splice(out_on_applied->end(), i.on_applied); + out_on_commit->splice(out_on_commit->end(), i.on_commit); + out_on_applied_sync->splice(out_on_applied_sync->end(), + i.on_applied_sync); + } + } + + Context *get_on_applied() { + return C_Contexts::list_to_context(on_applied); + } + Context *get_on_commit() { + return C_Contexts::list_to_context(on_commit); + } + Context *get_on_applied_sync() { + return C_Contexts::list_to_context(on_applied_sync); + } + + void set_fadvise_flags(uint32_t flags) { + data.fadvise_flags = flags; + } + void set_fadvise_flag(uint32_t flag) { + data.fadvise_flags = data.fadvise_flags | flag; + } + uint32_t get_fadvise_flags() { return data.fadvise_flags; } + + void swap(Transaction& other) noexcept { + std::swap(data, other.data); + std::swap(on_applied, other.on_applied); + std::swap(on_commit, other.on_commit); + std::swap(on_applied_sync, other.on_applied_sync); + + std::swap(coll_index, other.coll_index); + std::swap(object_index, other.object_index); + std::swap(coll_id, other.coll_id); + std::swap(object_id, other.object_id); + op_bl.swap(other.op_bl); + data_bl.swap(other.data_bl); + } + + void _update_op(Op* op, + vector<__le32> &cm, + vector<__le32> &om) { + + switch (op->op) { + case OP_NOP: + break; + + case OP_TOUCH: + case OP_REMOVE: + case OP_SETATTR: + case OP_SETATTRS: + case OP_RMATTR: + case OP_RMATTRS: + case OP_COLL_REMOVE: + case OP_OMAP_CLEAR: + case OP_OMAP_SETKEYS: + case OP_OMAP_RMKEYS: + case OP_OMAP_RMKEYRANGE: + case OP_OMAP_SETHEADER: + case OP_WRITE: + case OP_ZERO: + case OP_TRUNCATE: + case OP_SETALLOCHINT: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + break; + + case OP_CLONERANGE2: + case OP_CLONE: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_MKCOLL: + case OP_RMCOLL: + case OP_COLL_SETATTR: + case OP_COLL_RMATTR: + case OP_COLL_SETATTRS: + case OP_COLL_HINT: + case OP_COLL_SET_BITS: + ceph_assert(op->cid < cm.size()); + op->cid = cm[op->cid]; + break; + + case OP_COLL_ADD: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < om.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + op->oid = om[op->oid]; + break; + + case 
OP_COLL_MOVE_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < cm.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_cid = cm[op->dest_cid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_TRY_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_SPLIT_COLLECTION2: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + case OP_MERGE_COLLECTION: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + default: + ceph_abort_msg("Unknown OP"); + } + } + void _update_op_bl( + bufferlist& bl, + vector<__le32> &cm, + vector<__le32> &om) { + for (auto& bp : bl.buffers()) { + ceph_assert(bp.length() % sizeof(Op) == 0); + + char* raw_p = const_cast<char*>(bp.c_str()); + char* raw_end = raw_p + bp.length(); + while (raw_p < raw_end) { + _update_op(reinterpret_cast<Op*>(raw_p), cm, om); + raw_p += sizeof(Op); + } + } + } + /// Append the operations of the parameter to this Transaction. Those + /// operations are removed from the parameter Transaction + void append(Transaction& other) { + + data.ops = data.ops + other.data.ops; + if (other.data.largest_data_len > data.largest_data_len) { + data.largest_data_len = other.data.largest_data_len; + data.largest_data_off = other.data.largest_data_off; + data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; + } + data.fadvise_flags = data.fadvise_flags | other.data.fadvise_flags; + on_applied.splice(on_applied.end(), other.on_applied); + on_commit.splice(on_commit.end(), other.on_commit); + on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); + + //append coll_index & object_index + vector<__le32> cm(other.coll_index.size()); + map<coll_t, __le32>::iterator coll_index_p; + for (coll_index_p = other.coll_index.begin(); + coll_index_p != other.coll_index.end(); + ++coll_index_p) { + cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); + } + + vector<__le32> om(other.object_index.size()); + map<ghobject_t, __le32>::iterator object_index_p; + for (object_index_p = other.object_index.begin(); + object_index_p != other.object_index.end(); + ++object_index_p) { + om[object_index_p->second] = _get_object_id(object_index_p->first); + } + + //the other.op_bl SHOULD NOT be changes during append operation, + //we use additional bufferlist to avoid this problem + bufferlist other_op_bl; + { + bufferptr other_op_bl_ptr(other.op_bl.length()); + other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); + other_op_bl.append(std::move(other_op_bl_ptr)); + } + + //update other_op_bl with cm & om + //When the other is appended to current transaction, all coll_index and + //object_index in other.op_buffer should be updated by new index of the + //combined transaction + _update_op_bl(other_op_bl, cm, om); + + //append op_bl + op_bl.append(other_op_bl); + //append data_bl + data_bl.append(other.data_bl); + } + + /** Inquires about the Transaction as a whole. */ + + /// How big is the encoded Transaction buffer? 
+ uint64_t get_encoded_bytes() { + //layout: data_bl + op_bl + coll_index + object_index + data + + // coll_index size, object_index size and sizeof(transaction_data) + // all here, so they may be computed at compile-time + size_t final_size = sizeof(__u32) * 2 + sizeof(data); + + // coll_index second and object_index second + final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); + + // coll_index first + for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + // object_index first + for (auto p = object_index.begin(); p != object_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + return data_bl.length() + + op_bl.length() + + final_size; + } + + /// Retain old version for regression testing purposes + uint64_t get_encoded_bytes_test() { + using ceph::encode; + //layout: data_bl + op_bl + coll_index + object_index + data + bufferlist bl; + encode(coll_index, bl); + encode(object_index, bl); + + return data_bl.length() + + op_bl.length() + + bl.length() + + sizeof(data); + } + + uint64_t get_num_bytes() { + return get_encoded_bytes(); + } + /// Size of largest data buffer to the "write" operation encountered so far + uint32_t get_data_length() { + return data.largest_data_len; + } + /// offset within the encoded buffer to the start of the largest data buffer + /// that's encoded + uint32_t get_data_offset() { + if (data.largest_data_off_in_data_bl) { + return data.largest_data_off_in_data_bl + + sizeof(__u8) + // encode struct_v + sizeof(__u8) + // encode compat_v + sizeof(__u32) + // encode len + sizeof(__u32); // data_bl len + } + return 0; // none + } + /// offset of buffer as aligned to destination within object. + int get_data_alignment() { + if (!data.largest_data_len) + return 0; + return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; + } + /// Is the Transaction empty (no operations) + bool empty() { + return !data.ops; + } + /// Number of operations in the transaction + int get_num_ops() { + return data.ops; + } + + /** + * iterator + * + * Helper object to parse Transactions. + * + * ObjectStore instances use this object to step down the encoded + * buffer decoding operation codes and parameters as we go. 
+ * + */ + class iterator { + Transaction *t; + + uint64_t ops; + char* op_buffer_p; + + bufferlist::const_iterator data_bl_p; + + public: + vector<coll_t> colls; + vector<ghobject_t> objects; + + private: + explicit iterator(Transaction *t) + : t(t), + data_bl_p(t->data_bl.cbegin()), + colls(t->coll_index.size()), + objects(t->object_index.size()) { + + ops = t->data.ops; + op_buffer_p = t->op_bl.c_str(); + + map<coll_t, __le32>::iterator coll_index_p; + for (coll_index_p = t->coll_index.begin(); + coll_index_p != t->coll_index.end(); + ++coll_index_p) { + colls[coll_index_p->second] = coll_index_p->first; + } + + map<ghobject_t, __le32>::iterator object_index_p; + for (object_index_p = t->object_index.begin(); + object_index_p != t->object_index.end(); + ++object_index_p) { + objects[object_index_p->second] = object_index_p->first; + } + } + + friend class Transaction; + + public: + + bool have_op() { + return ops > 0; + } + Op* decode_op() { + ceph_assert(ops > 0); + + Op* op = reinterpret_cast<Op*>(op_buffer_p); + op_buffer_p += sizeof(Op); + ops--; + + return op; + } + string decode_string() { + using ceph::decode; + string s; + decode(s, data_bl_p); + return s; + } + void decode_bp(bufferptr& bp) { + using ceph::decode; + decode(bp, data_bl_p); + } + void decode_bl(bufferlist& bl) { + using ceph::decode; + decode(bl, data_bl_p); + } + void decode_attrset(map<string,bufferptr>& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset(map<string,bufferlist>& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset_bl(bufferlist *pbl); + void decode_keyset(set<string> &keys){ + using ceph::decode; + decode(keys, data_bl_p); + } + void decode_keyset_bl(bufferlist *pbl); + + const ghobject_t &get_oid(__le32 oid_id) { + ceph_assert(oid_id < objects.size()); + return objects[oid_id]; + } + const coll_t &get_cid(__le32 cid_id) { + ceph_assert(cid_id < colls.size()); + return colls[cid_id]; + } + uint32_t get_fadvise_flags() const { + return t->get_fadvise_flags(); + } + }; + + iterator begin() { + return iterator(this); + } + +private: + void _build_actions_from_tbl(); + + static constexpr size_t OPS_PER_PTR = 32u; + /** + * Helper functions to encode the various mutation elements of a + * transaction. These are 1:1 with the operation codes (see + * enumeration above). These routines ensure that the + * encoder/creator of a transaction gets the right data in the + * right place. Sadly, there's no corresponding version nor any + * form of seat belts for the decoder. + */ + Op* _get_next_op() { + if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { + op_bl.reserve(sizeof(Op) * OPS_PER_PTR); + } + // append_hole ensures bptr merging. Even huge number of ops + // shouldn't result in overpopulating bl::_buffers. + char* const p = op_bl.append_hole(sizeof(Op)).c_str(); + memset(p, 0, sizeof(Op)); + return reinterpret_cast<Op*>(p); + } + __le32 _get_coll_id(const coll_t& coll) { + map<coll_t, __le32>::iterator c = coll_index.find(coll); + if (c != coll_index.end()) + return c->second; + + __le32 index_id = coll_id++; + coll_index[coll] = index_id; + return index_id; + } + __le32 _get_object_id(const ghobject_t& oid) { + map<ghobject_t, __le32>::iterator o = object_index.find(oid); + if (o != object_index.end()) + return o->second; + + __le32 index_id = object_id++; + object_index[oid] = index_id; + return index_id; + } + +public: + /// noop. 
'nuf said + void nop() { + Op* _op = _get_next_op(); + _op->op = OP_NOP; + data.ops = data.ops + 1; + } + /** + * touch + * + * Ensure the existance of an object in a collection. Create an + * empty object if necessary + */ + void touch(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TOUCH; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /** + * Write data to an offset within an object. If the object is too + * small, it is expanded as needed. It is possible to specify an + * offset beyond the current end of an object and it will be + * expanded as needed. Simple implementations of ObjectStore will + * just zero the data between the old end of the object and the + * newly provided data. More sophisticated implementations of + * ObjectStore will omit the untouched data and store it as a + * "hole" in the file. + * + * Note that a 0-length write does not affect the size of the object. + */ + void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, + const bufferlist& write_data, uint32_t flags = 0) { + using ceph::encode; + uint32_t orig_len = data_bl.length(); + Op* _op = _get_next_op(); + _op->op = OP_WRITE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + encode(write_data, data_bl); + + ceph_assert(len == write_data.length()); + data.fadvise_flags = data.fadvise_flags | flags; + if (write_data.length() > data.largest_data_len) { + data.largest_data_len = write_data.length(); + data.largest_data_off = off; + // we are about to + data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); + } + data.ops = data.ops + 1; + } + /** + * zero out the indicated byte range within an object. Some + * ObjectStore instances may optimize this to release the + * underlying storage space. + * + * If the zero range extends beyond the end of the object, the object + * size is extended, just as if we were writing a buffer full of zeros. + * EXCEPT if the length is 0, in which case (just like a 0-length write) + * we do not adjust the object size. + */ + void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { + Op* _op = _get_next_op(); + _op->op = OP_ZERO; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + data.ops = data.ops + 1; + } + /// Discard all data in the object beyond the specified size. + void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { + Op* _op = _get_next_op(); + _op->op = OP_TRUNCATE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + data.ops = data.ops + 1; + } + /// Remove an object. All four parts of the object are removed. 
+ void remove(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_REMOVE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, + const char* name, + bufferlist& val) { + string n(name); + setattr(cid, oid, n, val); + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, + const string& s, bufferlist& val) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + encode(val, data_bl); + data.ops = data.ops + 1; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, + const map<string,bufferptr>& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, + const map<string,bufferlist>& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { + string n(name); + rmattr(cid, oid, n); + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_RMATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + data.ops = data.ops + 1; + } + /// remove all xattrs from an object + void rmattrs(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_RMATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /** + * Clone an object into another object. + * + * Low-cost (e.g., O(1)) cloning (if supported) is best, but + * fallback to an O(n) copy is allowed. All four parts of the + * object are cloned (data, xattrs, omap header, omap + * entries). + * + * The destination named object may already exist, in + * which case its previous contents are discarded. + */ + void clone(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid) { + Op* _op = _get_next_op(); + _op->op = OP_CLONE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + data.ops = data.ops + 1; + } + /** + * Clone a byte range from one object to another. + * + * The data portion of the destination object receives a copy of a + * portion of the data from the source object. None of the other + * three parts of an object is copied from the source. + * + * The destination object size may be extended to the dstoff + len. + * + * The source range *must* overlap with the source object data. If it does + * not the result is undefined. 
+ */ + void clone_range(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid, + uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { + Op* _op = _get_next_op(); + _op->op = OP_CLONERANGE2; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + _op->off = srcoff; + _op->len = srclen; + _op->dest_off = dstoff; + data.ops = data.ops + 1; + } + + /// Create the collection + void create_collection(const coll_t& cid, int bits) { + Op* _op = _get_next_op(); + _op->op = OP_MKCOLL; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + /** + * Give the collection a hint. + * + * @param cid - collection id. + * @param type - hint type. + * @param hint - the hint payload, which contains the customized + * data along with the hint type. + */ + void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_COLL_HINT; + _op->cid = _get_coll_id(cid); + _op->hint_type = type; + encode(hint, data_bl); + data.ops = data.ops + 1; + } + + /// remove the collection, the collection must be empty + void remove_collection(const coll_t& cid) { + Op* _op = _get_next_op(); + _op->op = OP_RMCOLL; + _op->cid = _get_coll_id(cid); + data.ops = data.ops + 1; + } + void collection_move(const coll_t& cid, const coll_t &oldcid, + const ghobject_t& oid) + __attribute__ ((deprecated)) { + // NOTE: we encode this as a fixed combo of ADD + REMOVE. they + // always appear together, so this is effectively a single MOVE. + Op* _op = _get_next_op(); + _op->op = OP_COLL_ADD; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + _op->dest_cid = _get_coll_id(cid); + data.ops = data.ops + 1; + + _op = _get_next_op(); + _op->op = OP_COLL_REMOVE; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, + const coll_t &cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_MOVE_RENAME; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oldoid); + _op->dest_cid = _get_coll_id(cid); + _op->dest_oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + void try_rename(const coll_t &cid, const ghobject_t& oldoid, + const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TRY_RENAME; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oldoid); + _op->dest_oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + + /// Remove omap from oid + void omap_clear( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid ///< [in] Object from which to remove omap + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_CLEAR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /// Set keys on oid omap. Replaces duplicate keys. + void omap_setkeys( + const coll_t& cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const map<string, bufferlist> &attrset ///< [in] Replacement keys and values + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + + /// Set keys on an oid omap (bufferlist variant). 
+ void omap_setkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const bufferlist &attrset_bl ///< [in] Replacement keys and values + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(attrset_bl); + data.ops = data.ops + 1; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const set<string> &keys ///< [in] Keys to clear + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(keys, data_bl); + data.ops = data.ops + 1; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const bufferlist &keys_bl ///< [in] Keys to clear + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(keys_bl); + data.ops = data.ops + 1; + } + + /// Remove key range from oid omap + void omap_rmkeyrange( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap keys + const string& first, ///< [in] first key in range + const string& last ///< [in] first key past range, range is [first,last) + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYRANGE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(first, data_bl); + encode(last, data_bl); + data.ops = data.ops + 1; + } + + /// Set omap header + void omap_setheader( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object + const bufferlist &bl ///< [in] Header value + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETHEADER; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(bl, data_bl); + data.ops = data.ops + 1; + } + + /// Split collection based on given prefixes, objects matching the specified + /// bits/rem are moved to the new collection + void split_collection( + const coll_t &cid, + uint32_t bits, + uint32_t rem, + const coll_t &destination) { + Op* _op = _get_next_op(); + _op->op = OP_SPLIT_COLLECTION2; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + _op->split_rem = rem; + data.ops = data.ops + 1; + } + + /// Merge collection into another. 
+ void merge_collection( + coll_t cid, + coll_t destination, + uint32_t bits) { + Op* _op = _get_next_op(); + _op->op = OP_MERGE_COLLECTION; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + void collection_set_bits( + const coll_t &cid, + int bits) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_SET_BITS; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + /// Set allocation hint for an object + /// make 0 values(expected_object_size, expected_write_size) noops for all implementations + void set_alloc_hint( + const coll_t &cid, + const ghobject_t &oid, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags + ) { + Op* _op = _get_next_op(); + _op->op = OP_SETALLOCHINT; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->expected_object_size = expected_object_size; + _op->expected_write_size = expected_write_size; + _op->alloc_hint_flags = flags; + data.ops = data.ops + 1; + } + + void encode(bufferlist& bl) const { + //layout: data_bl + op_bl + coll_index + object_index + data + ENCODE_START(9, 9, bl); + encode(data_bl, bl); + encode(op_bl, bl); + encode(coll_index, bl); + encode(object_index, bl); + data.encode(bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &bl) { + DECODE_START(9, bl); + DECODE_OLDEST(9); + + decode(data_bl, bl); + decode(op_bl, bl); + decode(coll_index, bl); + decode(object_index, bl); + data.decode(bl); + coll_id = coll_index.size(); + object_id = object_index.size(); + + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f); +}; +} diff --git a/src/crimson/os/cyan_collection.cc b/src/crimson/os/cyan_collection.cc new file mode 100644 index 00000000..82403dbc --- /dev/null +++ b/src/crimson/os/cyan_collection.cc @@ -0,0 +1,76 @@ +#include "cyan_collection.h" + +#include "cyan_object.h" + +namespace ceph::os +{ + +Collection::Collection(const coll_t& c) + : cid{c} +{} + +Collection::~Collection() = default; + +Collection::ObjectRef Collection::create_object() const +{ + return new ceph::os::Object{}; +} + +Collection::ObjectRef Collection::get_object(ghobject_t oid) +{ + auto o = object_hash.find(oid); + if (o == object_hash.end()) + return ObjectRef(); + return o->second; +} + +Collection::ObjectRef Collection::get_or_create_object(ghobject_t oid) +{ + auto result = object_hash.emplace(oid, ObjectRef{}); + if (result.second) + object_map[oid] = result.first->second = create_object(); + return result.first->second; +} + +uint64_t Collection::used_bytes() const +{ + uint64_t result = 0; + for (auto& obj : object_map) { + result += obj.second->get_size(); + } + return result; +} + +void Collection::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(xattr, bl); + encode(use_page_set, bl); + uint32_t s = object_map.size(); + encode(s, bl); + for (auto& [oid, obj] : object_map) { + encode(oid, bl); + obj->encode(bl); + } + ENCODE_FINISH(bl); +} + +void Collection::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(xattr, p); + decode(use_page_set, p); + uint32_t s; + decode(s, p); + while (s--) { + ghobject_t k; + decode(k, p); + auto o = create_object(); + o->decode(p); + object_map.insert(make_pair(k, o)); + object_hash.insert(make_pair(k, o)); + } + DECODE_FINISH(p); +} + +} diff --git a/src/crimson/os/cyan_collection.h b/src/crimson/os/cyan_collection.h new file mode 100644 index 00000000..78f1aa0a --- /dev/null +++ 
b/src/crimson/os/cyan_collection.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <string> +#include <unordered_map> +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> + +#include "include/buffer.h" +#include "osd/osd_types.h" + +namespace ceph::os { + +class Object; +/** + * a collection also orders transactions + * + * Any transactions queued under a given collection will be applied in + * sequence. Transactions queued under different collections may run + * in parallel. + * + * ObjectStore users my get collection handles with open_collection() (or, + * for bootstrapping a new collection, create_new_collection()). + */ +struct Collection : public boost::intrusive_ref_counter< + Collection, + boost::thread_unsafe_counter> +{ + using ObjectRef = boost::intrusive_ptr<Object>; + const coll_t cid; + int bits = 0; + // always use bufferlist object for testing + bool use_page_set = false; + std::unordered_map<ghobject_t, ObjectRef> object_hash; ///< for lookup + std::map<ghobject_t, ObjectRef> object_map; ///< for iteration + std::map<std::string,bufferptr> xattr; + bool exists = true; + + Collection(const coll_t& c); + ~Collection(); + + ObjectRef create_object() const; + ObjectRef get_object(ghobject_t oid); + ObjectRef get_or_create_object(ghobject_t oid); + uint64_t used_bytes() const; + + const coll_t &get_cid() const { + return cid; + } + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); +}; + +} diff --git a/src/crimson/os/cyan_object.cc b/src/crimson/os/cyan_object.cc new file mode 100644 index 00000000..1612cebe --- /dev/null +++ b/src/crimson/os/cyan_object.cc @@ -0,0 +1,88 @@ +#include "cyan_object.h" +#include "include/encoding.h" + +namespace ceph::os { + +size_t Object::get_size() const { + return data.length(); +} + +int Object::read(uint64_t offset, uint64_t len, bufferlist &bl) +{ + bl.substr_of(data, offset, len); + return bl.length(); +} + +int Object::write(uint64_t offset, const bufferlist &src) +{ + unsigned len = src.length(); + // before + bufferlist newdata; + if (get_size() >= offset) { + newdata.substr_of(data, 0, offset); + } else { + if (get_size()) { + newdata.substr_of(data, 0, get_size()); + } + newdata.append_zero(offset - get_size()); + } + + newdata.append(src); + + // after + if (get_size() > offset + len) { + bufferlist tail; + tail.substr_of(data, offset + len, get_size() - (offset + len)); + newdata.append(tail); + } + + data.claim(newdata); + return 0; +} + +int Object::clone(Object *src, uint64_t srcoff, uint64_t len, + uint64_t dstoff) +{ + bufferlist bl; + if (srcoff == dstoff && len == src->get_size()) { + data = src->data; + return 0; + } + bl.substr_of(src->data, srcoff, len); + return write(dstoff, bl); + +} + +int Object::truncate(uint64_t size) +{ + if (get_size() > size) { + bufferlist bl; + bl.substr_of(data, 0, size); + data.claim(bl); + } else if (get_size() == size) { + // do nothing + } else { + data.append_zero(size - get_size()); + } + return 0; +} + +void Object::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(xattr, bl); + encode(omap_header, bl); + encode(omap, bl); + ENCODE_FINISH(bl); +} + +void Object::decode(bufferlist::const_iterator& p) { + DECODE_START(1, p); + decode(data, p); + decode(xattr, p); + decode(omap_header, p); + decode(omap, p); + DECODE_FINISH(p); +} + +} diff --git a/src/crimson/os/cyan_object.h b/src/crimson/os/cyan_object.h 
new file mode 100644 index 00000000..6846b1a4 --- /dev/null +++ b/src/crimson/os/cyan_object.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include <cstddef> +#include <map> +#include <string> +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include "include/buffer.h" + +namespace ceph::os { + +struct Object : public boost::intrusive_ref_counter< + Object, + boost::thread_unsafe_counter> +{ + using bufferlist = ceph::bufferlist; + + bufferlist data; + std::map<std::string,bufferptr> xattr; + bufferlist omap_header; + std::map<std::string,bufferlist> omap; + + typedef boost::intrusive_ptr<Object> Ref; + + Object() = default; + + // interface for object data + size_t get_size() const; + int read(uint64_t offset, uint64_t len, bufferlist &bl); + int write(uint64_t offset, const bufferlist &bl); + int clone(Object *src, uint64_t srcoff, uint64_t len, + uint64_t dstoff); + int truncate(uint64_t offset); + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); +}; +} diff --git a/src/crimson/os/cyan_store.cc b/src/crimson/os/cyan_store.cc new file mode 100644 index 00000000..68ccae88 --- /dev/null +++ b/src/crimson/os/cyan_store.cc @@ -0,0 +1,277 @@ +#include "cyan_store.h" + +#include <fmt/format.h> + +#include "common/safe_io.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/Transaction.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_filestore); + } +} + +namespace ceph::os { + +using ObjectRef = boost::intrusive_ptr<Object>; + +CyanStore::CyanStore(const std::string& path) + : path{path} +{} + +CyanStore::~CyanStore() = default; + +seastar::future<> CyanStore::mount() +{ + bufferlist bl; + string fn = path + "/collections"; + string err; + if (int r = bl.read_file(fn.c_str(), &err); r < 0) { + throw std::runtime_error("read_file"); + } + + set<coll_t> collections; + auto p = bl.cbegin(); + decode(collections, p); + + for (auto& coll : collections) { + string fn = fmt::format("{}/{}", path, coll); + bufferlist cbl; + if (int r = cbl.read_file(fn.c_str(), &err); r < 0) { + throw std::runtime_error("read_file"); + } + CollectionRef c{new Collection{coll}}; + auto p = cbl.cbegin(); + c->decode(p); + coll_map[coll] = c; + used_bytes += c->used_bytes(); + } + return seastar::now(); +} + +seastar::future<> CyanStore::umount() +{ + set<coll_t> collections; + for (auto& [col, ch] : coll_map) { + collections.insert(col); + bufferlist bl; + ceph_assert(ch); + ch->encode(bl); + string fn = fmt::format("{}/{}", path, col); + if (int r = bl.write_file(fn.c_str()); r < 0) { + throw std::runtime_error("write_file"); + } + } + + string fn = path + "/collections"; + bufferlist bl; + encode(collections, bl); + if (int r = bl.write_file(fn.c_str()); r < 0) { + throw std::runtime_error("write_file"); + } + return seastar::now(); +} + +seastar::future<> CyanStore::mkfs(uuid_d osd_fsid) +{ + string fsid_str; + int r = read_meta("fsid", &fsid_str); + if (r == -ENOENT) { + write_meta("fsid", fmt::format("{}", osd_fsid)); + } else if (r < 0) { + throw std::runtime_error("read_meta"); + } else { + logger().error("{} already has fsid {}", __func__, fsid_str); + throw std::runtime_error("mkfs"); + } + + string fn = path + "/collections"; + bufferlist bl; + set<coll_t> collections; + encode(collections, bl); + r = bl.write_file(fn.c_str()); + if (r < 0) + throw 
std::runtime_error("write_file"); + + write_meta("type", "memstore"); + return seastar::now(); +} + +CyanStore::CollectionRef CyanStore::create_new_collection(const coll_t& cid) +{ + auto c = new Collection{cid}; + return new_coll_map[cid] = c; +} + +CyanStore::CollectionRef CyanStore::open_collection(const coll_t& cid) +{ + auto cp = coll_map.find(cid); + if (cp == coll_map.end()) + return {}; + return cp->second; +} + +std::vector<coll_t> CyanStore::list_collections() +{ + std::vector<coll_t> collections; + for (auto& coll : coll_map) { + collections.push_back(coll.first); + } + return collections; +} + +seastar::future<bufferlist> CyanStore::read(CollectionRef c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + uint32_t op_flags) +{ + logger().info("{} {} {} {}~{}", + __func__, c->cid, oid, offset, len); + if (!c->exists) { + throw std::runtime_error(fmt::format("collection does not exist: {}", c->cid)); + } + ObjectRef o = c->get_object(oid); + if (!o) { + throw std::runtime_error(fmt::format("object does not exist: {}", oid)); + } + if (offset >= o->get_size()) + return seastar::make_ready_future<bufferlist>(); + size_t l = len; + if (l == 0 && offset == 0) // note: len == 0 means read the entire object + l = o->get_size(); + else if (offset + l > o->get_size()) + l = o->get_size() - offset; + bufferlist bl; + if (int r = o->read(offset, l, bl); r < 0) { + throw std::runtime_error("read"); + } + return seastar::make_ready_future<bufferlist>(std::move(bl)); +} + +seastar::future<CyanStore::omap_values_t> +CyanStore::omap_get_values(CollectionRef c, + const ghobject_t& oid, + std::vector<std::string>&& keys) +{ + logger().info("{} {} {}", + __func__, c->cid, oid); + auto o = c->get_object(oid); + if (!o) { + throw std::runtime_error(fmt::format("object does not exist: {}", oid)); + } + omap_values_t values; + for (auto& key : keys) { + if (auto found = o->omap.find(key); found != o->omap.end()) { + values.insert(*found); + } + } + return seastar::make_ready_future<omap_values_t>(std::move(values)); +} + +seastar::future<> CyanStore::do_transaction(CollectionRef ch, + Transaction&& t) +{ + auto i = t.begin(); + while (i.have_op()) { + Transaction::Op* op = i.decode_op(); + int r = 0; + switch (op->op) { + case Transaction::OP_NOP: + break; + case Transaction::OP_WRITE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + uint32_t fadvise_flags = i.get_fadvise_flags(); + bufferlist bl; + i.decode_bl(bl); + r = _write(cid, oid, off, len, bl, fadvise_flags); + } + break; + case Transaction::OP_MKCOLL: + { + coll_t cid = i.get_cid(op->cid); + r = _create_collection(cid, op->split_bits); + } + break; + default: + logger().error("bad op {}", static_cast<unsigned>(op->op)); + abort(); + } + if (r < 0) { + abort(); + } + } + return seastar::now(); +} + +int CyanStore::_write(const coll_t& cid, const ghobject_t& oid, + uint64_t offset, size_t len, const bufferlist& bl, + uint32_t fadvise_flags) +{ + logger().info("{} {} {} {} ~ {}", + __func__, cid, oid, offset, len); + assert(len == bl.length()); + + auto c = open_collection(cid); + if (!c) + return -ENOENT; + + ObjectRef o = c->get_or_create_object(oid); + if (len > 0) { + const ssize_t old_size = o->get_size(); + o->write(offset, bl); + used_bytes += (o->get_size() - old_size); + } + + return 0; +} + +int CyanStore::_create_collection(const coll_t& cid, int bits) +{ + auto result = coll_map.insert(std::make_pair(cid, CollectionRef())); + if 
(!result.second) + return -EEXIST; + auto p = new_coll_map.find(cid); + assert(p != new_coll_map.end()); + result.first->second = p->second; + result.first->second->bits = bits; + new_coll_map.erase(p); + return 0; +} + +void CyanStore::write_meta(const std::string& key, + const std::string& value) +{ + std::string v = value; + v += "\n"; + if (int r = safe_write_file(path.c_str(), key.c_str(), + v.c_str(), v.length(), 0600); + r < 0) { + throw std::runtime_error{fmt::format("unable to write_meta({})", key)}; + } +} + +int CyanStore::read_meta(const std::string& key, + std::string* value) +{ + char buf[4096]; + int r = safe_read_file(path.c_str(), key.c_str(), + buf, sizeof(buf)); + if (r <= 0) { + return r; + } + // drop trailing newlines + while (r && isspace(buf[r-1])) { + --r; + } + *value = string{buf, static_cast<size_t>(r)}; + return 0; +} +} diff --git a/src/crimson/os/cyan_store.h b/src/crimson/os/cyan_store.h new file mode 100644 index 00000000..0b7f3d87 --- /dev/null +++ b/src/crimson/os/cyan_store.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <string> +#include <unordered_map> +#include <map> +#include <vector> +#include <seastar/core/future.hh> +#include "osd/osd_types.h" +#include "include/uuid.h" + +namespace ceph::os { + +class Collection; +class Transaction; + +// a just-enough store for reading/writing the superblock +class CyanStore { + using CollectionRef = boost::intrusive_ptr<Collection>; + const std::string path; + std::unordered_map<coll_t, CollectionRef> coll_map; + std::map<coll_t,CollectionRef> new_coll_map; + uint64_t used_bytes = 0; + +public: + CyanStore(const std::string& path); + ~CyanStore(); + + seastar::future<> mount(); + seastar::future<> umount(); + + seastar::future<> mkfs(uuid_d osd_fsid); + seastar::future<bufferlist> read(CollectionRef c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + uint32_t op_flags = 0); + using omap_values_t = std::map<std::string,bufferlist, std::less<>>; + seastar::future<omap_values_t> omap_get_values( + CollectionRef c, + const ghobject_t& oid, + std::vector<std::string>&& keys); + CollectionRef create_new_collection(const coll_t& cid); + CollectionRef open_collection(const coll_t& cid); + std::vector<coll_t> list_collections(); + + seastar::future<> do_transaction(CollectionRef ch, + Transaction&& txn); + + void write_meta(const std::string& key, + const std::string& value); + int read_meta(const std::string& key, std::string* value); + +private: + int _write(const coll_t& cid, const ghobject_t& oid, + uint64_t offset, size_t len, const bufferlist& bl, + uint32_t fadvise_flags); + int _create_collection(const coll_t& cid, int bits); +}; + +} diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt new file mode 100644 index 00000000..e86a11b5 --- /dev/null +++ b/src/crimson/osd/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(crimson-osd + chained_dispatchers.cc + heartbeat.cc + main.cc + osd.cc + osd_meta.cc + pg.cc + pg_meta.cc) +target_link_libraries(crimson-osd + crimson-common crimson-os crimson) diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc new file mode 100644 index 00000000..da4aa269 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.cc @@ -0,0 +1,72 @@ +#include "chained_dispatchers.h" +#include "crimson/net/Connection.h" + + +seastar::future<> +ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) { + 
return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { + return dispatcher->ms_dispatch(conn, m); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_accept(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_accept(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_connect(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_connect(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_reset(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_remote_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_remote_reset(conn); + }); +} + +seastar::future<std::unique_ptr<AuthAuthorizer>> +ChainedDispatchers::ms_get_authorizer(peer_type_t peer_type) +{ + // since dispatcher returns a nullptr if it does not have the authorizer, + // let's use the chain-of-responsibility pattern here. + struct Params { + peer_type_t peer_type; + std::deque<Dispatcher*>::iterator first, last; + } params = {peer_type, dispatchers.begin(), dispatchers.end()}; + return seastar::do_with(Params{params}, [this] (Params& params) { + using result_t = std::unique_ptr<AuthAuthorizer>; + return seastar::repeat_until_value([&] () { + auto& first = params.first; + if (first == params.last) { + // just give up + return seastar::make_ready_future<std::optional<result_t>>(result_t{}); + } else { + return (*first)->ms_get_authorizer(params.peer_type) + .then([&] (auto&& auth)-> std::optional<result_t> { + if (auth) { + // hooray! + return std::move(auth); + } else { + // try next one + ++first; + return {}; + } + }); + } + }); + }); +} diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h new file mode 100644 index 00000000..c3040836 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <deque> +#include "crimson/net/Dispatcher.h" + +// in existing Messenger, dispatchers are put into a chain as described by +// chain-of-responsibility pattern. we could do the same to stop processing +// the message once any of the dispatchers claims this message, and prevent +// other dispatchers from reading it. but this change is more involved as +// it requires changing the ms_ methods to return a bool. so as an intermediate +// solution, we are using an observer dispatcher to notify all the interested +// or unintersted parties. 
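
As a side note on the pattern described in the comment above: a true chain-of-responsibility stops as soon as one dispatcher claims a message, instead of notifying every dispatcher the way this class does. A minimal, self-contained sketch of that alternative might look like the following; every name in it (Message, the bool-returning ms_dispatch, OsdDispatcher, ChainOfDispatchers) is hypothetical and only illustrates the pattern — it is not part of this patch:

    // chain-of-responsibility sketch: stop at the first dispatcher that
    // claims the message (hypothetical names, illustration only)
    #include <deque>
    #include <iostream>
    #include <string>

    struct Message { std::string payload; };

    struct Dispatcher {
      virtual ~Dispatcher() = default;
      // return true if this dispatcher consumed the message
      virtual bool ms_dispatch(const Message& m) = 0;
    };

    class ChainOfDispatchers {
      std::deque<Dispatcher*> dispatchers;
    public:
      void push_back(Dispatcher* d) { dispatchers.push_back(d); }
      void ms_dispatch(const Message& m) {
        for (Dispatcher* d : dispatchers) {
          if (d->ms_dispatch(m)) {
            return;   // claimed: later dispatchers never see the message
          }
        }
      }
    };

    struct OsdDispatcher : Dispatcher {
      bool ms_dispatch(const Message& m) override {
        if (m.payload.rfind("osd:", 0) == 0) {  // only claim osd-prefixed messages
          std::cout << "osd handled " << m.payload << '\n';
          return true;
        }
        return false;
      }
    };

    int main() {
      OsdDispatcher osd;
      ChainOfDispatchers chain;
      chain.push_back(&osd);
      chain.ms_dispatch(Message{"osd:ping"});  // claimed by OsdDispatcher
      chain.ms_dispatch(Message{"mon:map"});   // unclaimed, falls off the chain
    }

The ChainedDispatchers declared next deliberately keeps the broadcast model instead: its ms_* hooks return seastar futures and always visit every registered dispatcher, which is why the comment calls it an observer rather than a chain.
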
+class ChainedDispatchers : public ceph::net::Dispatcher { + std::deque<Dispatcher*> dispatchers; +public: + void push_front(Dispatcher* dispatcher) { + dispatchers.push_front(dispatcher); + } + void push_back(Dispatcher* dispatcher) { + dispatchers.push_back(dispatcher); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t peer_type) override; +}; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc new file mode 100644 index 00000000..6dfefb3b --- /dev/null +++ b/src/crimson/osd/heartbeat.cc @@ -0,0 +1,415 @@ +#include "heartbeat.h" + +#include <boost/range/join.hpp> + +#include "messages/MOSDPing.h" +#include "messages/MOSDFailure.h" + +#include "crimson/common/config_proxy.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/mon/MonClient.h" + +#include "osd/OSDMap.h" + +using ceph::common::local_conf; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } +} + +Heartbeat::Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc) + : whoami{whoami}, + nonce{nonce}, + service{service}, + monc{monc}, + timer{[this] {send_heartbeats();}} +{} + +seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, + entity_addrvec_t back_addrs) +{ + logger().info("heartbeat: start"); + // i only care about the address, so any unused port would work + for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) { + addr.set_port(0); + } + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_front", + nonce, + seastar::engine().cpu_id()) + .then([this, front_addrs] (auto msgr) { + front_msgr = msgr; + return start_messenger(front_msgr, front_addrs); + }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_back", + nonce, + seastar::engine().cpu_id()) + .then([this, back_addrs] (auto msgr) { + back_msgr = msgr; + return start_messenger(back_msgr, back_addrs); + })) + .then([this] { + timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_heartbeat_interval)); + }); +} + +seastar::future<> +Heartbeat::start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs) +{ + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + return msgr->try_bind(addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max).then([msgr, this] { + return msgr->start(this); + }); +} + +seastar::future<> Heartbeat::stop() +{ + return seastar::when_all_succeed(front_msgr->shutdown(), + back_msgr->shutdown()); +} + +const entity_addrvec_t& Heartbeat::get_front_addrs() const +{ + return front_msgr->get_myaddrs(); +} + +const entity_addrvec_t& Heartbeat::get_back_addrs() const +{ + return back_msgr->get_myaddrs(); +} + +seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) +{ + 
auto found = peers.find(peer); + if (found == peers.end()) { + logger().info("add_peer({})", peer); + auto osdmap = service.get_map(); + // TODO: msgr v2 + return seastar::when_all_succeed( + front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD), + back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD)) + .then([this, peer, epoch] (auto xcon_front, auto xcon_back) { + PeerInfo info; + // sharded-messenger compatible mode + info.con_front = xcon_front->release(); + info.con_back = xcon_back->release(); + info.epoch = epoch; + peers.emplace(peer, std::move(info)); + }); + } else { + found->second.epoch = epoch; + return seastar::now(); + } +} + +seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers() +{ + osds_t osds; + for (auto& peer : peers) { + osds.push_back(peer.first); + } + return seastar::map_reduce(std::move(osds), + [this](auto& osd) { + auto osdmap = service.get_map(); + if (!osdmap->is_up(osd)) { + return remove_peer(osd).then([] { + return seastar::make_ready_future<osd_id_t>(-1); + }); + } else if (peers[osd].epoch < osdmap->get_epoch()) { + return seastar::make_ready_future<osd_id_t>(osd); + } else { + return seastar::make_ready_future<osd_id_t>(-1); + } + }, osds_t{}, + [this](osds_t&& extras, osd_id_t extra) { + if (extra >= 0) { + extras.push_back(extra); + } + return extras; + }); +} + +void Heartbeat::add_reporter_peers(int whoami) +{ + auto osdmap = service.get_map(); + // include next and previous up osds to ensure we have a fully-connected set + set<int> want; + if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) { + want.insert(next); + } + if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) { + want.insert(prev); + } + // make sure we have at least **min_down** osds coming from different + // subtree level (e.g., hosts) for fast failure detection. + auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters"); + auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level"); + osdmap->get_random_up_osds_by_subtree( + whoami, subtree, min_down, want, &want); + for (auto osd : want) { + add_peer(osd, osdmap->get_epoch()); + } +} + +seastar::future<> Heartbeat::update_peers(int whoami) +{ + const auto min_peers = static_cast<size_t>( + local_conf().get_val<int64_t>("osd_heartbeat_min_peers")); + return remove_down_peers().then([=](osds_t&& extra) { + add_reporter_peers(whoami); + // too many? + struct iteration_state { + osds_t::const_iterator where; + osds_t::const_iterator end; + }; + return seastar::do_with(iteration_state{extra.begin(),extra.end()}, + [=](iteration_state& s) { + return seastar::do_until( + [min_peers, &s, this] { + return peers.size() < min_peers || s.where == s.end; }, + [&s, this] { + return remove_peer(*s.where); } + ); + }); + }).then([=] { + // or too few? 
+ auto osdmap = service.get_map(); + for (auto next = osdmap->get_next_up_osd_after(whoami); + peers.size() < min_peers && next >= 0 && next != whoami; + next = osdmap->get_next_up_osd_after(next)) { + add_peer(next, osdmap->get_epoch()); + } + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::remove_peer(osd_id_t peer) +{ + auto found = peers.find(peer); + assert(found != peers.end()); + logger().info("remove_peer({})", peer); + return seastar::when_all_succeed(found->second.con_front->close(), + found->second.con_back->close()).then( + [this, peer] { + peers.erase(peer); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) +{ + logger().info("heartbeat: ms_dispatch {} from {}", + *m, m->get_source()); + switch (m->get_type()) { + case CEPH_MSG_PING: + return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + switch (m->op) { + case MOSDPing::PING: + return handle_ping(conn, m); + case MOSDPing::PING_REPLY: + return handle_reply(conn, m); + case MOSDPing::YOU_DIED: + return handle_you_died(); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto reply = + make_message<MOSDPing>(m->fsid, + service.get_map()->get_epoch(), + MOSDPing::PING_REPLY, + m->stamp, + min_message); + return conn->send(reply); +} + +seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + const osd_id_t from = m->get_source().num(); + auto found = peers.find(from); + if (found == peers.end()) { + // stale reply + return seastar::now(); + } + auto& peer = found->second; + auto ping = peer.ping_history.find(m->stamp); + if (ping == peer.ping_history.end()) { + // old replies, deprecated by newly sent pings. 
+ return seastar::now(); + } + const auto now = clock::now(); + auto& unacked = ping->second.unacknowledged; + if (conn == peer.con_back) { + peer.last_rx_back = now; + unacked--; + } else if (conn == peer.con_front) { + peer.last_rx_front = now; + unacked--; + } + if (unacked == 0) { + peer.ping_history.erase(peer.ping_history.begin(), ++ping); + } + if (peer.is_healthy(now)) { + // cancel false reports + failure_queue.erase(from); + if (auto pending = failure_pending.find(from); + pending != failure_pending.end()) { + return send_still_alive(from, pending->second.addrs); + } + } + return seastar::now(); +} + +seastar::future<> Heartbeat::handle_you_died() +{ + // TODO: ask for newer osdmap + return seastar::now(); +} + +seastar::future<> Heartbeat::send_heartbeats() +{ + using peers_item_t = typename peers_map_t::value_type; + return seastar::parallel_for_each(peers, + [this](peers_item_t& item) { + const auto now = clock::now(); + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + auto& [peer, info] = item; + info.last_tx = now; + if (clock::is_zero(info.first_tx)) { + info.first_tx = now; + } + const utime_t sent_stamp{now}; + auto [reply, added] = info.ping_history.emplace(sent_stamp, + reply_t{deadline, 0}); + std::vector<ceph::net::ConnectionRef> conns{info.con_front, + info.con_back}; + return seastar::parallel_for_each(std::move(conns), + [sent_stamp, &reply=reply->second, this] (auto con) { + if (con) { + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message<MOSDPing>(monc.get_fsid(), + service.get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + min_message); + return con->send(ping).then([&reply] { + reply.unacknowledged++; + return seastar::now(); + }); + } else { + return seastar::now(); + } + }); + }); +} + +seastar::future<> Heartbeat::send_failures() +{ + using failure_item_t = typename failure_queue_t::value_type; + return seastar::parallel_for_each(failure_queue, + [this](failure_item_t& failure_item) { + auto [osd, failed_since] = failure_item; + if (failure_pending.count(osd)) { + return seastar::now(); + } + auto failed_for = chrono::duration_cast<chrono::seconds>( + clock::now() - failed_since).count(); + auto osdmap = service.get_map(); + auto failure_report = + make_message<MOSDFailure>(monc.get_fsid(), + osd, + osdmap->get_addrs(osd), + static_cast<int>(failed_for), + osdmap->get_epoch()); + failure_pending.emplace(osd, failure_info_t{failed_since, + osdmap->get_addrs(osd)}); + return monc.send_message(failure_report); + }).then([this] { + failure_queue.clear(); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::send_still_alive(osd_id_t osd, + const entity_addrvec_t& addrs) +{ + auto still_alive = make_message<MOSDFailure>(monc.get_fsid(), + osd, + addrs, + 0, + service.get_map()->get_epoch(), + MOSDFailure::FLAG_ALIVE); + return monc.send_message(still_alive).then([=] { + failure_pending.erase(osd); + return seastar::now(); + }); +} + +bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const +{ + if (ping_history.empty()) { + // we haven't sent a ping yet or we have got all replies, + // in either way we are safe and healthy for now + return false; + } else { + auto oldest_ping = ping_history.begin(); + return now > oldest_ping->second.deadline; + } +} + +bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const +{ + if (con_front && clock::is_zero(last_rx_front)) { + return false; + } + if (con_back && 
clock::is_zero(last_rx_back)) { + return false; + } + // only declare to be healthy until we have received the first + // replies from both front/back connections + return !is_unhealthy(now); +} diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h new file mode 100644 index 00000000..b5eb0f7c --- /dev/null +++ b/src/crimson/osd/heartbeat.h @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <cstdint> +#include <seastar/core/future.hh> +#include "common/ceph_time.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +class MOSDPing; +class OSDMapService; + +namespace ceph::mon { + class Client; +} + +template<typename Message> using Ref = boost::intrusive_ptr<Message>; + +class Heartbeat : public ceph::net::Dispatcher { +public: + using osd_id_t = int; + + Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc); + + seastar::future<> start(entity_addrvec_t front, + entity_addrvec_t back); + seastar::future<> stop(); + + seastar::future<> add_peer(osd_id_t peer, epoch_t epoch); + seastar::future<> update_peers(int whoami); + seastar::future<> remove_peer(osd_id_t peer); + + seastar::future<> send_heartbeats(); + seastar::future<> send_failures(); + + const entity_addrvec_t& get_front_addrs() const; + const entity_addrvec_t& get_back_addrs() const; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) override; + +private: + seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_you_died(); + + seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&); + + using osds_t = std::vector<osd_id_t>; + /// remove down OSDs + /// @return peers not needed in this epoch + seastar::future<osds_t> remove_down_peers(); + /// add enough reporters for fast failure detection + void add_reporter_peers(int whoami); + + seastar::future<> start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs); +private: + const int whoami; + const uint32_t nonce; + ceph::net::Messenger* front_msgr = nullptr; + ceph::net::Messenger* back_msgr = nullptr; + const OSDMapService& service; + ceph::mon::Client& monc; + + seastar::timer<seastar::lowres_clock> timer; + // use real_clock so it can be converted to utime_t + using clock = ceph::coarse_real_clock; + + struct reply_t { + clock::time_point deadline; + // one sent over front conn, another sent over back conn + uint8_t unacknowledged = 0; + }; + struct PeerInfo { + /// peer connection (front) + ceph::net::ConnectionRef con_front; + /// peer connection (back) + ceph::net::ConnectionRef con_back; + /// time we sent our first ping request + clock::time_point first_tx; + /// last time we sent a ping request + clock::time_point last_tx; + /// last time we got a ping reply on the front side + clock::time_point last_rx_front; + /// last time we got a ping reply on the back side + clock::time_point last_rx_back; + /// most recent epoch we wanted this peer + epoch_t epoch; + /// history of inflight pings, arranging by timestamp we sent + std::map<utime_t, reply_t> ping_history; + + bool is_unhealthy(clock::time_point now) const; + bool is_healthy(clock::time_point now) const; + }; + using peers_map_t = 
std::map<osd_id_t, PeerInfo>; + peers_map_t peers; + + // osds which are considered failed + // osd_id => when was the last time that both front and back pings were acked + // use for calculating how long the OSD has been unresponsive + using failure_queue_t = std::map<osd_id_t, clock::time_point>; + failure_queue_t failure_queue; + struct failure_info_t { + clock::time_point failed_since; + entity_addrvec_t addrs; + }; + // osds we've reported to monior as failed ones, but they are not marked down + // yet + std::map<osd_id_t, failure_info_t> failure_pending; +}; diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc new file mode 100644 index 00000000..04a0c97d --- /dev/null +++ b/src/crimson/osd/main.cc @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include <sys/types.h> +#include <unistd.h> + +#include <iostream> + +#include <seastar/core/app-template.hh> +#include <seastar/core/thread.hh> + +#include "common/ceph_argparse.h" +#include "crimson/common/config_proxy.h" + +#include "osd.h" + +using config_t = ceph::common::ConfigProxy; + +void usage(const char* prog) { + std::cout << "usage: " << prog << " -i <ID>" << std::endl; + generic_server_usage(); +} + +int main(int argc, char* argv[]) +{ + std::vector<const char*> args{argv + 1, argv + argc}; + if (ceph_argparse_need_usage(args)) { + usage(argv[0]); + return EXIT_SUCCESS; + } + std::string cluster; + std::string conf_file_list; + // ceph_argparse_early_args() could _exit(), while local_conf() won't ready + // until it's started. so do the boilerplate-settings parsing here. + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_OSD, + &cluster, + &conf_file_list); + seastar::app_template app; + app.add_options() + ("mkfs", "create a [new] data directory"); + seastar::sharded<OSD> osd; + + using ceph::common::sharded_conf; + using ceph::common::sharded_perf_coll; + using ceph::common::local_conf; + + args.insert(begin(args), argv[0]); + try { + return app.run_deprecated(args.size(), const_cast<char**>(args.data()), [&] { + auto& config = app.configuration(); + seastar::engine().at_exit([] { + return sharded_conf().stop(); + }); + seastar::engine().at_exit([] { + return sharded_perf_coll().stop(); + }); + seastar::engine().at_exit([&] { + return osd.stop(); + }); + return sharded_conf().start(init_params.name, cluster).then([] { + return sharded_perf_coll().start(); + }).then([&conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([&] { + return osd.start_single(std::stoi(local_conf()->name.get_id()), + static_cast<uint32_t>(getpid())); + }).then([&osd, mkfs = config.count("mkfs")] { + if (mkfs) { + return osd.invoke_on(0, &OSD::mkfs, + local_conf().get_val<uuid_d>("fsid")); + } else { + return osd.invoke_on(0, &OSD::start); + } + }); + }); + } catch (...) 
{ + seastar::fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception()); + return EXIT_FAILURE; + } +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * crimson-osd" + * End: + */ diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc new file mode 100644 index 00000000..bdb1642e --- /dev/null +++ b/src/crimson/osd/osd.cc @@ -0,0 +1,658 @@ +#include "osd.h" + +#include <boost/range/join.hpp> +#include <boost/smart_ptr/make_local_shared.hpp> + +#include "common/pick_address.h" +#include "messages/MOSDBeacon.h" +#include "messages/MOSDBoot.h" +#include "messages/MOSDMap.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" +#include "crimson/osd/heartbeat.h" +#include "crimson/osd/osd_meta.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_meta.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } + static constexpr int TICK_INTERVAL = 1; +} + +using ceph::common::local_conf; +using ceph::os::CyanStore; + +OSD::OSD(int id, uint32_t nonce) + : whoami{id}, + nonce{nonce}, + beacon_timer{[this] { send_beacon(); }}, + heartbeat_timer{[this] { update_heartbeat_peers(); }} +{ + osdmaps[0] = boost::make_local_shared<OSDMap>(); +} + +OSD::~OSD() = default; + +namespace { +// Initial features in new superblock. +// Features here are also automatically upgraded +CompatSet get_osd_initial_compat_set() +{ + CompatSet::FeatureSet ceph_osd_feature_compat; + CompatSet::FeatureSet ceph_osd_feature_ro_compat; + CompatSet::FeatureSet ceph_osd_feature_incompat; + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES); + return CompatSet(ceph_osd_feature_compat, + ceph_osd_feature_ro_compat, + ceph_osd_feature_incompat); +} +} + +seastar::future<> OSD::mkfs(uuid_d cluster_fsid) +{ + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + uuid_d osd_fsid; + osd_fsid.generate_random(); + return store->mkfs(osd_fsid).then([this] { + return store->mount(); + }).then([cluster_fsid, osd_fsid, this] { + superblock.cluster_fsid = cluster_fsid; + superblock.osd_fsid = osd_fsid; + 
superblock.whoami = whoami; + superblock.compat_features = get_osd_initial_compat_set(); + + meta_coll = make_unique<OSDMeta>( + store->create_new_collection(coll_t::meta()), store.get()); + ceph::os::Transaction t; + meta_coll->create(t); + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }).then([cluster_fsid, this] { + store->write_meta("ceph_fsid", cluster_fsid.to_string()); + store->write_meta("whoami", std::to_string(whoami)); + return seastar::now(); + }); +} + +namespace { + entity_addrvec_t pick_addresses(int what) { + entity_addrvec_t addrs; + CephContext cct; + if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) { + throw std::runtime_error("failed to pick address"); + } + // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does + // not support v2 yet. remove following set_type() once v2 support is ready. + for (auto addr : addrs.v) { + addr.set_type(addr.TYPE_LEGACY); + logger().info("picked address {}", addr); + } + return addrs; + } + std::pair<entity_addrvec_t, bool> + replace_unknown_addrs(entity_addrvec_t maybe_unknowns, + const entity_addrvec_t& knowns) { + bool changed = false; + auto maybe_replace = [&](entity_addr_t addr) { + if (!addr.is_blank_ip()) { + return addr; + } + for (auto& b : knowns.v) { + if (addr.get_family() == b.get_family()) { + auto a = b; + a.set_nonce(addr.get_nonce()); + a.set_type(addr.get_type()); + a.set_port(addr.get_port()); + changed = true; + return a; + } + } + throw std::runtime_error("failed to replace unknown address"); + }; + entity_addrvec_t replaced; + std::transform(maybe_unknowns.v.begin(), + maybe_unknowns.v.end(), + std::back_inserter(replaced.v), + maybe_replace); + return {replaced, changed}; + } +} + +seastar::future<> OSD::start() +{ + logger().info("start"); + + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "cluster", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { cluster_msgr = msgr; }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "client", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { public_msgr = msgr; })) + .then([this] { + monc.reset(new ceph::mon::Client{*public_msgr}); + heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc}); + + for (auto msgr : {cluster_msgr, public_msgr}) { + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + } + dispatchers.push_front(this); + dispatchers.push_front(monc.get()); + + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + return store->mount(); + }).then([this] { + meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()), + store.get()); + return meta_coll->load_superblock(); + }).then([this](OSDSuperblock&& sb) { + superblock = std::move(sb); + return get_map(superblock.current_epoch); + }).then([this](cached_map_t&& map) { + osdmap = std::move(map); + return load_pgs(); + }).then([this] { + return seastar::when_all_succeed( + cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return cluster_msgr->start(&dispatchers); }), + public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return 
public_msgr->start(&dispatchers); })); + }).then([this] { + return monc->start(); + }).then([this] { + monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); + monc->sub_want("mgrmap", 0, 0); + monc->sub_want("osdmap", 0, 0); + return monc->renew_subs(); + }).then([this] { + if (auto [addrs, changed] = + replace_unknown_addrs(cluster_msgr->get_myaddrs(), + public_msgr->get_myaddrs()); changed) { + cluster_msgr->set_myaddrs(addrs); + } + return heartbeat->start(public_msgr->get_myaddrs(), + cluster_msgr->get_myaddrs()); + }).then([this] { + return start_boot(); + }); +} + +seastar::future<> OSD::start_boot() +{ + state.set_preboot(); + return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) { + return _preboot(oldest, newest); + }); +} + +seastar::future<> OSD::_preboot(version_t oldest, version_t newest) +{ + logger().info("osd.{}: _preboot", whoami); + if (osdmap->get_epoch() == 0) { + logger().warn("waiting for initial osdmap"); + } else if (osdmap->is_destroyed(whoami)) { + logger().warn("osdmap says I am destroyed"); + // provide a small margin so we don't livelock seeing if we + // un-destroyed ourselves. + if (osdmap->get_epoch() > newest - 1) { + throw std::runtime_error("i am destroyed"); + } + } else if (osdmap->is_noup(whoami)) { + logger().warn("osdmap NOUP flag is set, waiting for it to clear"); + } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); + } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { + logger().error("osdmap require_osd_release < luminous; please upgrade to luminous"); + } else if (false) { + // TODO: update mon if current fullness state is different from osdmap + } else if (version_t n = local_conf()->osd_map_message_max; + osdmap->get_epoch() >= oldest - 1 && + osdmap->get_epoch() + n > newest) { + return _send_boot(); + } + // get all the latest maps + if (osdmap->get_epoch() + 1 >= oldest) { + return osdmap_subscribe(osdmap->get_epoch() + 1, false); + } else { + return osdmap_subscribe(oldest - 1, true); + } +} + +seastar::future<> OSD::_send_boot() +{ + state.set_booting(); + + logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs()); + logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs()); + logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr()); + auto m = make_message<MOSDBoot>(superblock, + osdmap->get_epoch(), + osdmap->get_epoch(), + heartbeat->get_back_addrs(), + heartbeat->get_front_addrs(), + cluster_msgr->get_myaddrs(), + CEPH_FEATURES_ALL); + return monc->send_message(m); +} + +seastar::future<> OSD::stop() +{ + // see also OSD::shutdown() + state.set_stopping(); + return gate.close().then([this] { + return heartbeat->stop(); + }).then([this] { + return monc->stop(); + }).then([this] { + return public_msgr->shutdown(); + }).then([this] { + return cluster_msgr->shutdown(); + }); +} + +seastar::future<> OSD::load_pgs() +{ + return seastar::parallel_for_each(store->list_collections(), + [this](auto coll) { + spg_t pgid; + if (coll.is_pg(&pgid)) { + return load_pg(pgid).then([pgid, this](auto&& pg) { + logger().info("load_pgs: loaded {}", pgid); + pgs.emplace(pgid, std::move(pg)); + return seastar::now(); + }); + } else if (coll.is_temp(&pgid)) { + // TODO: remove the collection + return seastar::now(); + } else { + logger().warn("ignoring unrecognized collection: {}", coll); + return seastar::now(); + } + }); +} + +seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid) +{ + using ec_profile_t = 
map<string,string>; + return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) { + return get_map(e); + }).then([pgid, this] (auto&& create_map) { + if (create_map->have_pg_pool(pgid.pool())) { + pg_pool_t pi = *create_map->get_pg_pool(pgid.pool()); + string name = create_map->get_pool_name(pgid.pool()); + ec_profile_t ec_profile; + if (pi.is_erasure()) { + ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile); + } + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + } else { + // pool was deleted; grab final pg_pool_t off disk. + return meta_coll->load_final_pool_info(pgid.pool()); + } + }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) { + Ref<PG> pg{new PG{std::move(pool), + std::move(name), + std::move(ec_profile)}}; + return seastar::make_ready_future<Ref<PG>>(std::move(pg)); + }); +} + +seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +{ + logger().info("ms_dispatch {}", *m); + if (state.is_stopping()) { + return seastar::now(); + } + + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn) +{ + if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) { + return seastar::now(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn) +{ + // TODO: cleanup the session attached to this connection + logger().warn("ms_handle_reset"); + return seastar::now(); +} + +seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn) +{ + logger().warn("ms_handle_remote_reset"); + return seastar::now(); +} + +OSD::cached_map_t OSD::get_map() const +{ + return osdmap; +} + +seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e) +{ + // TODO: use LRU cache for managing osdmap, fallback to disk if we have to + if (auto found = osdmaps.find(e); found) { + return seastar::make_ready_future<cached_map_t>(std::move(found)); + } else { + return load_map_bl(e).then([e, this](bufferlist bl) { + auto osdmap = std::make_unique<OSDMap>(); + osdmap->decode(bl); + return seastar::make_ready_future<cached_map_t>( + osdmaps.insert(e, std::move(osdmap))); + }); + } +} + +void OSD::store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl) +{ + meta_coll->store_map(t, e, bl); + map_bl_cache.insert(e, std::move(bl)); +} + +seastar::future<bufferlist> OSD::load_map_bl(epoch_t e) +{ + if (std::optional<bufferlist> found = map_bl_cache.find(e); found) { + return seastar::make_ready_future<bufferlist>(*found); + } else { + return meta_coll->load_map(e); + } +} + +seastar::future<> OSD::store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m) +{ + return seastar::do_for_each(boost::counting_iterator<epoch_t>(start), + boost::counting_iterator<epoch_t>(m->get_last() + 1), + [&t, m, this](epoch_t e) { + if (auto p = m->maps.find(e); p != m->maps.end()) { + auto o = std::make_unique<OSDMap>(); + o->decode(p->second); + logger().info("store_maps osdmap.{}", e); + store_map_bl(t, e, std::move(std::move(p->second))); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + } else if (auto p = m->incremental_maps.find(e); + p != m->incremental_maps.end()) { + OSDMap::Incremental inc; + auto i = p->second.cbegin(); + inc.decode(i); + return load_map_bl(e - 1) + .then([&t, e, 
inc=std::move(inc), this](bufferlist bl) { + auto o = std::make_unique<OSDMap>(); + o->decode(bl); + o->apply_incremental(inc); + bufferlist fbl; + o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED); + store_map_bl(t, e, std::move(fbl)); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + }); + } else { + logger().error("MOSDMap lied about what maps it had?"); + return seastar::now(); + } + }); +} + +seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) +{ + logger().info("{}({})", __func__, epoch); + if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc->renew_subs(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m) +{ + logger().info("handle_osd_map {}", *m); + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + if (state.is_initializing()) { + logger().warn("i am still initializing"); + return seastar::now(); + } + + const auto first = m->get_first(); + const auto last = m->get_last(); + + // make sure there is something new, here, before we bother flushing + // the queues and such + if (last <= superblock.newest_map) { + return seastar::now(); + } + // missing some? + bool skip_maps = false; + epoch_t start = superblock.newest_map + 1; + if (first > start) { + logger().info("handle_osd_map message skips epochs {}..{}", + start, first - 1); + if (m->oldest_map <= start) { + return osdmap_subscribe(start, false); + } + // always try to get the full range of maps--as many as we can. this + // 1- is good to have + // 2- is at present the only way to ensure that we get a *full* map as + // the first map! + if (m->oldest_map < first) { + return osdmap_subscribe(m->oldest_map - 1, true); + } + skip_maps = true; + start = first; + } + + return seastar::do_with(ceph::os::Transaction{}, + [=](auto& t) { + return store_maps(t, start, m).then([=, &t] { + // even if this map isn't from a mon, we may have satisfied our subscription + monc->sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; + } + superblock.newest_map = last; + superblock.current_epoch = last; + + // note in the superblock that we were clean thru the prior epoch + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; + } + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }); + }).then([=] { + // TODO: write to superblock and commit the transaction + return committed_osd_maps(start, last, m); + }); +} + +seastar::future<> OSD::committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m) +{ + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); + // advance through the new maps + return seastar::parallel_for_each(boost::irange(first, last + 1), + [this](epoch_t cur) { + return get_map(cur).then([this](cached_map_t&& o) { + osdmap = std::move(o); + if (up_epoch != 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { + up_epoch = osdmap->get_epoch(); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); + } + } + }); + }).then([m, this] { + if (osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && + bind_epoch < osdmap->get_up_from(whoami)) { + if (state.is_booting()) { + logger().info("osd.{}: activating...", whoami); 
+ state.set_active(); + beacon_timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_beacon_report_interval)); + heartbeat_timer.arm_periodic( + std::chrono::seconds(TICK_INTERVAL)); + } + } + + if (state.is_active()) { + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami)) { + return shutdown(); + } + if (should_restart()) { + return restart(); + } else { + return seastar::now(); + } + } else if (state.is_preboot()) { + logger().info("osd.{}: now preboot", whoami); + + if (m->get_source().is_mon()) { + return _preboot(m->oldest_map, m->newest_map); + } else { + logger().info("osd.{}: start_boot", whoami); + return start_boot(); + } + } else { + logger().info("osd.{}: now {}", whoami, state); + // XXX + return seastar::now(); + } + }); +} + +bool OSD::should_restart() const +{ + if (!osdmap->is_up(whoami)) { + logger().info("map e {} marked osd.{} down", + osdmap->get_epoch(), whoami); + return true; + } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { + logger().error("map e {} had wrong client addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_addrs(whoami), + public_msgr->get_myaddrs()); + return true; + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { + logger().error("map e {} had wrong cluster addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_cluster_addrs(whoami), + cluster_msgr->get_myaddrs()); + return true; + } else { + return false; + } +} + +seastar::future<> OSD::restart() +{ + up_epoch = 0; + bind_epoch = osdmap->get_epoch(); + // TODO: promote to shutdown if being marked down for multiple times + // rebind messengers + return start_boot(); +} + +seastar::future<> OSD::shutdown() +{ + // TODO + superblock.mounted = boot_epoch; + superblock.clean_thru = osdmap->get_epoch(); + return seastar::now(); +} + +seastar::future<> OSD::send_beacon() +{ + // FIXME: min lec should be calculated from pg_stat + // and should set m->pgs + epoch_t min_last_epoch_clean = osdmap->get_epoch(); + auto m = make_message<MOSDBeacon>(osdmap->get_epoch(), + min_last_epoch_clean); + return monc->send_message(m); +} + +void OSD::update_heartbeat_peers() +{ + if (!state.is_active()) { + return; + } + for (auto& pg : pgs) { + vector<int> up, acting; + osdmap->pg_to_up_acting_osds(pg.first.pgid, + &up, nullptr, + &acting, nullptr); + for (auto osd : boost::join(up, acting)) { + if (osd != CRUSH_ITEM_NONE) { + heartbeat->add_peer(osd, osdmap->get_epoch()); + } + } + } + heartbeat->update_peers(whoami); +} diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h new file mode 100644 index 00000000..c5aff5e2 --- /dev/null +++ b/src/crimson/osd/osd.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/timer.hh> + +#include "crimson/common/simple_lru.h" +#include "crimson/common/shared_lru.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/osd/chained_dispatchers.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/osd/state.h" + +#include "osd/OSDMap.h" + +class MOSDMap; +class OSDMap; +class OSDMeta; +class PG; +class Heartbeat; + +namespace ceph::net { + class Messenger; +} + +namespace ceph::os { + class CyanStore; + struct Collection; + class Transaction; +} + +template<typename T> using Ref = boost::intrusive_ptr<T>; + +class OSD 
: public ceph::net::Dispatcher, + private OSDMapService { + seastar::gate gate; + const int whoami; + const uint32_t nonce; + seastar::timer<seastar::lowres_clock> beacon_timer; + // talk with osd + ceph::net::Messenger* cluster_msgr = nullptr; + // talk with client/mon/mgr + ceph::net::Messenger* public_msgr = nullptr; + ChainedDispatchers dispatchers; + std::unique_ptr<ceph::mon::Client> monc; + + std::unique_ptr<Heartbeat> heartbeat; + seastar::timer<seastar::lowres_clock> heartbeat_timer; + + SharedLRU<epoch_t, OSDMap> osdmaps; + SimpleLRU<epoch_t, bufferlist, false> map_bl_cache; + cached_map_t osdmap; + // TODO: use a wrapper for ObjectStore + std::unique_ptr<ceph::os::CyanStore> store; + std::unique_ptr<OSDMeta> meta_coll; + + std::unordered_map<spg_t, Ref<PG>> pgs; + OSDState state; + + /// _first_ epoch we were marked up (after this process started) + epoch_t boot_epoch = 0; + /// _most_recent_ epoch we were marked up + epoch_t up_epoch = 0; + //< epoch we last did a bind to new ip:ports + epoch_t bind_epoch = 0; + //< since when there is no more pending pg creates from mon + epoch_t last_pg_create_epoch = 0; + + OSDSuperblock superblock; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + +public: + OSD(int id, uint32_t nonce); + ~OSD() override; + + seastar::future<> mkfs(uuid_d fsid); + + seastar::future<> start(); + seastar::future<> stop(); + +private: + seastar::future<> start_boot(); + seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap); + seastar::future<> _send_boot(); + + seastar::future<Ref<PG>> load_pg(spg_t pgid); + seastar::future<> load_pgs(); + + // OSDMapService methods + seastar::future<cached_map_t> get_map(epoch_t e) override; + cached_map_t get_map() const override; + + seastar::future<bufferlist> load_map_bl(epoch_t e); + void store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl); + seastar::future<> store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m); + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + + void write_superblock(ceph::os::Transaction& t); + seastar::future<> read_superblock(); + + seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m); + seastar::future<> committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m); + bool should_restart() const; + seastar::future<> restart(); + seastar::future<> shutdown(); + + seastar::future<> send_beacon(); + void update_heartbeat_peers(); +}; diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc new file mode 100644 index 00000000..6eb225fe --- /dev/null +++ b/src/crimson/osd/osd_meta.cc @@ -0,0 +1,80 @@ +#include "osd_meta.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" + +void OSDMeta::create(ceph::os::Transaction& t) +{ + t.create_collection(coll->cid, 0); +} + +void OSDMeta::store_map(ceph::os::Transaction& t, + epoch_t e, const bufferlist& m) +{ + t.write(coll->cid, osdmap_oid(e), 0, m.length(), m); +} + +seastar::future<bufferlist> OSDMeta::load_map(epoch_t e) +{ + return store->read(coll, + osdmap_oid(e), 0, 0, + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); +} + +void 
OSDMeta::store_superblock(ceph::os::Transaction& t, + const OSDSuperblock& superblock) +{ + bufferlist bl; + encode(superblock, bl); + t.write(coll->cid, superblock_oid(), 0, bl.length(), bl); +} + +seastar::future<OSDSuperblock> OSDMeta::load_superblock() +{ + return store->read(coll, superblock_oid(), 0, 0) + .then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + OSDSuperblock superblock; + decode(superblock, p); + return seastar::make_ready_future<OSDSuperblock>(std::move(superblock)); + }); +} + +seastar::future<pg_pool_t, + std::string, + OSDMeta::ec_profile_t> +OSDMeta::load_final_pool_info(int64_t pool) { + return store->read(coll, final_pool_info_oid(pool), + 0, 0).then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + pg_pool_t pi; + string name; + ec_profile_t ec_profile; + decode(pi, p); + decode(name, p); + decode(ec_profile, p); + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + }); +} + +ghobject_t OSDMeta::osdmap_oid(epoch_t epoch) +{ + string name = fmt::format("osdmap.{}", epoch); + return ghobject_t(hobject_t(sobject_t(object_t(name), 0))); +} + +ghobject_t OSDMeta::final_pool_info_oid(int64_t pool) +{ + string name = fmt::format("final_pool_{}", pool); + return ghobject_t(hobject_t(sobject_t(object_t(name), CEPH_NOSNAP))); +} + +ghobject_t OSDMeta::superblock_oid() +{ + return ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0))); +} diff --git a/src/crimson/osd/osd_meta.h b/src/crimson/osd/osd_meta.h new file mode 100644 index 00000000..936d9548 --- /dev/null +++ b/src/crimson/osd/osd_meta.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <string> +#include <seastar/core/future.hh> +#include "osd/osd_types.h" + +namespace ceph::os { + class CyanStore; + class Collection; + class Transaction; +} + +/// metadata shared across PGs, or put in another way, +/// metadata not specific to certain PGs. 
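
As the doc comment above says, OSDMeta groups the metadata that is not tied to any particular PG behind the meta collection. For orientation, a small sketch of how it is driven, modelled on what OSD::mkfs in osd.cc (earlier in this diff) already does; the free function name is invented for illustration, everything else comes from the headers added by this patch:

    #include "crimson/os/Transaction.h"
    #include "crimson/os/cyan_store.h"
    #include "crimson/osd/osd_meta.h"

    // Mirror of the usage in OSD::mkfs: wrap the meta collection in OSDMeta,
    // queue the superblock write in a transaction, then submit it.
    seastar::future<> write_superblock_sketch(ceph::os::CyanStore& store,
                                              const OSDSuperblock& superblock)
    {
      OSDMeta meta{store.create_new_collection(coll_t::meta()), &store};
      ceph::os::Transaction t;
      meta.create(t);                        // creates the meta collection
      meta.store_superblock(t, superblock);  // encodes + writes the superblock object
      return store.do_transaction(meta.collection(), std::move(t));
    }
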
+class OSDMeta {
+  template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+  ceph::os::CyanStore* store;
+  Ref<ceph::os::Collection> coll;
+
+public:
+  OSDMeta(Ref<ceph::os::Collection> coll,
+          ceph::os::CyanStore* store)
+    : store{store}, coll{coll}
+  {}
+
+
+  auto collection() {
+    return coll;
+  }
+  void create(ceph::os::Transaction& t);
+
+  void store_map(ceph::os::Transaction& t,
+                 epoch_t e, const bufferlist& m);
+  seastar::future<bufferlist> load_map(epoch_t e);
+
+  void store_superblock(ceph::os::Transaction& t,
+                        const OSDSuperblock& sb);
+  seastar::future<OSDSuperblock> load_superblock();
+
+  using ec_profile_t = std::map<std::string, std::string>;
+  seastar::future<pg_pool_t,
+                  std::string,
+                  ec_profile_t> load_final_pool_info(int64_t pool);
+private:
+  static ghobject_t osdmap_oid(epoch_t epoch);
+  static ghobject_t final_pool_info_oid(int64_t pool);
+  static ghobject_t superblock_oid();
+};
diff --git a/src/crimson/osd/osdmap_service.h b/src/crimson/osd/osdmap_service.h
new file mode 100644
index 00000000..0a3aaed3
--- /dev/null
+++ b/src/crimson/osd/osdmap_service.h
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+
+#include "include/types.h"
+
+class OSDMap;
+
+class OSDMapService {
+public:
+  using cached_map_t = boost::local_shared_ptr<OSDMap>;
+  virtual ~OSDMapService() = default;
+  virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
+  /// get the latest map
+  virtual cached_map_t get_map() const = 0;
+};
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
new file mode 100644
index 00000000..bc7b8c2d
--- /dev/null
+++ b/src/crimson/osd/pg.cc
@@ -0,0 +1,6 @@
+#include "pg.h"
+
+PG::PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile)
+{
+  // TODO
+}
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
new file mode 100644
index 00000000..4bb672f4
--- /dev/null
+++ b/src/crimson/osd/pg.h
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/future.hh>
+
+#include "osd/osd_types.h"
+
+template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+class PG : public boost::intrusive_ref_counter<
+  PG,
+  boost::thread_unsafe_counter>
+{
+  using ec_profile_t = std::map<std::string,std::string>;
+public:
+  PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile);
+};
diff --git a/src/crimson/osd/pg_meta.cc b/src/crimson/osd/pg_meta.cc
new file mode 100644
index 00000000..2098e50a
--- /dev/null
+++ b/src/crimson/osd/pg_meta.cc
@@ -0,0 +1,104 @@
+#include "pg_meta.h"
+
+#include <string_view>
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_store.h"
+
+// prefix pgmeta_oid keys with _ so that PGLog::read_log_and_missing() can
+// easily skip them
+
+static const string_view infover_key = "_infover"sv;
+static const string_view info_key = "_info"sv;
+static const string_view biginfo_key = "_biginfo"sv;
+static const string_view epoch_key = "_epoch"sv;
+static const string_view fastinfo_key = "_fastinfo"sv;
+
+using ceph::os::CyanStore;
+
+PGMeta::PGMeta(CyanStore* store, spg_t pgid)
+  : store{store},
+    pgid{pgid}
+{}
+
+namespace {
+  template<typename T>
+  std::optional<T> find_value(const CyanStore::omap_values_t& values,
+                              string_view key)
+  {
+    auto found = values.find(key);
+    if (found == values.end()) {
+      return {};
+    }
+    auto p = found->second.cbegin();
+    T value;
+    decode(value, p);
+    return std::make_optional(std::move(value));
+  }
+}
+seastar::future<epoch_t> PGMeta::get_epoch()
+{
+  auto ch = store->open_collection(coll_t{pgid});
+  return store->omap_get_values(ch,
+                                pgid.make_pgmeta_oid(),
+                                {string{infover_key},
+                                 string{epoch_key}}).then(
+    [](auto&& values) {
+      {
+        // sanity check
+        auto infover = find_value<__u8>(values, infover_key);
+        assert(infover);
+        if (*infover < 10) {
+          throw std::runtime_error("incompatible pg meta");
+        }
+      }
+      {
+        auto epoch = find_value<epoch_t>(values, epoch_key);
+        assert(epoch);
+        return seastar::make_ready_future<epoch_t>(*epoch);
+      }
+    });
+}
+
+seastar::future<pg_info_t, PastIntervals> PGMeta::load()
+{
+  auto ch = store->open_collection(coll_t{pgid});
+  return store->omap_get_values(ch,
+                                pgid.make_pgmeta_oid(),
+                                {string{infover_key},
+                                 string{info_key},
+                                 string{biginfo_key},
+                                 string{fastinfo_key}}).then(
+    [this](auto&& values) {
+      {
+        // sanity check
+        auto infover = find_value<__u8>(values, infover_key);
+        assert(infover);
+        if (infover < 10) {
+          throw std::runtime_error("incompatible pg meta");
+        }
+      }
+      pg_info_t info;
+      {
+        auto found = find_value<pg_info_t>(values, info_key);
+        assert(found);
+        info = *std::move(found);
+      }
+      PastIntervals past_intervals;
+      {
+        using biginfo_t = std::pair<PastIntervals, decltype(info.purged_snaps)>;
+        auto big_info = find_value<biginfo_t>(values, biginfo_key);
+        assert(big_info);
+        past_intervals = std::move(big_info->first);
+        info.purged_snaps = std::move(big_info->second);
+      }
+      {
+        auto fast_info = find_value<pg_fast_info_t>(values, fastinfo_key);
+        assert(fast_info);
+        fast_info->try_apply_to(&info);
+      }
+      return seastar::make_ready_future<pg_info_t, PastIntervals>(
+        std::move(info),
+        std::move(past_intervals));
+    });
+}
diff --git a/src/crimson/osd/pg_meta.h b/src/crimson/osd/pg_meta.h
new file mode 100644
index 00000000..10f2234a
--- /dev/null
+++ b/src/crimson/osd/pg_meta.h
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include "osd/osd_types.h"
+
+namespace ceph::os {
+  class CyanStore;
+}
+
+/// PG related metadata
+class PGMeta
+{
+  ceph::os::CyanStore* store;
+  const spg_t pgid;
+public:
+  PGMeta(ceph::os::CyanStore *store, spg_t pgid);
+  seastar::future<epoch_t> get_epoch();
+  seastar::future<pg_info_t, PastIntervals> load();
+};
diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h
new file mode 100644
index 00000000..4c445348
--- /dev/null
+++ b/src/crimson/osd/state.h
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string_view>
+#include <ostream>
+
+class OSDMap;
+
+class OSDState {
+
+  enum class State {
+    INITIALIZING,
+    PREBOOT,
+    BOOTING,
+    ACTIVE,
+    STOPPING,
+    WAITING_FOR_HEALTHY,
+  };
+
+  State state = State::INITIALIZING;
+
+public:
+  bool is_initializing() const {
+    return state == State::INITIALIZING;
+  }
+  bool is_preboot() const {
+    return state == State::PREBOOT;
+  }
+  bool is_booting() const {
+    return state == State::BOOTING;
+  }
+  bool is_active() const {
+    return state == State::ACTIVE;
+  }
+  bool is_stopping() const {
+    return state == State::STOPPING;
+  }
+  bool is_waiting_for_healthy() const {
+    return state == State::WAITING_FOR_HEALTHY;
+  }
+  void set_preboot() {
+    state = State::PREBOOT;
+  }
+  void set_booting() {
+    state = State::BOOTING;
+  }
+  void set_active() {
+    state = State::ACTIVE;
+  }
+  void set_stopping() {
+    state = State::STOPPING;
+  }
+  std::string_view to_string() const {
+    switch (state) {
+    case State::INITIALIZING: return "initializing";
+    case State::PREBOOT: return "preboot";
+    case State::BOOTING: return "booting";
+    case State::ACTIVE: return "active";
+    case State::STOPPING: return "stopping";
+    case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy";
+    default: return "???";
+    }
+  }
+};
+
+inline std::ostream&
+operator<<(std::ostream& os, const OSDState& s) {
+  return os << s.to_string();
+}
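As a quick orientation aid, here is a minimal usage sketch of the OSDState helper defined in state.h above. It relies only on what the header shows (the default INITIALIZING state, the set_*/is_* transitions, and the stream operator) and assumes the header resolves as "crimson/osd/state.h" with the include root implied by the paths in this patch; the main() wrapper is purely illustrative and is not part of the commit.

#include <iostream>
#include "crimson/osd/state.h"  // assumed include path, per the file layout in this patch

int main() {
  OSDState state;               // default state is State::INITIALIZING
  std::cout << state << "\n";   // prints "initializing" via operator<<
  state.set_preboot();          // walk forward through the boot-related states
  state.set_booting();
  state.set_active();
  std::cout << state << "\n";   // prints "active"
  return state.is_active() ? 0 : 1;
}

The set_preboot()/set_booting()/set_active() order matches the boot-related entry points declared in osd.h (start_boot(), _preboot(), _send_boot()), though the actual call sites are outside this excerpt.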