diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/neorados | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/neorados')
-rw-r--r-- | src/neorados/CMakeLists.txt | 42 | ||||
-rw-r--r-- | src/neorados/RADOS.cc | 1736 | ||||
-rw-r--r-- | src/neorados/RADOSImpl.cc | 121 | ||||
-rw-r--r-- | src/neorados/RADOSImpl.h | 135 | ||||
-rw-r--r-- | src/neorados/cls/fifo.cc | 385 | ||||
-rw-r--r-- | src/neorados/cls/fifo.h | 1754 |
6 files changed, 4173 insertions, 0 deletions
diff --git a/src/neorados/CMakeLists.txt b/src/neorados/CMakeLists.txt new file mode 100644 index 000000000..8695b48f0 --- /dev/null +++ b/src/neorados/CMakeLists.txt @@ -0,0 +1,42 @@ +add_library(neorados_objs OBJECT + RADOSImpl.cc) +compile_with_fmt(neorados_objs) +add_library(neorados_api_obj OBJECT + RADOS.cc) +compile_with_fmt(neorados_api_obj) + +add_library(libneorados STATIC + $<TARGET_OBJECTS:neorados_api_obj> + $<TARGET_OBJECTS:neorados_objs>) +target_link_libraries(libneorados PRIVATE + osdc ceph-common cls_lock_client fmt::fmt + ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS}) + +# if(ENABLE_SHARED) +# add_library(libneorados ${CEPH_SHARED} +# $<TARGET_OBJECTS:neorados_api_obj> +# $<TARGET_OBJECTS:neorados_objs> +# $<TARGET_OBJECTS:common_buffer_obj>) +# set_target_properties(libneorados PROPERTIES +# OUTPUT_NAME RADOS +# VERSION 0.0.1 +# SOVERSION 1 +# CXX_VISIBILITY_PRESET hidden +# VISIBILITY_INLINES_HIDDEN ON) +# if(NOT APPLE) +# set_property(TARGET libneorados APPEND_STRING PROPERTY +# LINK_FLAGS " -Wl,--exclude-libs,ALL") +# endif() +# else(ENABLE_SHARED) +# add_library(libneorados STATIC +# $<TARGET_OBJECTS:neorados_api_obj> +# $<TARGET_OBJECTS:neorados_objs>) +# endif(ENABLE_SHARED) +# target_link_libraries(libneorados PRIVATE +# osdc ceph-common cls_lock_client +# ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS}) +# target_link_libraries(libneorados ${rados_libs}) +# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR}) +add_library(neorados_cls_fifo STATIC cls/fifo.cc) +target_link_libraries(neorados_cls_fifo PRIVATE + libneorados ceph-common fmt::fmt) diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc new file mode 100644 index 000000000..927f995cc --- /dev/null +++ b/src/neorados/RADOS.cc @@ -0,0 +1,1736 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat <contact@redhat.com> + * + * This is 
free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#define BOOST_BIND_NO_PLACEHOLDERS + +#include <optional> +#include <string_view> + +#include <boost/intrusive_ptr.hpp> + +#include <fmt/format.h> + +#include "include/ceph_fs.h" + +#include "common/ceph_context.h" +#include "common/ceph_argparse.h" +#include "common/common_init.h" +#include "common/hobject.h" +#include "common/EventTrace.h" + +#include "global/global_init.h" + +#include "osd/osd_types.h" +#include "osdc/error_code.h" + +#include "neorados/RADOSImpl.h" +#include "include/neorados/RADOS.hpp" + +namespace bc = boost::container; +namespace bs = boost::system; +namespace ca = ceph::async; +namespace cb = ceph::buffer; + +namespace neorados { +// Object + +Object::Object() { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(); +} + +Object::Object(const char* s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::Object(std::string_view s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::Object(std::string&& s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(std::move(s)); +} + +Object::Object(const std::string& s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::~Object() { + reinterpret_cast<object_t*>(&impl)->~object_t(); +} + +Object::Object(const Object& o) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(*reinterpret_cast<const object_t*>(&o.impl)); +} +Object& Object::operator =(const Object& o) { + *reinterpret_cast<object_t*>(&impl) = + *reinterpret_cast<const object_t*>(&o.impl); + return *this; +} +Object::Object(Object&& o) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) 
object_t(std::move(*reinterpret_cast<object_t*>(&o.impl))); +} +Object& Object::operator =(Object&& o) { + *reinterpret_cast<object_t*>(&impl) = + std::move(*reinterpret_cast<object_t*>(&o.impl)); + return *this; +} + +Object::operator std::string_view() const { + return std::string_view(reinterpret_cast<const object_t*>(&impl)->name); +} + +bool operator <(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) < + *reinterpret_cast<const object_t*>(&rhs.impl)); +} +bool operator <=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) <= + *reinterpret_cast<const object_t*>(&rhs.impl)); +} +bool operator >=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) >= + *reinterpret_cast<const object_t*>(&rhs.impl)); +} +bool operator >(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) > + *reinterpret_cast<const object_t*>(&rhs.impl)); +} + +bool operator ==(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) == + *reinterpret_cast<const object_t*>(&rhs.impl)); +} +bool operator !=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast<const object_t*>(&lhs.impl) != + *reinterpret_cast<const object_t*>(&rhs.impl)); +} + +std::ostream& operator <<(std::ostream& m, const Object& o) { + return (m << *reinterpret_cast<const object_t*>(&o.impl)); +} + +// IOContext + +struct IOContextImpl { + object_locator_t oloc; + snapid_t snap_seq = CEPH_NOSNAP; + SnapContext snapc; + int extra_op_flags = 0; +}; + +IOContext::IOContext() { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl(); +} + +IOContext::IOContext(std::int64_t _pool) : IOContext() { + pool(_pool); +} + +IOContext::IOContext(std::int64_t _pool, std::string_view _ns) + : IOContext() { + pool(_pool); + ns(_ns); +} + +IOContext::IOContext(std::int64_t _pool, 
std::string&& _ns) + : IOContext() { + pool(_pool); + ns(std::move(_ns)); +} + +IOContext::~IOContext() { + reinterpret_cast<IOContextImpl*>(&impl)->~IOContextImpl(); +} + +IOContext::IOContext(const IOContext& rhs) { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl(*reinterpret_cast<const IOContextImpl*>(&rhs.impl)); +} + +IOContext& IOContext::operator =(const IOContext& rhs) { + *reinterpret_cast<IOContextImpl*>(&impl) = + *reinterpret_cast<const IOContextImpl*>(&rhs.impl); + return *this; +} + +IOContext::IOContext(IOContext&& rhs) { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl( + std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl))); +} + +IOContext& IOContext::operator =(IOContext&& rhs) { + *reinterpret_cast<IOContextImpl*>(&impl) = + std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl)); + return *this; +} + +std::int64_t IOContext::pool() const { + return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.pool; +} + +void IOContext::pool(std::int64_t _pool) { + reinterpret_cast<IOContextImpl*>(&impl)->oloc.pool = _pool; +} + +std::string_view IOContext::ns() const { + return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.nspace; +} + +void IOContext::ns(std::string_view _ns) { + reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = _ns; +} + +void IOContext::ns(std::string&& _ns) { + reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = std::move(_ns); +} + +std::optional<std::string_view> IOContext::key() const { + auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc; + if (oloc.key.empty()) + return std::nullopt; + else + return std::string_view(oloc.key); +} + +void IOContext::key(std::string_view _key) { + auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc; + oloc.hash = -1; + oloc.key = _key; +} + +void IOContext::key(std::string&&_key) { + auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc; + oloc.hash = -1; + oloc.key = std::move(_key); +} + 
+void IOContext::clear_key() { + auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc; + oloc.hash = -1; + oloc.key.clear(); +} + +std::optional<std::int64_t> IOContext::hash() const { + auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc; + if (oloc.hash < 0) + return std::nullopt; + else + return oloc.hash; +} + +void IOContext::hash(std::int64_t _hash) { + auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc; + oloc.hash = _hash; + oloc.key.clear(); +} + +void IOContext::clear_hash() { + auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc; + oloc.hash = -1; + oloc.key.clear(); +} + + +std::optional<std::uint64_t> IOContext::read_snap() const { + auto& snap_seq = reinterpret_cast<const IOContextImpl*>(&impl)->snap_seq; + if (snap_seq == CEPH_NOSNAP) + return std::nullopt; + else + return snap_seq; +} +void IOContext::read_snap(std::optional<std::uint64_t> _snapid) { + auto& snap_seq = reinterpret_cast<IOContextImpl*>(&impl)->snap_seq; + snap_seq = _snapid.value_or(CEPH_NOSNAP); +} + +std::optional< + std::pair<std::uint64_t, + std::vector<std::uint64_t>>> IOContext::write_snap_context() const { + auto& snapc = reinterpret_cast<const IOContextImpl*>(&impl)->snapc; + if (snapc.empty()) { + return std::nullopt; + } else { + std::vector<uint64_t> v(snapc.snaps.begin(), snapc.snaps.end()); + return std::make_optional(std::make_pair(uint64_t(snapc.seq), v)); + } +} + +void IOContext::write_snap_context( + std::optional<std::pair<std::uint64_t, std::vector<std::uint64_t>>> _snapc) { + auto& snapc = reinterpret_cast<IOContextImpl*>(&impl)->snapc; + if (!_snapc) { + snapc.clear(); + } else { + SnapContext n(_snapc->first, { _snapc->second.begin(), _snapc->second.end()}); + if (!n.is_valid()) { + throw bs::system_error(EINVAL, + bs::system_category(), + "Invalid snap context."); + + } else { + snapc = n; + } + } +} + +bool IOContext::full_try() const { + const auto ioc = reinterpret_cast<const IOContextImpl*>(&impl); + return 
(ioc->extra_op_flags & CEPH_OSD_FLAG_FULL_TRY) != 0; +} + +void IOContext::full_try(bool _full_try) { + auto ioc = reinterpret_cast<IOContextImpl*>(&impl); + if (_full_try) { + ioc->extra_op_flags |= CEPH_OSD_FLAG_FULL_TRY; + } else { + ioc->extra_op_flags &= ~CEPH_OSD_FLAG_FULL_TRY; + } +} + +bool operator <(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) < + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator <=(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <= + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator >=(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >= + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator >(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) > + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator ==(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) == + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator !=(const IOContext& lhs, const IOContext& rhs) { + 
const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl); + const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) != + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +std::ostream& operator <<(std::ostream& m, const IOContext& o) { + const auto l = reinterpret_cast<const IOContextImpl*>(&o.impl); + return (m << l->oloc.pool << ":" << l->oloc.nspace << ":" << l->oloc.key); +} + + +// Op + +struct OpImpl { + ObjectOperation op; + std::optional<ceph::real_time> mtime; + + OpImpl() = default; + + OpImpl(const OpImpl& rhs) = delete; + OpImpl(OpImpl&& rhs) = default; + + OpImpl& operator =(const OpImpl& rhs) = delete; + OpImpl& operator =(OpImpl&& rhs) = default; +}; + +Op::Op() { + static_assert(Op::impl_size >= sizeof(OpImpl)); + new (&impl) OpImpl; +} + +Op::Op(Op&& rhs) { + new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl))); +} +Op& Op::operator =(Op&& rhs) { + reinterpret_cast<OpImpl*>(&impl)->~OpImpl(); + new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl))); + return *this; +} +Op::~Op() { + reinterpret_cast<OpImpl*>(&impl)->~OpImpl(); +} + +void Op::set_excl() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(CEPH_OSD_OP_FLAG_EXCL); +} +void Op::set_failok() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FAILOK); +} +void Op::set_fadvise_random() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_RANDOM); +} +void Op::set_fadvise_sequential() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); +} +void Op::set_fadvise_willneed() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); +} +void Op::set_fadvise_dontneed() { + reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); +} +void Op::set_fadvise_nocache() { + 
reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE); +} + +void Op::cmpext(uint64_t off, bufferlist&& cmp_bl, std::size_t* s) { + reinterpret_cast<OpImpl*>(&impl)->op.cmpext(off, std::move(cmp_bl), nullptr, + s); +} +void Op::cmpxattr(std::string_view name, cmpxattr_op op, const bufferlist& val) { + reinterpret_cast<OpImpl*>(&impl)-> + op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_STRING, val); +} +void Op::cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val) { + bufferlist bl; + encode(val, bl); + reinterpret_cast<OpImpl*>(&impl)-> + op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_U64, bl); +} + +void Op::assert_version(uint64_t ver) { + reinterpret_cast<OpImpl*>(&impl)->op.assert_version(ver); +} +void Op::assert_exists() { + reinterpret_cast<OpImpl*>(&impl)->op.stat( + nullptr, + static_cast<ceph::real_time*>(nullptr), + static_cast<bs::error_code*>(nullptr)); +} +void Op::cmp_omap(const bc::flat_map< + std::string, std::pair<cb::list, + int>>& assertions) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_cmp(assertions, nullptr); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + cb::list* out, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, ec, out); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + fu2::unique_function<void(bs::error_code, + const cb::list&) &&> f) { + reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f)); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + fu2::unique_function<void(bs::error_code, int, + const cb::list&) &&> f) { + reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f)); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, 
ec); +} + +void Op::balance_reads() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_BALANCE_READS; +} +void Op::localize_reads() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_LOCALIZE_READS; +} +void Op::order_reads_writes() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RWORDERED; +} +void Op::ignore_cache() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_CACHE; +} +void Op::skiprwlocks() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_SKIPRWLOCKS; +} +void Op::ignore_overlay() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY; +} +void Op::full_try() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_TRY; +} +void Op::full_force() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_FORCE; +} +void Op::ignore_redirect() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_REDIRECT; +} +void Op::ordersnap() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_ORDERSNAP; +} +void Op::returnvec() { + reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RETURNVEC; +} + +std::size_t Op::size() const { + return reinterpret_cast<const OpImpl*>(&impl)->op.size(); +} + +std::ostream& operator <<(std::ostream& m, const Op& o) { + return m << reinterpret_cast<const OpImpl*>(&o.impl)->op; +} + + +// --- + +// ReadOp / WriteOp + +void ReadOp::read(size_t off, uint64_t len, cb::list* out, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.read(off, len, ec, out); +} + +void ReadOp::get_xattr(std::string_view name, cb::list* out, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.getxattr(name, ec, out); +} + +void ReadOp::get_omap_header(cb::list* out, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_get_header(ec, out); +} + +void ReadOp::sparse_read(uint64_t off, uint64_t len, cb::list* out, + std::vector<std::pair<std::uint64_t, + 
std::uint64_t>>* extents, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.sparse_read(off, len, ec, extents, out); +} + +void ReadOp::stat(std::uint64_t* size, ceph::real_time* mtime, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.stat(size, mtime, ec); +} + +void ReadOp::get_omap_keys(std::optional<std::string_view> start_after, + std::uint64_t max_return, + bc::flat_set<std::string>* keys, + bool* done, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_get_keys(start_after, max_return, + ec, keys, done); +} + +void ReadOp::get_xattrs(bc::flat_map<std::string, + cb::list>* kv, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.getxattrs(ec, kv); +} + +void ReadOp::get_omap_vals(std::optional<std::string_view> start_after, + std::optional<std::string_view> filter_prefix, + uint64_t max_return, + bc::flat_map<std::string, + cb::list>* kv, + bool* done, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals(start_after, filter_prefix, + max_return, ec, kv, done); +} + +void ReadOp::get_omap_vals_by_keys( + const bc::flat_set<std::string>& keys, + bc::flat_map<std::string, cb::list>* kv, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals_by_keys(keys, ec, kv); +} + +void ReadOp::list_watchers(std::vector<ObjWatcher>* watchers, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)-> op.list_watchers(watchers, ec); +} + +void ReadOp::list_snaps(SnapSet* snaps, + bs::error_code* ec) { + reinterpret_cast<OpImpl*>(&impl)->op.list_snaps(snaps, nullptr, ec); +} + +// WriteOp + +void WriteOp::set_mtime(ceph::real_time t) { + auto o = reinterpret_cast<OpImpl*>(&impl); + o->mtime = t; +} + +void WriteOp::create(bool exclusive) { + reinterpret_cast<OpImpl*>(&impl)->op.create(exclusive); +} + +void WriteOp::write(uint64_t off, bufferlist&& bl) { + reinterpret_cast<OpImpl*>(&impl)->op.write(off, bl); +} + +void WriteOp::write_full(bufferlist&& bl) { + 
reinterpret_cast<OpImpl*>(&impl)->op.write_full(bl); +} + +void WriteOp::writesame(uint64_t off, uint64_t write_len, bufferlist&& bl) { + reinterpret_cast<OpImpl*>(&impl)->op.writesame(off, write_len, bl); +} + +void WriteOp::append(bufferlist&& bl) { + reinterpret_cast<OpImpl*>(&impl)->op.append(bl); +} + +void WriteOp::remove() { + reinterpret_cast<OpImpl*>(&impl)->op.remove(); +} + +void WriteOp::truncate(uint64_t off) { + reinterpret_cast<OpImpl*>(&impl)->op.truncate(off); +} + +void WriteOp::zero(uint64_t off, uint64_t len) { + reinterpret_cast<OpImpl*>(&impl)->op.zero(off, len); +} + +void WriteOp::rmxattr(std::string_view name) { + reinterpret_cast<OpImpl*>(&impl)->op.rmxattr(name); +} + +void WriteOp::setxattr(std::string_view name, + bufferlist&& bl) { + reinterpret_cast<OpImpl*>(&impl)->op.setxattr(name, bl); +} + +void WriteOp::rollback(uint64_t snapid) { + reinterpret_cast<OpImpl*>(&impl)->op.rollback(snapid); +} + +void WriteOp::set_omap( + const bc::flat_map<std::string, cb::list>& map) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_set(map); +} + +void WriteOp::set_omap_header(bufferlist&& bl) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_set_header(bl); +} + +void WriteOp::clear_omap() { + reinterpret_cast<OpImpl*>(&impl)->op.omap_clear(); +} + +void WriteOp::rm_omap_keys( + const bc::flat_set<std::string>& to_rm) { + reinterpret_cast<OpImpl*>(&impl)->op.omap_rm_keys(to_rm); +} + +void WriteOp::set_alloc_hint(uint64_t expected_object_size, + uint64_t expected_write_size, + alloc_hint::alloc_hint_t flags) { + using namespace alloc_hint; + static_assert(sequential_write == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE)); + static_assert(random_write == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE)); + static_assert(sequential_read == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ)); + static_assert(random_read == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ)); + static_assert(append_only == + 
static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)); + static_assert(immutable == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE)); + static_assert(shortlived == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED)); + static_assert(longlived == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED)); + static_assert(compressible == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE)); + static_assert(incompressible == + static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE)); + + reinterpret_cast<OpImpl*>(&impl)->op.set_alloc_hint(expected_object_size, + expected_write_size, + flags); +} + +// RADOS + +RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) { + if (conf_files) + *conf_files += (", " + std::string(f)); + else + conf_files = std::string(f); + return *this; +} + +void RADOS::Builder::build(boost::asio::io_context& ioctx, + std::unique_ptr<BuildComp> c) { + constexpr auto env = CODE_ENVIRONMENT_LIBRARY; + CephInitParameters ci(env); + if (name) + ci.name.set(CEPH_ENTITY_TYPE_CLIENT, *name); + else + ci.name.set(CEPH_ENTITY_TYPE_CLIENT, "admin"); + uint32_t flags = 0; + if (no_default_conf) + flags |= CINIT_FLAG_NO_DEFAULT_CONFIG_FILE; + if (no_mon_conf) + flags |= CINIT_FLAG_NO_MON_CONFIG; + + CephContext *cct = common_preinit(ci, env, flags); + if (cluster) + cct->_conf->cluster = *cluster; + + if (no_mon_conf) + cct->_conf->no_mon_config = true; + + // TODO: Come up with proper error codes here. Maybe augment the + // functions with a default bs::error_code* parameter to + // pass back. + { + std::ostringstream ss; + auto r = cct->_conf.parse_config_files(conf_files ? 
conf_files->data() : nullptr, + &ss, flags); + if (r < 0) + c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr}); + } + + cct->_conf.parse_env(cct->get_module_type()); + + for (const auto& [n, v] : configs) { + std::stringstream ss; + auto r = cct->_conf.set_val(n, v, &ss); + if (r < 0) + c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr}); + } + + if (!no_mon_conf) { + MonClient mc_bootstrap(cct, ioctx); + // TODO This function should return an error code. + auto err = mc_bootstrap.get_monmap_and_config(); + if (err < 0) + c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr}); + } + if (!cct->_log->is_started()) { + cct->_log->start(); + } + common_init_finish(cct); + + RADOS::make_with_cct(cct, ioctx, std::move(c)); +} + +void RADOS::make_with_cct(CephContext* cct, + boost::asio::io_context& ioctx, + std::unique_ptr<BuildComp> c) { + try { + auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)}; + r->objecter->wait_for_osd_map( + [c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable { + c->dispatch(std::move(c), bs::error_code{}, + RADOS{std::move(r)}); + }); + } catch (const bs::system_error& err) { + c->post(std::move(c), err.code(), RADOS{nullptr}); + } +} + +RADOS RADOS::make_with_librados(librados::Rados& rados) { + return RADOS{std::make_unique<detail::RadosClient>(rados.client)}; +} + +RADOS::RADOS() = default; + +RADOS::RADOS(std::unique_ptr<detail::Client> impl) + : impl(std::move(impl)) {} + +RADOS::RADOS(RADOS&&) = default; +RADOS& RADOS::operator =(RADOS&&) = default; + +RADOS::~RADOS() = default; + +RADOS::executor_type RADOS::get_executor() const { + return impl->ioctx.get_executor(); +} + +boost::asio::io_context& RADOS::get_io_context() { + return impl->ioctx; +} + +void RADOS::execute(const Object& o, const IOContext& _ioc, ReadOp&& _op, + cb::list* bl, + std::unique_ptr<ReadOp::Completion> c, version_t* objver, + const blkin_trace_info *trace_info) { + auto oid = 
reinterpret_cast<const object_t*>(&o.impl); + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + auto op = reinterpret_cast<OpImpl*>(&_op.impl); + auto flags = op->op.flags | ioc->extra_op_flags; + + ZTracer::Trace trace; + if (trace_info) { + ZTracer::Trace parent_trace("", nullptr, trace_info); + trace.init("rados execute", &impl->objecter->trace_endpoint, &parent_trace); + } + + trace.event("init"); + impl->objecter->read( + *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags, + std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace); + + trace.event("submitted"); +} + +void RADOS::execute(const Object& o, const IOContext& _ioc, WriteOp&& _op, + std::unique_ptr<WriteOp::Completion> c, version_t* objver, + const blkin_trace_info *trace_info) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + auto op = reinterpret_cast<OpImpl*>(&_op.impl); + auto flags = op->op.flags | ioc->extra_op_flags; + ceph::real_time mtime; + if (op->mtime) + mtime = *op->mtime; + else + mtime = ceph::real_clock::now(); + + ZTracer::Trace trace; + if (trace_info) { + ZTracer::Trace parent_trace("", nullptr, trace_info); + trace.init("rados execute", &impl->objecter->trace_endpoint, &parent_trace); + } + + trace.event("init"); + impl->objecter->mutate( + *oid, ioc->oloc, std::move(op->op), ioc->snapc, + mtime, flags, + std::move(c), objver, osd_reqid_t{}, &trace); + trace.event("submitted"); +} + +void RADOS::execute(const Object& o, std::int64_t pool, ReadOp&& _op, + cb::list* bl, + std::unique_ptr<ReadOp::Completion> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key, + version_t* objver) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto op = reinterpret_cast<OpImpl*>(&_op.impl); + auto flags = op->op.flags; + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + 
impl->objecter->read( + *oid, oloc, std::move(op->op), CEPH_NOSNAP, bl, flags, + std::move(c), objver); +} + +void RADOS::execute(const Object& o, std::int64_t pool, WriteOp&& _op, + std::unique_ptr<WriteOp::Completion> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key, + version_t* objver) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto op = reinterpret_cast<OpImpl*>(&_op.impl); + auto flags = op->op.flags; + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ceph::real_time mtime; + if (op->mtime) + mtime = *op->mtime; + else + mtime = ceph::real_clock::now(); + + impl->objecter->mutate( + *oid, oloc, std::move(op->op), {}, + mtime, flags, + std::move(c), objver); +} + +boost::uuids::uuid RADOS::get_fsid() const noexcept { + return impl->monclient.get_fsid().uuid; +} + + +void RADOS::lookup_pool(std::string_view name, + std::unique_ptr<LookupPoolComp> c) +{ + // I kind of want to make lookup_pg_pool return + // std::optional<int64_t> since it can only return one error code. 
+ int64_t ret = impl->objecter->with_osdmap( + std::mem_fn(&OSDMap::lookup_pg_pool_name), + name); + if (ret < 0) { + impl->objecter->wait_for_latest_osdmap( + [name = std::string(name), c = std::move(c), + objecter = impl->objecter] + (bs::error_code ec) mutable { + int64_t ret = + objecter->with_osdmap([&](const OSDMap &osdmap) { + return osdmap.lookup_pg_pool_name(name); + }); + if (ret < 0) + ca::dispatch(std::move(c), osdc_errc::pool_dne, + std::int64_t(0)); + else + ca::dispatch(std::move(c), bs::error_code{}, ret); + }); + } else if (ret < 0) { + ca::post(std::move(c), osdc_errc::pool_dne, + std::int64_t(0)); + } else { + ca::post(std::move(c), bs::error_code{}, ret); + } +} + + +std::optional<uint64_t> RADOS::get_pool_alignment(int64_t pool_id) +{ + return impl->objecter->with_osdmap( + [pool_id](const OSDMap &o) -> std::optional<uint64_t> { + if (!o.have_pg_pool(pool_id)) { + throw bs::system_error( + ENOENT, bs::system_category(), + "Cannot find pool in OSDMap."); + } else if (o.get_pg_pool(pool_id)->requires_aligned_append()) { + return o.get_pg_pool(pool_id)->required_alignment(); + } else { + return std::nullopt; + } + }); +} + +void RADOS::list_pools(std::unique_ptr<LSPoolsComp> c) { + impl->objecter->with_osdmap( + [&](OSDMap& o) { + std::vector<std::pair<std::int64_t, std::string>> v; + for (auto p : o.get_pools()) + v.push_back(std::make_pair(p.first, o.get_pool_name(p.first))); + ca::dispatch(std::move(c), std::move(v)); + }); +} + +void RADOS::create_pool_snap(std::int64_t pool, + std::string_view snapName, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->create_pool_snap( + pool, snapName, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::allocate_selfmanaged_snap(int64_t pool, + std::unique_ptr<SMSnapComp> c) { + impl->objecter->allocate_selfmanaged_snap( + pool, + ca::Completion<void(bs::error_code, 
snapid_t)>::create( + get_executor(), + [c = std::move(c)](bs::error_code e, snapid_t snap) mutable { + ca::dispatch(std::move(c), e, snap); + })); +} + +void RADOS::delete_pool_snap(std::int64_t pool, + std::string_view snapName, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->delete_pool_snap( + pool, snapName, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::delete_selfmanaged_snap(std::int64_t pool, + std::uint64_t snap, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->delete_selfmanaged_snap( + pool, snap, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::create_pool(std::string_view name, + std::optional<int> crush_rule, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->create_pool( + name, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + }), + crush_rule.value_or(-1)); +} + +void RADOS::delete_pool(std::string_view name, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->delete_pool( + name, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::delete_pool(std::int64_t pool, + std::unique_ptr<SimpleOpComp> c) +{ + impl->objecter->delete_pool( + pool, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::stat_pools(const std::vector<std::string>& pools, + std::unique_ptr<PoolStatComp> c) { + impl->objecter->get_pool_stats( + pools, + [c = std::move(c)] + (bs::error_code ec, + bc::flat_map<std::string, pool_stat_t> rawresult, 
+ bool per_pool) mutable { + bc::flat_map<std::string, PoolStats> result; + for (auto p = rawresult.begin(); p != rawresult.end(); ++p) { + auto& pv = result[p->first]; + auto& pstat = p->second; + store_statfs_t &statfs = pstat.store_stats; + uint64_t allocated_bytes = pstat.get_allocated_data_bytes(per_pool) + + pstat.get_allocated_omap_bytes(per_pool); + // FIXME: raw_used_rate is unknown hence use 1.0 here + // meaning we keep net amount aggregated over all replicas + // Not a big deal so far since this field isn't exposed + uint64_t user_bytes = pstat.get_user_data_bytes(1.0, per_pool) + + pstat.get_user_omap_bytes(1.0, per_pool); + + object_stat_sum_t *sum = &p->second.stats.sum; + pv.num_kb = shift_round_up(allocated_bytes, 10); + pv.num_bytes = allocated_bytes; + pv.num_objects = sum->num_objects; + pv.num_object_clones = sum->num_object_clones; + pv.num_object_copies = sum->num_object_copies; + pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary; + pv.num_objects_unfound = sum->num_objects_unfound; + pv.num_objects_degraded = sum->num_objects_degraded; + pv.num_rd = sum->num_rd; + pv.num_rd_kb = sum->num_rd_kb; + pv.num_wr = sum->num_wr; + pv.num_wr_kb = sum->num_wr_kb; + pv.num_user_bytes = user_bytes; + pv.compressed_bytes_orig = statfs.data_compressed_original; + pv.compressed_bytes = statfs.data_compressed; + pv.compressed_bytes_alloc = statfs.data_compressed_allocated; + } + + ca::dispatch(std::move(c), ec, std::move(result), per_pool); + }); +} + +void RADOS::stat_fs(std::optional<std::int64_t> _pool, + std::unique_ptr<StatFSComp> c) { + boost::optional<int64_t> pool; + if (_pool) + pool = *pool; + impl->objecter->get_fs_stats( + pool, + [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable { + FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects}; + c->dispatch(std::move(c), ec, std::move(fso)); + }); +} + +// --- Watch/Notify + +void RADOS::watch(const Object& o, const IOContext& _ioc, + 
std::optional<std::chrono::seconds> timeout, WatchCB&& cb, + std::unique_ptr<WatchComp> c) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + + ObjectOperation op; + + auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, + ioc->extra_op_flags); + uint64_t cookie = linger_op->get_cookie(); + linger_op->handle = std::move(cb); + op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); + bufferlist bl; + impl->objecter->linger_watch( + linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { + ca::dispatch(std::move(c), e, cookie); + }), nullptr); +} + +void RADOS::watch(const Object& o, std::int64_t pool, + std::optional<std::chrono::seconds> timeout, WatchCB&& cb, + std::unique_ptr<WatchComp> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ObjectOperation op; + + Objecter::LingerOp *linger_op = impl->objecter->linger_register(*oid, oloc, 0); + uint64_t cookie = linger_op->get_cookie(); + linger_op->handle = std::move(cb); + op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); + bufferlist bl; + impl->objecter->linger_watch( + linger_op, op, {}, ceph::real_clock::now(), bl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [c = std::move(c), cookie](bs::error_code e, bufferlist) mutable { + ca::dispatch(std::move(c), e, cookie); + }), nullptr); +} + +void RADOS::notify_ack(const Object& o, + const IOContext& _ioc, + uint64_t notify_id, + uint64_t cookie, + bufferlist&& bl, + std::unique_ptr<SimpleOpComp> c) +{ + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto ioc = reinterpret_cast<const 
IOContextImpl*>(&_ioc.impl); + + ObjectOperation op; + op.notify_ack(notify_id, cookie, bl); + + impl->objecter->read(*oid, ioc->oloc, std::move(op), ioc->snap_seq, + nullptr, ioc->extra_op_flags, std::move(c)); +} + +void RADOS::notify_ack(const Object& o, + std::int64_t pool, + uint64_t notify_id, + uint64_t cookie, + bufferlist&& bl, + std::unique_ptr<SimpleOpComp> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key) { + auto oid = reinterpret_cast<const object_t*>(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ObjectOperation op; + op.notify_ack(notify_id, cookie, bl); + impl->objecter->read(*oid, oloc, std::move(op), CEPH_NOSNAP, nullptr, 0, + std::move(c)); +} + +tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check(uint64_t cookie) +{ + Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie); + return impl->objecter->linger_check(linger_op); +} + +void RADOS::unwatch(uint64_t cookie, const IOContext& _ioc, + std::unique_ptr<SimpleOpComp> c) +{ + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + + Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie); + + ObjectOperation op; + op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op), + ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags, + Objecter::Op::OpComp::create( + get_executor(), + [objecter = impl->objecter, + linger_op, c = std::move(c)] + (bs::error_code ec) mutable { + objecter->linger_cancel(linger_op); + ca::dispatch(std::move(c), ec); + })); +} + +void RADOS::unwatch(uint64_t cookie, std::int64_t pool, + std::unique_ptr<SimpleOpComp> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key) +{ + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + Objecter::LingerOp *linger_op = 
reinterpret_cast<Objecter::LingerOp*>(cookie); + + ObjectOperation op; + op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + impl->objecter->mutate(linger_op->target.base_oid, oloc, std::move(op), + {}, ceph::real_clock::now(), 0, + Objecter::Op::OpComp::create( + get_executor(), + [objecter = impl->objecter, + linger_op, c = std::move(c)] + (bs::error_code ec) mutable { + objecter->linger_cancel(linger_op); + ca::dispatch(std::move(c), ec); + })); +} + +void RADOS::flush_watch(std::unique_ptr<VoidOpComp> c) +{ + impl->objecter->linger_callback_flush([c = std::move(c)]() mutable { + ca::post(std::move(c)); + }); +} + +struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> { + boost::asio::io_context& ioc; + boost::asio::io_context::strand strand; + Objecter* objecter; + Objecter::LingerOp* op; + std::unique_ptr<RADOS::NotifyComp> c; + + bool acked = false; + bool finished = false; + bs::error_code res; + bufferlist rbl; + + NotifyHandler(boost::asio::io_context& ioc, + Objecter* objecter, + Objecter::LingerOp* op, + std::unique_ptr<RADOS::NotifyComp> c) + : ioc(ioc), strand(ioc), objecter(objecter), op(op), c(std::move(c)) {} + + // Use bind or a lambda to pass this in. + void handle_ack(bs::error_code ec, + bufferlist&&) { + boost::asio::post( + strand, + [this, ec, p = shared_from_this()]() mutable { + acked = true; + maybe_cleanup(ec); + }); + } + + // Notify finish callback. It can actually own the object's storage. + + void operator()(bs::error_code ec, + bufferlist&& bl) { + boost::asio::post( + strand, + [this, ec, p = shared_from_this()]() mutable { + finished = true; + maybe_cleanup(ec); + }); + } + + // Should be called from strand. 
+ void maybe_cleanup(bs::error_code ec) { + if (!res && ec) + res = ec; + if ((acked && finished) || res) { + objecter->linger_cancel(op); + ceph_assert(c); + ca::dispatch(std::move(c), res, std::move(rbl)); + } + } +}; + +void RADOS::notify(const Object& o, const IOContext& _ioc, bufferlist&& bl, + std::optional<std::chrono::milliseconds> timeout, + std::unique_ptr<NotifyComp> c) +{ + auto oid = reinterpret_cast<const object_t*>(&o.impl); + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, + ioc->extra_op_flags); + + auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter, + linger_op, std::move(c)); + linger_op->on_notify_finish = + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, ceph::bufferlist bl) mutable { + (*cb)(ec, std::move(bl)); + }); + ObjectOperation rd; + bufferlist inbl; + rd.notify( + linger_op->get_cookie(), 1, + timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout, + bl, &inbl); + + impl->objecter->linger_notify( + linger_op, rd, ioc->snap_seq, inbl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, ceph::bufferlist bl) mutable { + cb->handle_ack(ec, std::move(bl)); + }), nullptr); +} + +void RADOS::notify(const Object& o, std::int64_t pool, bufferlist&& bl, + std::optional<std::chrono::milliseconds> timeout, + std::unique_ptr<NotifyComp> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key) +{ + auto oid = reinterpret_cast<const object_t*>(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + auto linger_op = impl->objecter->linger_register(*oid, oloc, 0); + + auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter, + linger_op, std::move(c)); + linger_op->on_notify_finish = + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, 
ceph::bufferlist&& bl) mutable { + (*cb)(ec, std::move(bl)); + }); + ObjectOperation rd; + bufferlist inbl; + rd.notify( + linger_op->get_cookie(), 1, + timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout, + bl, &inbl); + + impl->objecter->linger_notify( + linger_op, rd, CEPH_NOSNAP, inbl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, bufferlist&& bl) mutable { + cb->handle_ack(ec, std::move(bl)); + }), nullptr); +} + +// Enumeration + +Cursor::Cursor() { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(); +}; + +Cursor::Cursor(end_magic_t) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(hobject_t::get_max()); +} + +Cursor::Cursor(void* p) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(p))); +} + +Cursor Cursor::begin() { + Cursor e; + return e; +} + +Cursor Cursor::end() { + Cursor e(end_magic_t{}); + return e; +} + +Cursor::Cursor(const Cursor& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +Cursor& Cursor::operator =(const Cursor& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + reinterpret_cast<hobject_t*>(&impl)->~hobject_t(); + new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl)); + return *this; +} + +Cursor::Cursor(Cursor&& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl))); +} + +Cursor& Cursor::operator =(Cursor&& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + reinterpret_cast<hobject_t*>(&impl)->~hobject_t(); + new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl))); + return *this; +} +Cursor::~Cursor() { + reinterpret_cast<hobject_t*>(&impl)->~hobject_t(); +} + +bool operator ==(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const 
hobject_t*>(&lhs.impl) == + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +bool operator !=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const hobject_t*>(&lhs.impl) != + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +bool operator <(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const hobject_t*>(&lhs.impl) < + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +bool operator <=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const hobject_t*>(&lhs.impl) <= + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +bool operator >=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const hobject_t*>(&lhs.impl) >= + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +bool operator >(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast<const hobject_t*>(&lhs.impl) > + *reinterpret_cast<const hobject_t*>(&rhs.impl)); +} + +std::string Cursor::to_str() const { + using namespace std::literals; + auto& h = *reinterpret_cast<const hobject_t*>(&impl); + + return h.is_max() ? 
"MAX"s : h.to_str(); +} + +std::optional<Cursor> +Cursor::from_str(const std::string& s) { + Cursor e; + auto& h = *reinterpret_cast<hobject_t*>(&e.impl); + if (!h.parse(s)) + return std::nullopt; + + return e; +} + +void RADOS::enumerate_objects(const IOContext& _ioc, + const Cursor& begin, + const Cursor& end, + const std::uint32_t max, + const bufferlist& filter, + std::unique_ptr<EnumerateComp> c) { + auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl); + + impl->objecter->enumerate_objects<Entry>( + ioc->oloc.pool, + ioc->oloc.nspace, + *reinterpret_cast<const hobject_t*>(&begin.impl), + *reinterpret_cast<const hobject_t*>(&end.impl), + max, + filter, + [c = std::move(c)] + (bs::error_code ec, std::vector<Entry>&& v, + hobject_t&& n) mutable { + ca::dispatch(std::move(c), ec, std::move(v), + Cursor(static_cast<void*>(&n))); + }); +} + +void RADOS::enumerate_objects(std::int64_t pool, + const Cursor& begin, + const Cursor& end, + const std::uint32_t max, + const bufferlist& filter, + std::unique_ptr<EnumerateComp> c, + std::optional<std::string_view> ns, + std::optional<std::string_view> key) { + impl->objecter->enumerate_objects<Entry>( + pool, + ns ? 
*ns : std::string_view{}, + *reinterpret_cast<const hobject_t*>(&begin.impl), + *reinterpret_cast<const hobject_t*>(&end.impl), + max, + filter, + [c = std::move(c)] + (bs::error_code ec, std::vector<Entry>&& v, + hobject_t&& n) mutable { + ca::dispatch(std::move(c), ec, std::move(v), + Cursor(static_cast<void*>(&n))); + }); +} + + +void RADOS::osd_command(int osd, std::vector<std::string>&& cmd, + ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) { + impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, + std::string&& s, + ceph::bufferlist&& b) mutable { + ca::dispatch(std::move(c), ec, + std::move(s), + std::move(b)); + }); +} +void RADOS::pg_command(PG pg, std::vector<std::string>&& cmd, + ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) { + impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, + std::string&& s, + ceph::bufferlist&& b) mutable { + ca::dispatch(std::move(c), ec, + std::move(s), + std::move(b)); + }); +} + +void RADOS::enable_application(std::string_view pool, std::string_view app_name, + bool force, std::unique_ptr<SimpleOpComp> c) { + // pre-Luminous clusters will return -EINVAL and application won't be + // preserved until Luminous is configured as minimum version. + if (!impl->get_required_monitor_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)) { + ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP)); + } else { + impl->monclient.start_mon_command( + { fmt::format("{{ \"prefix\": \"osd pool application enable\"," + "\"pool\": \"{}\", \"app\": \"{}\"{}}}", + pool, app_name, + force ? 
" ,\"yes_i_really_mean_it\": true" : "")}, + {}, [c = std::move(c)](bs::error_code e, + std::string, cb::list) mutable { + ca::post(std::move(c), e); + }); + } +} + +void RADOS::blocklist_add(std::string_view client_address, + std::optional<std::chrono::seconds> expire, + std::unique_ptr<SimpleOpComp> c) { + auto expire_arg = (expire ? + fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{}); + impl->monclient.start_mon_command( + { fmt::format("{{" + "\"prefix\": \"osd blocklist\", " + "\"blocklistop\": \"add\", " + "\"addr\": \"{}\"{}}}", + client_address, expire_arg) }, + {}, + [this, client_address = std::string(client_address), expire_arg, + c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { + if (ec != bs::errc::invalid_argument) { + ca::post(std::move(c), ec); + return; + } + + // retry using the legacy command + impl->monclient.start_mon_command( + { fmt::format("{{" + "\"prefix\": \"osd blacklist\", " + "\"blacklistop\": \"add\", " + "\"addr\": \"{}\"{}}}", + client_address, expire_arg) }, + {}, + [c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { + ca::post(std::move(c), ec); + }); + }); +} + +void RADOS::wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c) { + impl->objecter->wait_for_latest_osdmap(std::move(c)); +} + +void RADOS::mon_command(std::vector<std::string> command, + const cb::list& bl, + std::string* outs, cb::list* outbl, + std::unique_ptr<SimpleOpComp> c) { + + impl->monclient.start_mon_command( + command, bl, + [c = std::move(c), outs, outbl](bs::error_code e, + std::string s, cb::list bl) mutable { + if (outs) + *outs = std::move(s); + if (outbl) + *outbl = std::move(bl); + ca::post(std::move(c), e); + }); +} + +uint64_t RADOS::instance_id() const { + return impl->get_instance_id(); +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wnon-virtual-dtor" +class category : public 
ceph::converting_category { +public: + category() {} + const char* name() const noexcept override; + const char* message(int ev, char*, std::size_t) const noexcept override; + std::string message(int ev) const override; + bs::error_condition default_error_condition(int ev) const noexcept + override; + bool equivalent(int ev, const bs::error_condition& c) const + noexcept override; + using ceph::converting_category::equivalent; + int from_code(int ev) const noexcept override; +}; +#pragma GCC diagnostic pop +#pragma clang diagnostic pop + +const char* category::name() const noexcept { + return "RADOS"; +} + +const char* category::message(int ev, char*, + std::size_t) const noexcept { + if (ev == 0) + return "No error"; + + switch (static_cast<errc>(ev)) { + case errc::pool_dne: + return "Pool does not exist"; + + case errc::invalid_snapcontext: + return "Invalid snapcontext"; + } + + return "Unknown error"; +} + +std::string category::message(int ev) const { + return message(ev, nullptr, 0); +} + +bs::error_condition category::default_error_condition(int ev) const noexcept { + switch (static_cast<errc>(ev)) { + case errc::pool_dne: + return ceph::errc::does_not_exist; + case errc::invalid_snapcontext: + return bs::errc::invalid_argument; + } + + return { ev, *this }; +} + +bool category::equivalent(int ev, const bs::error_condition& c) const noexcept { + if (static_cast<errc>(ev) == errc::pool_dne) { + if (c == bs::errc::no_such_file_or_directory) { + return true; + } + } + + return default_error_condition(ev) == c; +} + +int category::from_code(int ev) const noexcept { + switch (static_cast<errc>(ev)) { + case errc::pool_dne: + return -ENOENT; + case errc::invalid_snapcontext: + return -EINVAL; + } + return -EDOM; +} + +const bs::error_category& error_category() noexcept { + static const class category c; + return c; +} + +CephContext* RADOS::cct() { + return impl->cct.get(); +} +} + +namespace std { +size_t hash<neorados::Object>::operator ()( + const 
neorados::Object& r) const { + static constexpr const hash<object_t> H; + return H(*reinterpret_cast<const object_t*>(&r.impl)); +} + +size_t hash<neorados::IOContext>::operator ()( + const neorados::IOContext& r) const { + static constexpr const hash<int64_t> H; + static constexpr const hash<std::string> G; + const auto l = reinterpret_cast<const neorados::IOContextImpl*>(&r.impl); + return H(l->oloc.pool) ^ (G(l->oloc.nspace) << 1) ^ (G(l->oloc.key) << 2); +} +} diff --git a/src/neorados/RADOSImpl.cc b/src/neorados/RADOSImpl.cc new file mode 100644 index 000000000..6c9c210a8 --- /dev/null +++ b/src/neorados/RADOSImpl.cc @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include <boost/system/system_error.hpp> + +#include "common/common_init.h" + +#include "global/global_init.h" + +#include "RADOSImpl.h" + +namespace neorados { +namespace detail { + +RADOS::RADOS(boost::asio::io_context& ioctx, + boost::intrusive_ptr<CephContext> cct) + : Dispatcher(cct.get()), + ioctx(ioctx), + cct(cct), + monclient(cct.get(), ioctx), + mgrclient(cct.get(), nullptr, &monclient.monmap) { + auto err = monclient.build_initial_monmap(); + if (err < 0) + throw std::system_error(ceph::to_error_code(err)); + + messenger.reset(Messenger::create_client_messenger(cct.get(), "radosclient")); + if (!messenger) + throw std::bad_alloc(); + + // Require OSDREPLYMUX feature. This means we will fail to talk to + // old servers. This is necessary because otherwise we won't know + // how to decompose the reply data into its constituent pieces. 
+ messenger->set_default_policy( + Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX)); + + objecter = std::make_unique<Objecter>(cct.get(), messenger.get(), &monclient, ioctx); + + objecter->set_balanced_budget(); + monclient.set_messenger(messenger.get()); + mgrclient.set_messenger(messenger.get()); + objecter->init(); + messenger->add_dispatcher_head(&mgrclient); + messenger->add_dispatcher_tail(objecter.get()); + messenger->start(); + monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR); + err = monclient.init(); + if (err) { + throw boost::system::system_error(ceph::to_error_code(err)); + } + err = monclient.authenticate(std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count()); + if (err) { + throw boost::system::system_error(ceph::to_error_code(err)); + } + messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); + // Detect older cluster, put mgrclient into compatible mode + mgrclient.set_mgr_optional( + !get_required_monitor_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)); + + // MgrClient needs this (it doesn't have MonClient reference itself) + monclient.sub_want("mgrmap", 0, 0); + monclient.renew_subs(); + + mgrclient.init(); + objecter->set_client_incarnation(0); + objecter->start(); + + messenger->add_dispatcher_tail(this); + + std::unique_lock l(lock); + instance_id = monclient.get_global_id(); +} + +RADOS::~RADOS() { + if (objecter && objecter->initialized) { + objecter->shutdown(); + } + + mgrclient.shutdown(); + monclient.shutdown(); + + if (messenger) { + messenger->shutdown(); + messenger->wait(); + } +} + +bool RADOS::ms_dispatch(Message *m) +{ + switch (m->get_type()) { + // OSD + case CEPH_MSG_OSD_MAP: + m->put(); + return true; + } + return false; +} + +void RADOS::ms_handle_connect(Connection *con) {} +bool RADOS::ms_handle_reset(Connection *con) { + return false; +} +void RADOS::ms_handle_remote_reset(Connection 
*con) {} +bool RADOS::ms_handle_refused(Connection *con) { + return false; +} + +} // namespace detail +} // namespace neorados diff --git a/src/neorados/RADOSImpl.h b/src/neorados/RADOSImpl.h new file mode 100644 index 000000000..d45ca9481 --- /dev/null +++ b/src/neorados/RADOSImpl.h @@ -0,0 +1,135 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_NEORADOS_RADOSIMPL_H +#define CEPH_NEORADOS_RADOSIMPL_H + +#include <functional> +#include <memory> +#include <string> + +#include <boost/asio.hpp> +#include <boost/intrusive_ptr.hpp> + +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" + +#include "librados/RadosClient.h" + +#include "mon/MonClient.h" + +#include "mgr/MgrClient.h" + +#include "osdc/Objecter.h" + +namespace neorados { + +class RADOS; + +namespace detail { + +class NeoClient; + +class RADOS : public Dispatcher +{ + friend ::neorados::RADOS; + friend NeoClient; + + boost::asio::io_context& ioctx; + boost::intrusive_ptr<CephContext> cct; + + ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl"); + int instance_id = -1; + + std::unique_ptr<Messenger> messenger; + + MonClient monclient; + MgrClient mgrclient; + + std::unique_ptr<Objecter> objecter; + +public: + + RADOS(boost::asio::io_context& ioctx, boost::intrusive_ptr<CephContext> cct); + ~RADOS(); + bool ms_dispatch(Message *m) override; + void ms_handle_connect(Connection *con) override; + bool ms_handle_reset(Connection *con) override; + void ms_handle_remote_reset(Connection *con) override; + bool ms_handle_refused(Connection *con) override; + mon_feature_t 
get_required_monitor_features() const { + return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features)); + } +}; + +class Client { +public: + Client(boost::asio::io_context& ioctx, + boost::intrusive_ptr<CephContext> cct, + MonClient& monclient, Objecter* objecter) + : ioctx(ioctx), cct(cct), monclient(monclient), objecter(objecter) { + } + virtual ~Client() {} + + Client(const Client&) = delete; + Client& operator=(const Client&) = delete; + + boost::asio::io_context& ioctx; + + boost::intrusive_ptr<CephContext> cct; + MonClient& monclient; + Objecter* objecter; + + mon_feature_t get_required_monitor_features() const { + return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features)); + } + + virtual int get_instance_id() const = 0; +}; + +class NeoClient : public Client { +public: + NeoClient(std::unique_ptr<RADOS>&& rados) + : Client(rados->ioctx, rados->cct, rados->monclient, + rados->objecter.get()), + rados(std::move(rados)) { + } + + int get_instance_id() const override { + return rados->instance_id; + } + +private: + std::unique_ptr<RADOS> rados; +}; + +class RadosClient : public Client { +public: + RadosClient(librados::RadosClient* rados_client) + : Client(rados_client->poolctx, {rados_client->cct}, + rados_client->monclient, rados_client->objecter), + rados_client(rados_client) { + } + + int get_instance_id() const override { + return rados_client->instance_id; + } + +public: + librados::RadosClient* rados_client; +}; + +} // namespace detail +} // namespace neorados + +#endif diff --git a/src/neorados/cls/fifo.cc b/src/neorados/cls/fifo.cc new file mode 100644 index 000000000..fa99275b2 --- /dev/null +++ b/src/neorados/cls/fifo.cc @@ -0,0 +1,385 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat <contact@redhat.com> + * Author: Adam C. 
Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <cstdint> +#include <numeric> +#include <optional> +#include <string_view> + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include <fmt/format.h> + +#include <boost/system/error_code.hpp> + +#include "include/neorados/RADOS.hpp" + +#include "include/buffer.h" + +#include "common/random_string.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +#include "fifo.h" + +namespace neorados::cls::fifo { +namespace bs = boost::system; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; + +void create_meta(WriteOp& op, std::string_view id, + std::optional<fifo::objv> objv, + std::optional<std::string_view> oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size) +{ + fifo::op::create_meta cm; + + cm.id = id; + cm.version = objv; + cm.oid_prefix = oid_prefix; + cm.max_part_size = max_part_size; + cm.max_entry_size = max_entry_size; + cm.exclusive = exclusive; + + cb::list in; + encode(cm, in); + op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in); +} + +void get_meta(ReadOp& op, std::optional<fifo::objv> objv, + bs::error_code* ec_out, fifo::info* info, + std::uint32_t* part_header_size, + std::uint32_t* part_entry_overhead) +{ + fifo::op::get_meta gm; + gm.version = objv; + cb::list in; + encode(gm, in); + op.exec(fifo::op::CLASS, fifo::op::GET_META, in, + [ec_out, info, part_header_size, + part_entry_overhead](bs::error_code ec, const cb::list& bl) { + fifo::op::get_meta_reply reply; + if (!ec) try { + auto iter = bl.cbegin(); + decode(reply, iter); + } catch (const cb::error& err) { + ec = err.code(); + } + if (ec_out) *ec_out = ec; + if (info) *info = std::move(reply.info); + if (part_header_size) *part_header_size = reply.part_header_size; + 
if (part_entry_overhead) + *part_entry_overhead = reply.part_entry_overhead; + }); +}; + +void update_meta(WriteOp& op, const fifo::objv& objv, + const fifo::update& update) +{ + fifo::op::update_meta um; + + um.version = objv; + um.tail_part_num = update.tail_part_num(); + um.head_part_num = update.head_part_num(); + um.min_push_part_num = update.min_push_part_num(); + um.max_push_part_num = update.max_push_part_num(); + um.journal_entries_add = std::move(update).journal_entries_add(); + um.journal_entries_rm = std::move(update).journal_entries_rm(); + + cb::list in; + encode(um, in); + op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); +} + +void part_init(WriteOp& op, std::string_view tag, + fifo::data_params params) +{ + fifo::op::init_part ip; + + ip.tag = tag; + ip.params = params; + + cb::list in; + encode(ip, in); + op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in); +} + +void push_part(WriteOp& op, std::string_view tag, + std::deque<cb::list> data_bufs, + fu2::unique_function<void(bs::error_code, int)> f) +{ + fifo::op::push_part pp; + + pp.tag = tag; + pp.data_bufs = data_bufs; + pp.total_len = 0; + + for (const auto& bl : data_bufs) + pp.total_len += bl.length(); + + cb::list in; + encode(pp, in); + op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, + [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable { + std::move(f)(ec, r); + }); + op.returnvec(); +} + +void trim_part(WriteOp& op, + std::optional<std::string_view> tag, + std::uint64_t ofs, bool exclusive) +{ + fifo::op::trim_part tp; + + tp.tag = tag; + tp.ofs = ofs; + tp.exclusive = exclusive; + + bufferlist in; + encode(tp, in); + op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); +} + +void list_part(ReadOp& op, + std::optional<string_view> tag, + std::uint64_t ofs, + std::uint64_t max_entries, + bs::error_code* ec_out, + std::vector<fifo::part_list_entry>* entries, + bool* more, + bool* full_part, + std::string* ptag) +{ + fifo::op::list_part lp; + + lp.tag = tag; + lp.ofs = 
ofs; + lp.max_entries = max_entries; + + bufferlist in; + encode(lp, in); + op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, + [entries, more, full_part, ptag, ec_out](bs::error_code ec, + const cb::list& bl) { + if (ec) { + if (ec_out) *ec_out = ec; + return; + } + + fifo::op::list_part_reply reply; + auto iter = bl.cbegin(); + try { + decode(reply, iter); + } catch (const cb::error& err) { + if (ec_out) *ec_out = ec; + return; + } + + if (entries) *entries = std::move(reply.entries); + if (more) *more = reply.more; + if (full_part) *full_part = reply.full_part; + if (ptag) *ptag = reply.tag; + }); +} + +void get_part_info(ReadOp& op, + bs::error_code* out_ec, + fifo::part_header* header) +{ + fifo::op::get_part_info gpi; + + bufferlist in; + encode(gpi, in); + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, + [out_ec, header](bs::error_code ec, const cb::list& bl) { + if (ec) { + if (out_ec) *out_ec = ec; + } + fifo::op::get_part_info_reply reply; + auto iter = bl.cbegin(); + try { + decode(reply, iter); + } catch (const cb::error& err) { + if (out_ec) *out_ec = ec; + return; + } + + if (header) *header = std::move(reply.header); + }); +} + +std::optional<marker> FIFO::to_marker(std::string_view s) { + marker m; + if (s.empty()) { + m.num = info.tail_part_num; + m.ofs = 0; + return m; + } + + auto pos = s.find(':'); + if (pos == string::npos) { + return std::nullopt; + } + + auto num = s.substr(0, pos); + auto ofs = s.substr(pos + 1); + + auto n = ceph::parse<decltype(m.num)>(num); + if (!n) { + return std::nullopt; + } + m.num = *n; + auto o = ceph::parse<decltype(m.ofs)>(ofs); + if (!o) { + return std::nullopt; + } + m.ofs = *o; + return m; +} + +bs::error_code FIFO::apply_update(fifo::info* info, + const fifo::objv& objv, + const fifo::update& update) { + std::unique_lock l(m); + auto err = info->apply_update(update); + if (objv != info->version) { + ldout(r->cct(), 0) << __func__ << "(): Raced locally!" 
<< dendl; + return errc::raced; + } + if (err) { + ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl; + return errc::update_failed; + } + + ++info->version.ver; + + return {}; +} + +std::string FIFO::generate_tag() const +{ + static constexpr auto HEADER_TAG_SIZE = 16; + return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE); +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wnon-virtual-dtor" +class error_category : public ceph::converting_category { +public: + error_category(){} + const char* name() const noexcept override; + const char* message(int ev, char*, std::size_t) const noexcept override; + std::string message(int ev) const override; + bs::error_condition default_error_condition(int ev) const noexcept + override; + bool equivalent(int ev, const bs::error_condition& c) const + noexcept override; + using ceph::converting_category::equivalent; + int from_code(int ev) const noexcept override; +}; +#pragma GCC diagnostic pop +#pragma clang diagnostic pop + +const char* error_category::name() const noexcept { + return "FIFO"; +} + +const char* error_category::message(int ev, char*, std::size_t) const noexcept { + if (ev == 0) + return "No error"; + + switch (static_cast<errc>(ev)) { + case errc::raced: + return "Retry-race count exceeded"; + + case errc::inconsistency: + return "Inconsistent result! 
New head before old head"; + + case errc::entry_too_large: + return "Pushed entry too large"; + + case errc::invalid_marker: + return "Invalid marker string"; + + case errc::update_failed: + return "Update failed"; + } + + return "Unknown error"; +} + +std::string error_category::message(int ev) const { + return message(ev, nullptr, 0); +} + +bs::error_condition +error_category::default_error_condition(int ev) const noexcept { + switch (static_cast<errc>(ev)) { + case errc::raced: + return bs::errc::operation_canceled; + + case errc::inconsistency: + return bs::errc::io_error; + + case errc::entry_too_large: + return bs::errc::value_too_large; + + case errc::invalid_marker: + return bs::errc::invalid_argument; + + case errc::update_failed: + return bs::errc::invalid_argument; + } + + return { ev, *this }; +} + +bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept { + return default_error_condition(ev) == c; +} + +int error_category::from_code(int ev) const noexcept { + switch (static_cast<errc>(ev)) { + case errc::raced: + return -ECANCELED; + + case errc::inconsistency: + return -EIO; + + case errc::entry_too_large: + return -E2BIG; + + case errc::invalid_marker: + return -EINVAL; + + case errc::update_failed: + return -EINVAL; + + } + return -EDOM; +} + +const bs::error_category& error_category() noexcept { + static const class error_category c; + return c; +} + +} diff --git a/src/neorados/cls/fifo.h b/src/neorados/cls/fifo.h new file mode 100644 index 000000000..05865dcca --- /dev/null +++ b/src/neorados/cls/fifo.h @@ -0,0 +1,1754 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat <contact@redhat.com> + * Author: Adam C. 
Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_NEORADOS_CLS_FIFIO_H +#define CEPH_NEORADOS_CLS_FIFIO_H + +#include <cstdint> +#include <deque> +#include <map> +#include <memory> +#include <mutex> +#include <optional> +#include <string_view> +#include <vector> + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include <fmt/format.h> + +#include "include/neorados/RADOS.hpp" +#include "include/buffer.h" + +#include "common/allocate_unique.h" +#include "common/async/bind_handler.h" +#include "common/async/bind_like.h" +#include "common/async/completion.h" +#include "common/async/forward_handler.h" + +#include "common/dout.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +namespace neorados::cls::fifo { +namespace ba = boost::asio; +namespace bs = boost::system; +namespace ca = ceph::async; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; + +inline constexpr auto dout_subsys = ceph_subsys_rados; +inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024; +inline constexpr std::uint64_t default_max_entry_size = 32 * 1024; +inline constexpr auto MAX_RACE_RETRIES = 10; + + +const boost::system::error_category& error_category() noexcept; + +enum class errc { + raced = 1, + inconsistency, + entry_too_large, + invalid_marker, + update_failed +}; +} + +namespace boost::system { +template<> +struct is_error_code_enum<::neorados::cls::fifo::errc> { + static const bool value = true; +}; +template<> +struct is_error_condition_enum<::neorados::cls::fifo::errc> { + static const bool value = false; +}; +} + +namespace neorados::cls::fifo { +// explicit conversion: +inline bs::error_code make_error_code(errc e) noexcept { + return { 
static_cast<int>(e), error_category() }; +} + +inline bs::error_code make_error_category(errc e) noexcept { + return { static_cast<int>(e), error_category() }; +} + +void create_meta(WriteOp& op, std::string_view id, + std::optional<fifo::objv> objv, + std::optional<std::string_view> oid_prefix, + bool exclusive = false, + std::uint64_t max_part_size = default_max_part_size, + std::uint64_t max_entry_size = default_max_entry_size); +void get_meta(ReadOp& op, std::optional<fifo::objv> objv, + bs::error_code* ec_out, fifo::info* info, + std::uint32_t* part_header_size, + std::uint32_t* part_entry_overhead); + +void update_meta(WriteOp& op, const fifo::objv& objv, + const fifo::update& desc); + +void part_init(WriteOp& op, std::string_view tag, + fifo::data_params params); + +void push_part(WriteOp& op, std::string_view tag, + std::deque<cb::list> data_bufs, + fu2::unique_function<void(bs::error_code, int)>); +void trim_part(WriteOp& op, std::optional<std::string_view> tag, + std::uint64_t ofs, + bool exclusive); +void list_part(ReadOp& op, + std::optional<std::string_view> tag, + std::uint64_t ofs, + std::uint64_t max_entries, + bs::error_code* ec_out, + std::vector<fifo::part_list_entry>* entries, + bool* more, + bool* full_part, + std::string* ptag); +void get_part_info(ReadOp& op, + bs::error_code* out_ec, + fifo::part_header* header); + +struct marker { + std::int64_t num = 0; + std::uint64_t ofs = 0; + + marker() = default; + marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {} + static marker max() { + return { std::numeric_limits<decltype(num)>::max(), + std::numeric_limits<decltype(ofs)>::max() }; + } + + std::string to_string() { + return fmt::format("{:0>20}:{:0>20}", num, ofs); + } +}; + +struct list_entry { + cb::list data; + std::string marker; + ceph::real_time mtime; +}; + +using part_info = fifo::part_header; + +namespace detail { +template<typename Handler> +class JournalProcessor; +} + +/// Completions, Handlers, and CompletionTokens 
+/// =========================================== +/// +/// This class is based on Boost.Asio. For information, see +/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html +/// +/// As summary, Asio's design is that of functions taking completion +/// handlers. Every handler has a signature, like +/// (boost::system::error_code, std::string). The completion handler +/// receives the result of the function, and the signature is the type +/// of that result. +/// +/// The completion handler is specified with a CompletionToken. The +/// CompletionToken is any type that has a specialization of +/// async_complete and async_result. See +/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html +/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html +/// +/// The return type of a function taking a CompletionToken is +/// async_result<CompletionToken, Signature>::return_type. +/// +/// Functions +/// --------- +/// +/// The default implementations treat whatever value is described as a +/// function, whose parameters correspond to the signature, and calls +/// it upon completion. +/// +/// EXAMPLE: +/// Let f be an asynchronous function whose signature is (bs::error_code, int) +/// Let g be an asynchronous function whose signature is +/// (bs::error_code, int, std::string). +/// +/// +/// f([](bs::error_code ec, int i) { ... }); +/// g([](bs::error_code ec, int i, std::string s) { ... }); +/// +/// Will schedule asynchronous tasks, and the provided lambdas will be +/// called on completion. In this case, f and g return void. +/// +/// There are other specializations. Commonly used ones are. +/// +/// Futures +/// ------- +/// +/// A CompletionToken of boost::asio::use_future will complete with a +/// promise whose type matches (minus any initial error_code) the +/// function's signature. The corresponding future is returned. 
If the
/// error_code of the result is non-zero, the future is set with an
/// exception of type boost::asio::system_error.
///
/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html
///
/// EXAMPLE:
///
///   std::future<int> fi = f(ba::use_future);
///   std::future<std::tuple<int, std::string>> fg = g(ba::use_future);
///
/// Coroutines
/// ----------
///
/// A CompletionToken of type spawn::yield_context suspends execution
/// of the current coroutine until completion of the operation. See
/// src/spawn/README.md
/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and
/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html
///
/// Operations given this CompletionToken return their results, modulo
/// any leading error_code. A non-zero error code will be thrown, by
/// default, but may be bound to a variable instead with the overload
/// of the array-subscript operator.
///
/// EXAMPLE:
///   // Within a function with a yield_context parameter named y
///
///   try {
///     int i = f(y);
///   } catch (const bs::system_error& ec) { ... }
///
///   bs::error_code ec;
///   auto [i, s] = g(y[ec]);
///
/// Blocking calls
/// --------------
///
/// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h,
/// suspends the current thread of execution, returning the results of
/// the operation on resumption. Its calling convention is analogous to
/// that of yield_context.
///
/// EXAMPLE:
///   try {
///     int i = f(ca::use_blocked);
///   } catch (const bs::system_error& e) { ... }
///
///   bs::error_code ec;
///   auto [i, s] = g(ca::use_blocked[ec]);
///
/// librados Completions
/// --------------------
///
/// If src/common/async/librados_completion.h is included in the
/// current translation unit, then librados::AioCompletion* may be used
This is only permitted when the completion +/// signature is either bs::system_error or void. The return type of +/// functions provided a CompletionToken of AioCompletion* is void. If +/// the signature includes an error code and the error code is set, +/// then the error is translated to an int which is set as the result +/// of the AioCompletion. +/// +/// EXAMPLE: +/// // Assume an asynchronous function h whose signature is bs::error_code. +/// +/// AioCompletion* c = Rados::aio_create_completion(); +/// h(c); +/// int r = c.get_return_value(); +/// +/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple +/// example of a program using this class with coroutines. +/// +/// +/// Markers +/// ======= +/// +/// Markers represent a position within the FIFO. Internally, they are +/// part/offset pairs. Externally, they are ordered but otherwise +/// opaque strings. Markers that compare lower denote positions closer +/// to the tail. +/// +/// A marker is returned with every entry from a list() operation. They +/// may be supplied to a list operation to resume from a given +/// position, and must be supplied to trim give the position to which +/// to trim. + +class FIFO { +public: + + FIFO(const FIFO&) = delete; + FIFO& operator =(const FIFO&) = delete; + FIFO(FIFO&&) = delete; + FIFO& operator =(FIFO&&) = delete; + + /// Open an existing FIFO. + /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f) + template<typename CT> + static auto open(RADOS& r, //< RADOS handle + const IOContext& ioc, //< Context for pool, namespace, etc. + Object oid, //< OID for the 'main' object of the FIFO + CT&& ct, //< CompletionToken + /// Fail if is not this version + std::optional<fifo::objv> objv = std::nullopt, + /// Default executor. By default use the one + /// associated with the RADOS handle. 
+ std::optional<ba::executor> executor = std::nullopt) { + ba::async_completion<CT, void(bs::error_code, + std::unique_ptr<FIFO>)> init(ct); + auto e = ba::get_associated_executor(init.completion_handler, + executor.value_or(r.get_executor())); + auto a = ba::get_associated_allocator(init.completion_handler); + _read_meta_( + &r, oid, ioc, objv, + ca::bind_ea( + e, a, + [&r, ioc, oid, executor, handler = std::move(init.completion_handler)] + (bs::error_code ec, fifo::info info, + std::uint32_t size, std::uint32_t over) mutable { + std::unique_ptr<FIFO> f( + new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); + f->info = info; + f->part_header_size = size; + f->part_entry_overhead = over; + // If there are journal entries, process them, in case + // someone crashed mid-transaction. + if (!ec && !info.journal.empty()) { + auto e = ba::get_associated_executor(handler, f->get_executor()); + auto a = ba::get_associated_allocator(handler); + auto g = f.get(); + g->_process_journal( + ca::bind_ea( + e, a, + [f = std::move(f), + handler = std::move(handler)](bs::error_code ec) mutable { + std::move(handler)(ec, std::move(f)); + })); + return; + } + std::move(handler)(ec, std::move(f)); + return; + })); + return init.result.get(); + } + + /// Open an existing or create a new FIFO. + /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f) + template<typename CT> + static auto create(RADOS& r, /// RADOS handle + const IOContext& ioc, /// Context for pool, namespace, etc. 
+ Object oid, /// OID for the 'main' object of the FIFO + CT&& ct, /// CompletionToken + /// Fail if FIFO exists and is not this version + std::optional<fifo::objv> objv = std::nullopt, + /// Custom prefix for parts + std::optional<std::string_view> oid_prefix = std::nullopt, + /// Fail if FIFO already exists + bool exclusive = false, + /// Size at which a part is considered full + std::uint64_t max_part_size = default_max_part_size, + /// Maximum size of any entry + std::uint64_t max_entry_size = default_max_entry_size, + /// Default executor. By default use the one + /// associated with the RADOS handle. + std::optional<ba::executor> executor = std::nullopt) { + ba::async_completion<CT, void(bs::error_code, + std::unique_ptr<FIFO>)> init(ct); + WriteOp op; + create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size, + max_entry_size); + auto e = ba::get_associated_executor(init.completion_handler, + executor.value_or(r.get_executor())); + auto a = ba::get_associated_allocator(init.completion_handler); + r.execute( + oid, ioc, std::move(op), + ca::bind_ea( + e, a, + [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)] + (bs::error_code ec) mutable { + if (ec) { + std::move(handler)(ec, nullptr); + return; + } + auto e = ba::get_associated_executor( + handler, executor.value_or(r.get_executor())); + auto a = ba::get_associated_allocator(handler); + FIFO::_read_meta_( + &r, oid, ioc, objv, + ca::bind_ea( + e, a, + [&r, ioc, executor, oid, handler = std::move(handler)] + (bs::error_code ec, fifo::info info, + std::uint32_t size, std::uint32_t over) mutable { + std::unique_ptr<FIFO> f( + new FIFO(r, ioc, oid, executor.value_or(r.get_executor()))); + f->info = info; + f->part_header_size = size; + f->part_entry_overhead = over; + if (!ec && !info.journal.empty()) { + auto e = ba::get_associated_executor(handler, + f->get_executor()); + auto a = ba::get_associated_allocator(handler); + auto g = f.get(); + g->_process_journal( + ca::bind_ea( 
+ e, a, + [f = std::move(f), handler = std::move(handler)] + (bs::error_code ec) mutable { + std::move(handler)(ec, std::move(f)); + })); + return; + } + std::move(handler)(ec, std::move(f)); + })); + })); + return init.result.get(); + } + + /// Force a re-read of FIFO metadata. + /// Signature: (bs::error_code ec) + template<typename CT> + auto read_meta(CT&& ct, //< CompletionToken + /// Fail if FIFO not at this version + std::optional<fifo::objv> objv = std::nullopt) { + std::unique_lock l(m); + auto version = info.version; + l.unlock(); + ba::async_completion<CT, void(bs::error_code)> init(ct); + auto e = ba::get_associated_executor(init.completion_handler, + get_executor()); + auto a = ba::get_associated_allocator(init.completion_handler); + _read_meta_( + r, oid, ioc, objv, + ca::bind_ea( + e, a, + [this, version, handler = std::move(init.completion_handler)] + (bs::error_code ec, fifo::info newinfo, + std::uint32_t size, std::uint32_t over) mutable { + std::unique_lock l(m); + if (version == info.version) { + info = newinfo; + part_header_size = size; + part_entry_overhead = over; + } + l.unlock(); + return std::move(handler)(ec); + })); + return init.result.get(); + } + + /// Return a reference to currently known metadata + const fifo::info& meta() const { + return info; + } + + /// Return header size and entry overhead of partitions. + std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() { + return {part_header_size, part_entry_overhead}; + } + + /// Push a single entry to the FIFO. + /// Signature: (bs::error_code) + template<typename CT> + auto push(const cb::list& bl, //< Bufferlist holding entry to push + CT&& ct //< CompletionToken + ) { + return push(std::vector{ bl }, std::forward<CT>(ct)); + } + + /// Push a many entries to the FIFO. 
+ /// Signature: (bs::error_code) + template<typename CT> + auto push(const std::vector<cb::list>& data_bufs, //< Entries to push + CT&& ct //< CompletionToken + ) { + ba::async_completion<CT, void(bs::error_code)> init(ct); + std::unique_lock l(m); + auto max_entry_size = info.params.max_entry_size; + auto need_new_head = info.need_new_head(); + l.unlock(); + auto e = ba::get_associated_executor(init.completion_handler, + get_executor()); + auto a = ba::get_associated_allocator(init.completion_handler); + if (data_bufs.empty() ) { + // Can't fail if you don't try. + e.post(ca::bind_handler(std::move(init.completion_handler), + bs::error_code{}), a); + return init.result.get(); + } + + // Validate sizes + for (const auto& bl : data_bufs) { + if (bl.length() > max_entry_size) { + ldout(r->cct(), 10) << __func__ << "(): entry too large: " + << bl.length() << " > " + << info.params.max_entry_size << dendl; + e.post(ca::bind_handler(std::move(init.completion_handler), + errc::entry_too_large), a); + return init.result.get(); + } + } + + auto p = ca::bind_ea(e, a, + Pusher(this, {data_bufs.begin(), data_bufs.end()}, + {}, 0, std::move(init.completion_handler))); + + if (need_new_head) { + _prepare_new_head(std::move(p)); + } else { + e.dispatch(std::move(p), a); + } + return init.result.get(); + } + + /// List the entries in a FIFO + /// Signature(bs::error_code ec, bs::vector<list_entry> entries, bool more) + /// + /// More is true if entries beyond the last exist. + /// The list entries are of the form: + /// data - Contents of the entry + /// marker - String representing the position of this entry within the FIFO. + /// mtime - Time (on the OSD) at which the entry was pushed. + template<typename CT> + auto list(int max_entries, //< Maximum number of entries to fetch + /// Optionally, a marker indicating the position after + /// which to begin listing. If null, begin at the tail. 
+ std::optional<std::string_view> markstr, + CT&& ct //< CompletionToken + ) { + ba::async_completion<CT, void(bs::error_code, + std::vector<list_entry>, bool)> init(ct); + std::unique_lock l(m); + std::int64_t part_num = info.tail_part_num; + l.unlock(); + std::uint64_t ofs = 0; + auto a = ba::get_associated_allocator(init.completion_handler); + auto e = ba::get_associated_executor(init.completion_handler); + + if (markstr) { + auto marker = to_marker(*markstr); + if (!marker) { + ldout(r->cct(), 0) << __func__ + << "(): failed to parse marker (" << *markstr + << ")" << dendl; + e.post(ca::bind_handler(std::move(init.completion_handler), + errc::invalid_marker, + std::vector<list_entry>{}, false), a); + return init.result.get(); + } + part_num = marker->num; + ofs = marker->ofs; + } + + using handler_type = decltype(init.completion_handler); + auto ls = ceph::allocate_unique<Lister<handler_type>>( + a, this, part_num, ofs, max_entries, + std::move(init.completion_handler)); + ls.release()->list(); + return init.result.get(); + } + + /// Trim entries from the tail to the given position + /// Signature: (bs::error_code) + template<typename CT> + auto trim(std::string_view markstr, //< Position to which to trim, inclusive + bool exclusive, //< If true, trim markers up to but NOT INCLUDING + //< markstr, otherwise trim markstr as well. 
+ CT&& ct //< CompletionToken + ) { + auto m = to_marker(markstr); + ba::async_completion<CT, void(bs::error_code)> init(ct); + auto a = ba::get_associated_allocator(init.completion_handler); + auto e = ba::get_associated_executor(init.completion_handler); + if (!m) { + ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker=" + << markstr << dendl; + e.post(ca::bind_handler(std::move(init.completion_handler), + errc::invalid_marker), a); + return init.result.get(); + } else { + using handler_type = decltype(init.completion_handler); + auto t = ceph::allocate_unique<Trimmer<handler_type>>( + a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler)); + t.release()->trim(); + } + return init.result.get(); + } + + /// Get information about a specific partition + /// Signature: (bs::error_code, part_info) + /// + /// part_info has the following entries + /// tag - A random string identifying this partition. Used internally + /// as a sanity check to make sure operations haven't been misdirected + /// params - Data parameters, identical for every partition within a + /// FIFO and the same as what is returned from get_part_layout() + /// magic - A random magic number, used internally as a prefix to + /// every entry stored on the OSD to ensure sync + /// min_ofs - Offset of the first entry + /// max_ofs - Offset of the highest entry + /// min_index - Minimum entry index + /// max_index - Maximum entry index + /// max_time - Time of the latest push + /// + /// The difference between ofs and index is that ofs is a byte + /// offset. Index is a count. Nothing really uses indices, but + /// they're tracked and sanity-checked as an invariant on the OSD. + /// + /// max_ofs and max_time are the two that have been used externally + /// so far. 
+ template<typename CT> + auto get_part_info(int64_t part_num, // The number of the partition + CT&& ct // CompletionToken + ) { + + ba::async_completion<CT, void(bs::error_code, part_info)> init(ct); + fifo::op::get_part_info gpi; + cb::list in; + encode(gpi, in); + ReadOp op; + auto e = ba::get_associated_executor(init.completion_handler, + get_executor()); + auto a = ba::get_associated_allocator(init.completion_handler); + auto reply = ceph::allocate_unique< + ExecDecodeCB<fifo::op::get_part_info_reply>>(a); + + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, + std::ref(*reply)); + std::unique_lock l(m); + auto part_oid = info.part_oid(part_num); + l.unlock(); + r->execute(part_oid, ioc, std::move(op), nullptr, + ca::bind_ea(e, a, + PartInfoGetter(std::move(init.completion_handler), + std::move(reply)))); + return init.result.get(); + } + + using executor_type = ba::executor; + + /// Return the default executor, as specified at creation. + ba::executor get_executor() const { + return executor; + } + +private: + template<typename Handler> + friend class detail::JournalProcessor; + RADOS* const r; + const IOContext ioc; + const Object oid; + std::mutex m; + + fifo::info info; + + std::uint32_t part_header_size = 0xdeadbeef; + std::uint32_t part_entry_overhead = 0xdeadbeef; + + ba::executor executor; + + std::optional<marker> to_marker(std::string_view s); + + template<typename Handler, typename T> + static void assoc_delete(const Handler& handler, T* t) { + typename std::allocator_traits<typename ba::associated_allocator<Handler>::type> + ::template rebind_alloc<T> a( + ba::get_associated_allocator(handler)); + a.destroy(t); + a.deallocate(t, 1); + } + + FIFO(RADOS& r, + IOContext ioc, + Object oid, + ba::executor executor) + : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {} + + std::string generate_tag() const; + + template <typename T> + struct ExecDecodeCB { + bs::error_code ec; + T result; + void operator()(bs::error_code e, const cb::list& 
r) { + if (e) { + ec = e; + return; + } + try { + auto p = r.begin(); + using ceph::decode; + decode(result, p); + } catch (const cb::error& err) { + ec = err.code(); + } + } + }; + + template<typename Handler> + class MetaReader { + Handler handler; + using allocator_type = boost::asio::associated_allocator_t<Handler>; + using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>; + using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>; + decoder_ptr decoder; + public: + MetaReader(Handler&& handler, decoder_ptr&& decoder) + : handler(std::move(handler)), decoder(std::move(decoder)) {} + + void operator ()(bs::error_code ec) { + if (!ec) { + ec = decoder->ec; + } + auto reply = std::move(decoder->result); + decoder.reset(); // free handler-allocated memory before dispatching + + std::move(handler)(ec, std::move(reply.info), + std::move(reply.part_header_size), + std::move(reply.part_entry_overhead)); + } + }; + + // Renamed to get around a compiler bug in Bionic that kept + // complaining we weren't capturing 'this' to make a static function call. 
+ template<typename Handler> + static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc, + std::optional<fifo::objv> objv, + Handler&& handler, /* error_code, info, uint64, + uint64 */ + std::optional<ba::executor> executor = std::nullopt){ + fifo::op::get_meta gm; + + gm.version = objv; + + cb::list in; + encode(gm, in); + ReadOp op; + + auto a = ba::get_associated_allocator(handler); + auto reply = + ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a); + + auto e = ba::get_associated_executor(handler); + op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply)); + r->execute(oid, ioc, std::move(op), nullptr, + ca::bind_ea(e, a, MetaReader(std::move(handler), + std::move(reply)))); + }; + + template<typename Handler> + void _read_meta(Handler&& handler /* error_code */) { + auto e = ba::get_associated_executor(handler, get_executor()); + auto a = ba::get_associated_allocator(handler); + _read_meta_(r, oid, ioc, + nullopt, + ca::bind_ea( + e, a, + [this, + handler = std::move(handler)](bs::error_code ec, + fifo::info&& info, + std::uint64_t phs, + std::uint64_t peo) mutable { + std::unique_lock l(m); + if (ec) { + l.unlock(); + std::move(handler)(ec); + return; + } + // We have a newer version already! 
+ if (!info.version.same_or_later(this->info.version)) { + l.unlock(); + std::move(handler)(bs::error_code{}); + return; + } + this->info = std::move(info); + part_header_size = phs; + part_entry_overhead = peo; + l.unlock(); + std::move(handler)(bs::error_code{}); + }), get_executor()); + } + + bs::error_code apply_update(fifo::info* info, + const fifo::objv& objv, + const fifo::update& update); + + + template<typename Handler> + void _update_meta(const fifo::update& update, + fifo::objv version, + Handler&& handler /* error_code, bool */) { + WriteOp op; + + cls::fifo::update_meta(op, info.version, update); + + auto a = ba::get_associated_allocator(handler); + auto e = ba::get_associated_executor(handler, get_executor()); + + r->execute( + oid, ioc, std::move(op), + ca::bind_ea( + e, a, + [this, e, a, version, update, + handler = std::move(handler)](bs::error_code ec) mutable { + if (ec && ec != bs::errc::operation_canceled) { + std::move(handler)(ec, bool{}); + return; + } + + auto canceled = (ec == bs::errc::operation_canceled); + + if (!canceled) { + ec = apply_update(&info, + version, + update); + if (ec) { + canceled = true; + } + } + + if (canceled) { + _read_meta( + ca::bind_ea( + e, a, + [handler = std::move(handler)](bs::error_code ec) mutable { + std::move(handler)(ec, ec ? 
false : true); + })); + return; + } + std::move(handler)(ec, false); + return; + })); + } + + template<typename Handler> + auto _process_journal(Handler&& handler /* error_code */) { + auto a = ba::get_associated_allocator(std::ref(handler)); + auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>( + a, this, std::move(handler)); + auto p = j.release(); + p->process(); + } + + template<typename Handler> + class NewPartPreparer { + FIFO* f; + Handler handler; + std::vector<fifo::journal_entry> jentries; + int i; + std::int64_t new_head_part_num; + + public: + + void operator ()(bs::error_code ec, bool canceled) { + if (ec) { + std::move(handler)(ec); + return; + } + + if (canceled) { + std::unique_lock l(f->m); + auto iter = f->info.journal.find(jentries.front().part_num); + auto max_push_part_num = f->info.max_push_part_num; + auto head_part_num = f->info.head_part_num; + auto version = f->info.version; + auto found = (iter != f->info.journal.end()); + l.unlock(); + if ((max_push_part_num >= jentries.front().part_num && + head_part_num >= new_head_part_num)) { + /* raced, but new part was already written */ + std::move(handler)(bs::error_code{}); + return; + } + if (i >= MAX_RACE_RETRIES) { + std::move(handler)(errc::raced); + return; + } + if (!found) { + auto e = ba::get_associated_executor(handler, f->get_executor()); + auto a = ba::get_associated_allocator(handler); + f->_update_meta(fifo::update{} + .journal_entries_add(jentries), + version, + ca::bind_ea( + e, a, + NewPartPreparer(f, std::move(handler), + jentries, + i + 1, new_head_part_num))); + return; + } + // Fall through. We still need to process the journal. 
+ } + f->_process_journal(std::move(handler)); + return; + } + + NewPartPreparer(FIFO* f, + Handler&& handler, + std::vector<fifo::journal_entry> jentries, + int i, std::int64_t new_head_part_num) + : f(f), handler(std::move(handler)), jentries(std::move(jentries)), + i(i), new_head_part_num(new_head_part_num) {} + }; + + template<typename Handler> + void _prepare_new_part(bool is_head, + Handler&& handler /* error_code */) { + std::unique_lock l(m); + std::vector jentries = { info.next_journal_entry(generate_tag()) }; + std::int64_t new_head_part_num = info.head_part_num; + auto version = info.version; + + if (is_head) { + auto new_head_jentry = jentries.front(); + new_head_jentry.op = fifo::journal_entry::Op::set_head; + new_head_part_num = jentries.front().part_num; + jentries.push_back(std::move(new_head_jentry)); + } + l.unlock(); + + auto e = ba::get_associated_executor(handler, get_executor()); + auto a = ba::get_associated_allocator(handler); + _update_meta(fifo::update{}.journal_entries_add(jentries), + version, + ca::bind_ea( + e, a, + NewPartPreparer(this, std::move(handler), + jentries, 0, new_head_part_num))); + } + + template<typename Handler> + class NewHeadPreparer { + FIFO* f; + Handler handler; + int i; + std::int64_t new_head_num; + + public: + + void operator ()(bs::error_code ec, bool canceled) { + std::unique_lock l(f->m); + auto head_part_num = f->info.head_part_num; + auto version = f->info.version; + l.unlock(); + + if (ec) { + std::move(handler)(ec); + return; + } + if (canceled) { + if (i >= MAX_RACE_RETRIES) { + std::move(handler)(errc::raced); + return; + } + + // Raced, but there's still work to do! 
+ if (head_part_num < new_head_num) { + auto e = ba::get_associated_executor(handler, f->get_executor()); + auto a = ba::get_associated_allocator(handler); + f->_update_meta(fifo::update{}.head_part_num(new_head_num), + version, + ca::bind_ea( + e, a, + NewHeadPreparer(f, std::move(handler), + i + 1, + new_head_num))); + return; + } + } + // Either we succeeded, or we were raced by someone who did it for us. + std::move(handler)(bs::error_code{}); + return; + } + + NewHeadPreparer(FIFO* f, + Handler&& handler, + int i, std::int64_t new_head_num) + : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {} + }; + + template<typename Handler> + void _prepare_new_head(Handler&& handler /* error_code */) { + std::unique_lock l(m); + int64_t new_head_num = info.head_part_num + 1; + auto max_push_part_num = info.max_push_part_num; + auto version = info.version; + l.unlock(); + + if (max_push_part_num < new_head_num) { + auto e = ba::get_associated_executor(handler, get_executor()); + auto a = ba::get_associated_allocator(handler); + _prepare_new_part( + true, + ca::bind_ea( + e, a, + [this, new_head_num, + handler = std::move(handler)](bs::error_code ec) mutable { + if (ec) { + handler(ec); + return; + } + std::unique_lock l(m); + if (info.max_push_part_num < new_head_num) { + l.unlock(); + ldout(r->cct(), 0) + << "ERROR: " << __func__ + << ": after new part creation: meta_info.max_push_part_num=" + << info.max_push_part_num << " new_head_num=" + << info.max_push_part_num << dendl; + std::move(handler)(errc::inconsistency); + } else { + l.unlock(); + std::move(handler)(bs::error_code{}); + } + })); + return; + } + auto e = ba::get_associated_executor(handler, get_executor()); + auto a = ba::get_associated_allocator(handler); + _update_meta(fifo::update{}.head_part_num(new_head_num), + version, + ca::bind_ea( + e, a, + NewHeadPreparer(this, std::move(handler), 0, + new_head_num))); + } + + template<typename T> + struct ExecHandleCB { + bs::error_code ec; + T 
 result;
    void operator()(bs::error_code e, const T& t) {
      if (e) {
        ec = e;
        return;
      }
      result = t;
    }
  };

  // Completion adapter for push_entries(): merges the stored decode/exec
  // error with the operation error and forwards (ec, result) to the user
  // handler, freeing the allocator-provided state first.
  template<typename Handler>
  class EntryPusher {
    Handler handler;
    using allocator_type = boost::asio::associated_allocator_t<Handler>;
    using decoder_type = ExecHandleCB<int>;
    using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
    decoder_ptr decoder;

  public:

    EntryPusher(Handler&& handler, decoder_ptr&& decoder)
      : handler(std::move(handler)), decoder(std::move(decoder)) {}

    void operator ()(bs::error_code ec) {
      if (!ec) {
        ec = decoder->ec;
      }
      auto reply = std::move(decoder->result);
      decoder.reset(); // free handler-allocated memory before dispatching

      std::move(handler)(ec, std::move(reply));
    }
  };

  // Issue a single push_part() write against the current head part.
  // The handler receives the error code and the number of entries the
  // OSD accepted.
  template<typename Handler>
  auto push_entries(const std::deque<cb::list>& data_bufs,
                    Handler&& handler /* error_code, int */) {
    WriteOp op;
    std::unique_lock l(m);
    auto head_part_num = info.head_part_num;
    auto tag = info.head_tag;
    auto oid = info.part_oid(head_part_num);
    l.unlock();

    auto a = ba::get_associated_allocator(handler);
    auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a);

    auto e = ba::get_associated_executor(handler, get_executor());
    push_part(op, tag, data_bufs, std::ref(*reply));
    return r->execute(oid, ioc, std::move(op),
                      ca::bind_ea(e, a, EntryPusher(std::move(handler),
                                                    std::move(reply))));
  }

  // Issue a trim_part() write against one part object.
  template<typename CT>
  auto trim_part(int64_t part_num,
                 uint64_t ofs,
                 std::optional<std::string_view> tag,
                 bool exclusive,
                 CT&& ct) {
    WriteOp op;
    cls::fifo::trim_part(op, tag, ofs, exclusive);
    return r->execute(info.part_oid(part_num), ioc, std::move(op),
                      std::forward<CT>(ct));
  }


  // State machine driving a multi-entry push: batches entries up to the
  // part-size limit, pushes them, and prepares a new head part whenever
  // the current one fills up (result_out_of_range).
  template<typename Handler>
  class Pusher {
    FIFO* f;
    std::deque<cb::list> remaining; // entries not yet batched
    std::deque<cb::list> batch;     // entries in flight / to send next
    int i;                          // race-retry counter for new-head prep
    Handler handler;

    // Drop the `successes` entries the OSD accepted, refill the batch up
    // to max_part_size (accounting for per-entry overhead), then push.
    void prep_then_push(const unsigned successes) {
      std::unique_lock l(f->m);
      auto max_part_size
 = f->info.params.max_part_size;
      auto part_entry_overhead = f->part_entry_overhead;
      l.unlock();

      uint64_t batch_len = 0;
      if (successes > 0) {
        if (successes == batch.size()) {
          batch.clear();
        } else {
          // Partial acceptance: keep the unaccepted tail and re-measure it.
          batch.erase(batch.begin(), batch.begin() + successes);
          for (const auto& b : batch) {
            batch_len += b.length() + part_entry_overhead;
          }
        }
      }

      if (batch.empty() && remaining.empty()) {
        // All entries pushed: report success.
        std::move(handler)(bs::error_code{});
        return;
      }

      while (!remaining.empty() &&
             (remaining.front().length() + batch_len <= max_part_size)) {

        /* We can send entries with data_len up to max_entry_size,
           however, we want to also account the overhead when
           dealing with multiple entries. Previous check doesn't
           account for overhead on purpose. */
        batch_len += remaining.front().length() + part_entry_overhead;
        batch.push_back(std::move(remaining.front()));
        remaining.pop_front();
      }
      push();
    }

    // Send the current batch; the continuation is a copy of this Pusher
    // (note: `batch` is passed by copy so it can be retried on failure).
    void push() {
      auto e = ba::get_associated_executor(handler, f->get_executor());
      auto a = ba::get_associated_allocator(handler);
      f->push_entries(batch,
                      ca::bind_ea(e, a,
                                  Pusher(f, std::move(remaining),
                                         batch, i,
                                         std::move(handler))));
    }

  public:

    // Initial call!
    void operator ()() {
      prep_then_push(0);
    }

    // Called with response to push_entries
    void operator ()(bs::error_code ec, int r) {
      if (ec == bs::errc::result_out_of_range) {
        // Head part is full: prepare a new head, then resume (see the
        // single-argument overload below).
        auto e = ba::get_associated_executor(handler, f->get_executor());
        auto a = ba::get_associated_allocator(handler);
        f->_prepare_new_head(
          ca::bind_ea(e, a,
                      Pusher(f, std::move(remaining),
                             std::move(batch), i,
                             std::move(handler))));
        return;
      }
      if (ec) {
        std::move(handler)(ec);
        return;
      }
      i = 0; // We've made forward progress, so reset the race counter!
      prep_then_push(r);
    }

    // Called with response to prepare_new_head
    void operator ()(bs::error_code ec) {
      if (ec == bs::errc::operation_canceled) {
        // Lost a race preparing the head; retry up to MAX_RACE_RETRIES.
        if (i == MAX_RACE_RETRIES) {
          ldout(f->r->cct(), 0)
            << "ERROR: " << __func__
            << "(): race check failed too many times, likely a bug" << dendl;
          std::move(handler)(make_error_code(errc::raced));
          return;
        }
        ++i;
      } else if (ec) {
        std::move(handler)(ec);
        return;
      }

      if (batch.empty()) {
        prep_then_push(0);
        return;
      } else {
        push();
        return;
      }
    }

    Pusher(FIFO* f, std::deque<cb::list>&& remaining,
           std::deque<cb::list> batch, int i,
           Handler&& handler)
      : f(f), remaining(std::move(remaining)),
        batch(std::move(batch)), i(i),
        handler(std::move(handler)) {}
  };

  // State machine for listing entries: walks parts from (part_num, ofs),
  // accumulating up to max_entries results.  Self-owning; handle() frees
  // it (via FIFO::assoc_delete) and invokes the user handler with
  // (ec, entries, more).
  template<typename Handler>
  class Lister {
    FIFO* f;
    std::vector<list_entry> result;
    bool more = false;              // true if entries remain past this listing
    std::int64_t part_num;
    std::uint64_t ofs;
    int max_entries;                // entries still wanted
    bs::error_code ec_out;          // error decoded from the cls reply
    std::vector<fifo::part_list_entry> entries;
    bool part_more = false;         // current part has more entries
    bool part_full = false;         // current part is sealed/full
    Handler handler;

    // Terminal path: move out the results, delete `this`, complete.
    void handle(bs::error_code ec) {
      auto h = std::move(handler);
      auto m = more;
      auto r = std::move(result);

      FIFO::assoc_delete(h, this);
      std::move(h)(ec, std::move(r), m);
    }

  public:
    Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
           Handler&& handler)
      : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
        handler(std::move(handler)) {
      result.reserve(max_entries);
    }


    Lister(const Lister&) = delete;
    Lister& operator =(const Lister&) = delete;
    Lister(Lister&&) = delete;
    Lister& operator =(Lister&&) = delete;

    // List one part; re-invoked (possibly asynchronously) until done.
    void list() {
      if (max_entries > 0) {
        ReadOp op;
        ec_out.clear();
        part_more = false;
        part_full = false;
        entries.clear();

        std::unique_lock l(f->m);
        auto part_oid = f->info.part_oid(part_num);
        l.unlock();

        list_part(op,
                  {},
                  ofs,
                  max_entries,
                  &ec_out,
                  &entries,
                  &part_more,
                  &part_full,
nullptr); + auto e = ba::get_associated_executor(handler, f->get_executor()); + auto a = ba::get_associated_allocator(handler); + f->r->execute( + part_oid, + f->ioc, + std::move(op), + nullptr, + ca::bind_ea( + e, a, + [t = std::unique_ptr<Lister>(this), this, + part_oid](bs::error_code ec) mutable { + t.release(); + if (ec == bs::errc::no_such_file_or_directory) { + auto e = ba::get_associated_executor(handler, + f->get_executor()); + auto a = ba::get_associated_allocator(handler); + f->_read_meta( + ca::bind_ea( + e, a, + [this](bs::error_code ec) mutable { + if (ec) { + handle(ec); + return; + } + + if (part_num < f->info.tail_part_num) { + /* raced with trim? restart */ + max_entries += result.size(); + result.clear(); + part_num = f->info.tail_part_num; + ofs = 0; + list(); + } + /* assuming part was not written yet, so end of data */ + more = false; + handle({}); + return; + })); + return; + } + if (ec) { + ldout(f->r->cct(), 0) + << __func__ + << "(): list_part() on oid=" << part_oid + << " returned ec=" << ec.message() << dendl; + handle(ec); + return; + } + if (ec_out) { + ldout(f->r->cct(), 0) + << __func__ + << "(): list_part() on oid=" << f->info.part_oid(part_num) + << " returned ec=" << ec_out.message() << dendl; + handle(ec_out); + return; + } + + more = part_full || part_more; + for (auto& entry : entries) { + list_entry e; + e.data = std::move(entry.data); + e.marker = marker{part_num, entry.ofs}.to_string(); + e.mtime = entry.mtime; + result.push_back(std::move(e)); + } + max_entries -= entries.size(); + entries.clear(); + if (max_entries > 0 && + part_more) { + list(); + return; + } + + if (!part_full) { /* head part is not full */ + handle({}); + return; + } + ++part_num; + ofs = 0; + list(); + })); + } else { + handle({}); + return; + } + } + }; + + template<typename Handler> + class Trimmer { + FIFO* f; + std::int64_t part_num; + std::uint64_t ofs; + bool exclusive; + Handler handler; + std::int64_t pn; + int i = 0; + + void 
handle(bs::error_code ec) { + auto h = std::move(handler); + + FIFO::assoc_delete(h, this); + return std::move(h)(ec); + } + + void update() { + std::unique_lock l(f->m); + auto objv = f->info.version; + l.unlock(); + auto a = ba::get_associated_allocator(handler); + auto e = ba::get_associated_executor(handler, f->get_executor()); + f->_update_meta( + fifo::update{}.tail_part_num(part_num), + objv, + ca::bind_ea( + e, a, + [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec, + bool canceled) mutable { + t.release(); + if (canceled) + if (i >= MAX_RACE_RETRIES) { + ldout(f->r->cct(), 0) + << "ERROR: " << __func__ + << "(): race check failed too many times, likely a bug" + << dendl; + handle(errc::raced); + return; + } + std::unique_lock l(f->m); + auto tail_part_num = f->info.tail_part_num; + l.unlock(); + if (tail_part_num < part_num) { + ++i; + update(); + return; + } + handle({}); + return; + })); + } + + public: + Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs, + bool exclusive, Handler&& handler) + : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive), + handler(std::move(handler)) { + std::unique_lock l(f->m); + pn = f->info.tail_part_num; + } + + void trim() { + auto a = ba::get_associated_allocator(handler); + auto e = ba::get_associated_executor(handler, f->get_executor()); + if (pn < part_num) { + std::unique_lock l(f->m); + auto max_part_size = f->info.params.max_part_size; + l.unlock(); + f->trim_part( + pn, max_part_size, std::nullopt, + false, + ca::bind_ea( + e, a, + [t = std::unique_ptr<Trimmer>(this), + this](bs::error_code ec) mutable { + t.release(); + if (ec && ec != bs::errc::no_such_file_or_directory) { + ldout(f->r->cct(), 0) + << __func__ << "(): ERROR: trim_part() on part=" + << pn << " returned ec=" << ec.message() << dendl; + handle(ec); + return; + } + ++pn; + trim(); + })); + return; + } + f->trim_part( + part_num, ofs, std::nullopt, exclusive, + ca::bind_ea( + e, a, + [t = std::unique_ptr<Trimmer>(this), + 
           this](bs::error_code ec) mutable {
            t.release();
            // ENOENT means the part is already gone — fine for a trim.
            if (ec && ec != bs::errc::no_such_file_or_directory) {
              ldout(f->r->cct(), 0)
                << __func__ << "(): ERROR: trim_part() on part=" << part_num
                << " returned ec=" << ec.message() << dendl;
              handle(ec);
              return;
            }
            std::unique_lock l(f->m);
            auto tail_part_num = f->info.tail_part_num;
            l.unlock();
            if (part_num <= tail_part_num) {
              /* don't need to modify meta info */
              handle({});
              return;
            }
            update();
          }));
    }
  };

  // Completion adapter for get_part_info(): merges the decode error with
  // the operation error and forwards (ec, header) to the user handler.
  template<typename Handler>
  class PartInfoGetter {
    Handler handler;
    using allocator_type = boost::asio::associated_allocator_t<Handler>;
    using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>;
    using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
    decoder_ptr decoder;
  public:
    PartInfoGetter(Handler&& handler, decoder_ptr&& decoder)
      : handler(std::move(handler)), decoder(std::move(decoder)) {}

    void operator ()(bs::error_code ec) {
      if (!ec) {
        ec = decoder->ec;
      }
      auto reply = std::move(decoder->result);
      decoder.reset(); // free handler-allocated memory before dispatching

      auto p = ca::bind_handler(std::move(handler),
                                ec, std::move(reply.header));
      std::move(p)();
    }
  };


};

namespace detail {
// Drains a snapshot of FIFO::info.journal: creates/removes part objects
// and tracks set_head entries, then commits the resulting tail/head/max
// part numbers (and removes the processed journal entries) back to the
// FIFO metadata.  Self-owning: allocated by FIFO::_process_journal and
// deleted in handle().
template<typename Handler>
class JournalProcessor {
private:
  FIFO* const fifo;
  Handler handler;

  std::vector<fifo::journal_entry> processed;   // entries applied so far
  std::multimap<std::int64_t, fifo::journal_entry> journal; // snapshot
  std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
  std::int64_t new_tail;
  std::int64_t new_head;
  std::int64_t new_max;
  int race_retries = 0;

  // Create and initialize one part object.
  template<typename CT>
  auto create_part(int64_t part_num, std::string_view tag, CT&& ct) {
    WriteOp op;
    op.create(false); /* We don't need exclusivity, part_init ensures
			 we're creating from the same journal entry.
 */
    std::unique_lock l(fifo->m);
    part_init(op, tag, fifo->info.params);
    auto oid = fifo->info.part_oid(part_num);
    l.unlock();
    return fifo->r->execute(oid, fifo->ioc,
                            std::move(op), std::forward<CT>(ct));
  }

  // Remove one part object.
  template<typename CT>
  auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) {
    WriteOp op;
    op.remove();
    std::unique_lock l(fifo->m);
    auto oid = fifo->info.part_oid(part_num);
    l.unlock();
    return fifo->r->execute(oid, fifo->ioc,
                            std::move(op), std::forward<CT>(ct));
  }

  // Dispatch one journal entry to the matching asynchronous operation;
  // `pp` is always invoked exactly once with the outcome.  set_head needs
  // no I/O, so its completion is posted to keep the call asynchronous.
  template<typename PP>
  void process_journal_entry(const fifo::journal_entry& entry,
                             PP&& pp) {
    switch (entry.op) {
    case fifo::journal_entry::Op::unknown:
      std::move(pp)(errc::inconsistency);
      return;
      break;

    case fifo::journal_entry::Op::create:
      create_part(entry.part_num, entry.part_tag, std::move(pp));
      return;
      break;
    case fifo::journal_entry::Op::set_head:
      ba::post(ba::get_associated_executor(handler, fifo->get_executor()),
               [pp = std::move(pp)]() mutable {
                 std::move(pp)(bs::error_code{});
               });
      return;
      break;
    case fifo::journal_entry::Op::remove:
      remove_part(entry.part_num, entry.part_tag, std::move(pp));
      return;
      break;
    }
    // Unreachable with a well-formed Op, kept as a defensive fallback.
    std::move(pp)(errc::inconsistency);
    return;
  }

  // Build the continuation run after each journal entry's operation:
  // records the entry's effect on new_tail/new_head/new_max, appends it
  // to `processed`, and steps the iterator before re-entering process().
  auto journal_entry_finisher(const fifo::journal_entry& entry) {
    auto a = ba::get_associated_allocator(handler);
    auto e = ba::get_associated_executor(handler, fifo->get_executor());
    return
      ca::bind_ea(
        e, a,
        [t = std::unique_ptr<JournalProcessor>(this), this,
         entry](bs::error_code ec) mutable {
          t.release(); // ownership returns to the state machine
          // Removing an already-absent part is not an error.
          if (entry.op == fifo::journal_entry::Op::remove &&
              ec == bs::errc::no_such_file_or_directory)
            ec.clear();

          if (ec) {
            ldout(fifo->r->cct(), 0)
              << __func__
              << "(): ERROR: failed processing journal entry for part="
              << entry.part_num << " with error " << ec.message()
              << " Bug or inconsistency."
 << dendl;
            handle(errc::inconsistency);
            return;
          } else {
            switch (entry.op) {
            case fifo::journal_entry::Op::unknown:
              // Can't happen. Filtered out in process_journal_entry.
              abort();
              break;

            case fifo::journal_entry::Op::create:
              if (entry.part_num > new_max) {
                new_max = entry.part_num;
              }
              break;
            case fifo::journal_entry::Op::set_head:
              if (entry.part_num > new_head) {
                new_head = entry.part_num;
              }
              break;
            case fifo::journal_entry::Op::remove:
              if (entry.part_num >= new_tail) {
                new_tail = entry.part_num + 1;
              }
              break;
            }
            processed.push_back(entry);
          }
          ++iter;
          process();
        });
  }

  // Commits the processed journal entries to the FIFO metadata, retrying
  // on version races (bounded by MAX_RACE_RETRIES).  Owns the
  // JournalProcessor while an update is in flight; `first` distinguishes
  // the initial synchronous call from _update_meta completions.
  struct JournalPostprocessor {
    std::unique_ptr<JournalProcessor> j_;
    bool first;
    void operator ()(bs::error_code ec, bool canceled) {
      std::optional<int64_t> tail_part_num;
      std::optional<int64_t> head_part_num;
      std::optional<int64_t> max_part_num;

      auto j = j_.release();

      // A completed (non-raced, error-free) update finishes the processor.
      // NOTE(review): a non-canceled error `ec` falls through to re-issue
      // the update without a retry bound — confirm this is intended.
      if (!first && !ec && !canceled) {
        j->handle({});
        return;
      }

      if (canceled) {
        if (j->race_retries >= MAX_RACE_RETRIES) {
          ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ <<
            "(): race check failed too many times, likely a bug" << dendl;
          j->handle(errc::raced);
          return;
        }

        ++j->race_retries;

        // Drop entries a competitor already removed from the journal.
        std::vector<fifo::journal_entry> new_processed;
        std::unique_lock l(j->fifo->m);
        for (auto& e : j->processed) {
          auto jiter = j->fifo->info.journal.find(e.part_num);
          /* journal entry was already processed */
          if (jiter == j->fifo->info.journal.end() ||
              !(jiter->second == e)) {
            continue;
          }
          new_processed.push_back(e);
        }
        j->processed = std::move(new_processed);
      }

      std::unique_lock l(j->fifo->m);
      auto objv = j->fifo->info.version;
      if (j->new_tail > j->fifo->info.tail_part_num) {
        tail_part_num = j->new_tail;
      }

      if (j->new_head > j->fifo->info.head_part_num) {
        head_part_num = j->new_head;
      }

      if (j->new_max > j->fifo->info.max_push_part_num) {
        max_part_num = j->new_max;
      }
      l.unlock();

      if
 (j->processed.empty() &&
          !tail_part_num &&
          !max_part_num) {
        /* nothing to update anymore */
        // NOTE(review): head_part_num is not part of this emptiness test;
        // confirm a head-only change cannot reach here with `processed`
        // empty, or this would silently drop a head update.
        j->handle({});
        return;
      }
      auto a = ba::get_associated_allocator(j->handler);
      auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor());
      j->fifo->_update_meta(fifo::update{}
                            .tail_part_num(tail_part_num)
                            .head_part_num(head_part_num)
                            .max_push_part_num(max_part_num)
                            .journal_entries_rm(j->processed),
                            objv,
                            ca::bind_ea(
                              e, a,
                              JournalPostprocessor{j, false}));
      return;
    }

    JournalPostprocessor(JournalProcessor* j, bool first)
      : j_(j), first(first) {}
  };

  // All journal entries handled: commit the results (if any) to the
  // FIFO metadata.
  void postprocess() {
    if (processed.empty()) {
      handle({});
      return;
    }
    JournalPostprocessor(this, true)({}, false);
  }

  // Terminal path: delete `this` and dispatch the user handler with `ec`.
  void handle(bs::error_code ec) {
    auto e = ba::get_associated_executor(handler, fifo->get_executor());
    auto a = ba::get_associated_allocator(handler);
    auto h = std::move(handler);
    FIFO::assoc_delete(h, this);
    e.dispatch(ca::bind_handler(std::move(h), ec), a);
    return;
  }

public:

  // Snapshots the journal and current tail/head/max under the FIFO lock.
  JournalProcessor(FIFO* fifo, Handler&& handler)
    : fifo(fifo), handler(std::move(handler)) {
    std::unique_lock l(fifo->m);
    journal = fifo->info.journal;
    iter = journal.begin();
    new_tail = fifo->info.tail_part_num;
    new_head = fifo->info.head_part_num;
    new_max = fifo->info.max_push_part_num;
  }

  JournalProcessor(const JournalProcessor&) = delete;
  JournalProcessor& operator =(const JournalProcessor&) = delete;
  JournalProcessor(JournalProcessor&&) = delete;
  JournalProcessor& operator =(JournalProcessor&&) = delete;

  // Process the next snapshot entry, or commit once the snapshot is done.
  void process() {
    if (iter != journal.end()) {
      const auto entry = iter->second;
      process_journal_entry(entry,
                            journal_entry_finisher(entry));
      return;
    } else {
      postprocess();
      return;
    }
  }
};
}
}

#endif // CEPH_RADOS_CLS_FIFIO_H