summaryrefslogtreecommitdiffstats
path: root/src/neorados
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/neorados
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds. (tag: upstream/16.2.11+ds, branch: upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/neorados')
-rw-r--r--src/neorados/CMakeLists.txt42
-rw-r--r--src/neorados/RADOS.cc1736
-rw-r--r--src/neorados/RADOSImpl.cc121
-rw-r--r--src/neorados/RADOSImpl.h135
-rw-r--r--src/neorados/cls/fifo.cc385
-rw-r--r--src/neorados/cls/fifo.h1754
6 files changed, 4173 insertions, 0 deletions
diff --git a/src/neorados/CMakeLists.txt b/src/neorados/CMakeLists.txt
new file mode 100644
index 000000000..8695b48f0
--- /dev/null
+++ b/src/neorados/CMakeLists.txt
@@ -0,0 +1,42 @@
# Internal implementation objects (RADOSImpl).
add_library(neorados_objs OBJECT
  RADOSImpl.cc)
compile_with_fmt(neorados_objs)
# Public API surface objects (RADOS.cc).
add_library(neorados_api_obj OBJECT
  RADOS.cc)
compile_with_fmt(neorados_api_obj)

# Static library combining the API and implementation object sets.
add_library(libneorados STATIC
  $<TARGET_OBJECTS:neorados_api_obj>
  $<TARGET_OBJECTS:neorados_objs>)
target_link_libraries(libneorados PRIVATE
  osdc ceph-common cls_lock_client fmt::fmt
  ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
+
+# if(ENABLE_SHARED)
+# add_library(libneorados ${CEPH_SHARED}
+# $<TARGET_OBJECTS:neorados_api_obj>
+# $<TARGET_OBJECTS:neorados_objs>
+# $<TARGET_OBJECTS:common_buffer_obj>)
+# set_target_properties(libneorados PROPERTIES
+# OUTPUT_NAME RADOS
+# VERSION 0.0.1
+# SOVERSION 1
+# CXX_VISIBILITY_PRESET hidden
+# VISIBILITY_INLINES_HIDDEN ON)
+# if(NOT APPLE)
+# set_property(TARGET libneorados APPEND_STRING PROPERTY
+# LINK_FLAGS " -Wl,--exclude-libs,ALL")
+# endif()
+# else(ENABLE_SHARED)
+# add_library(libneorados STATIC
+# $<TARGET_OBJECTS:neorados_api_obj>
+# $<TARGET_OBJECTS:neorados_objs>)
+# endif(ENABLE_SHARED)
+# target_link_libraries(libneorados PRIVATE
+# osdc ceph-common cls_lock_client
+# ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
+# target_link_libraries(libneorados ${rados_libs})
+# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR})
# Client-side helper library for the cls_fifo object class (cls/fifo.cc).
add_library(neorados_cls_fifo STATIC cls/fifo.cc)
target_link_libraries(neorados_cls_fifo PRIVATE
  libneorados ceph-common fmt::fmt)
diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc
new file mode 100644
index 000000000..927f995cc
--- /dev/null
+++ b/src/neorados/RADOS.cc
@@ -0,0 +1,1736 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#define BOOST_BIND_NO_PLACEHOLDERS
+
+#include <optional>
+#include <string_view>
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <fmt/format.h>
+
+#include "include/ceph_fs.h"
+
+#include "common/ceph_context.h"
+#include "common/ceph_argparse.h"
+#include "common/common_init.h"
+#include "common/hobject.h"
+#include "common/EventTrace.h"
+
+#include "global/global_init.h"
+
+#include "osd/osd_types.h"
+#include "osdc/error_code.h"
+
+#include "neorados/RADOSImpl.h"
+#include "include/neorados/RADOS.hpp"
+
+namespace bc = boost::container;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
+
+namespace neorados {
+// Object
+
// Object is a value wrapper around object_t held in opaque aligned
// storage (impl), so the public header need not expose OSD-internal
// types. Each member reconstructs the object_t view via placement new
// or reinterpret_cast; the static_asserts verify the reserved buffer
// is large enough to hold an object_t.

Object::Object() {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t();
}

Object::Object(const char* s) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(s);
}

Object::Object(std::string_view s) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(s);
}

Object::Object(std::string&& s) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(std::move(s));
}

Object::Object(const std::string& s) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(s);
}

// Explicit destructor call: the object_t lives in raw storage and is
// never destroyed implicitly.
Object::~Object() {
  reinterpret_cast<object_t*>(&impl)->~object_t();
}

Object::Object(const Object& o) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(*reinterpret_cast<const object_t*>(&o.impl));
}
Object& Object::operator =(const Object& o) {
  *reinterpret_cast<object_t*>(&impl) =
    *reinterpret_cast<const object_t*>(&o.impl);
  return *this;
}
// Move operations leave the source holding a valid, moved-from object_t.
Object::Object(Object&& o) {
  static_assert(impl_size >= sizeof(object_t));
  new (&impl) object_t(std::move(*reinterpret_cast<object_t*>(&o.impl)));
}
Object& Object::operator =(Object&& o) {
  *reinterpret_cast<object_t*>(&impl) =
    std::move(*reinterpret_cast<object_t*>(&o.impl));
  return *this;
}

// View over the object's name; valid only while this Object is alive.
Object::operator std::string_view() const {
  return std::string_view(reinterpret_cast<const object_t*>(&impl)->name);
}

// Relational/equality operators delegate to object_t's ordering.
bool operator <(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) <
          *reinterpret_cast<const object_t*>(&rhs.impl));
}
bool operator <=(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) <=
          *reinterpret_cast<const object_t*>(&rhs.impl));
}
bool operator >=(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) >=
          *reinterpret_cast<const object_t*>(&rhs.impl));
}
bool operator >(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) >
          *reinterpret_cast<const object_t*>(&rhs.impl));
}

bool operator ==(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) ==
          *reinterpret_cast<const object_t*>(&rhs.impl));
}
bool operator !=(const Object& lhs, const Object& rhs) {
  return (*reinterpret_cast<const object_t*>(&lhs.impl) !=
          *reinterpret_cast<const object_t*>(&rhs.impl));
}

std::ostream& operator <<(std::ostream& m, const Object& o) {
  return (m << *reinterpret_cast<const object_t*>(&o.impl));
}
+
+// IOContext
+
// Backing state for the public IOContext handle.
struct IOContextImpl {
  object_locator_t oloc;            // pool / namespace / locator key / hash
  snapid_t snap_seq = CEPH_NOSNAP;  // snapshot to read from (CEPH_NOSNAP = head)
  SnapContext snapc;                // snapshot context applied to writes
  int extra_op_flags = 0;           // CEPH_OSD_FLAG_* OR'd into every op
};
+
// IOContext uses the same opaque-storage technique as Object: an
// IOContextImpl is constructed with placement new inside the aligned
// buffer `impl`, and all special members forward to it explicitly.

IOContext::IOContext() {
  static_assert(impl_size >= sizeof(IOContextImpl));
  new (&impl) IOContextImpl();
}

// Convenience constructors delegate to the default constructor and
// then apply the pool (and namespace) via the public setters.
IOContext::IOContext(std::int64_t _pool) : IOContext() {
  pool(_pool);
}

IOContext::IOContext(std::int64_t _pool, std::string_view _ns)
  : IOContext() {
  pool(_pool);
  ns(_ns);
}

IOContext::IOContext(std::int64_t _pool, std::string&& _ns)
  : IOContext() {
  pool(_pool);
  ns(std::move(_ns));
}

// Explicit destructor call: the impl lives in raw storage.
IOContext::~IOContext() {
  reinterpret_cast<IOContextImpl*>(&impl)->~IOContextImpl();
}

IOContext::IOContext(const IOContext& rhs) {
  static_assert(impl_size >= sizeof(IOContextImpl));
  new (&impl) IOContextImpl(*reinterpret_cast<const IOContextImpl*>(&rhs.impl));
}

IOContext& IOContext::operator =(const IOContext& rhs) {
  *reinterpret_cast<IOContextImpl*>(&impl) =
    *reinterpret_cast<const IOContextImpl*>(&rhs.impl);
  return *this;
}

IOContext::IOContext(IOContext&& rhs) {
  static_assert(impl_size >= sizeof(IOContextImpl));
  new (&impl) IOContextImpl(
    std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl)));
}

IOContext& IOContext::operator =(IOContext&& rhs) {
  *reinterpret_cast<IOContextImpl*>(&impl) =
    std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl));
  return *this;
}
+
+std::int64_t IOContext::pool() const {
+ return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.pool;
+}
+
+void IOContext::pool(std::int64_t _pool) {
+ reinterpret_cast<IOContextImpl*>(&impl)->oloc.pool = _pool;
+}
+
+std::string_view IOContext::ns() const {
+ return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.nspace;
+}
+
+void IOContext::ns(std::string_view _ns) {
+ reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = _ns;
+}
+
+void IOContext::ns(std::string&& _ns) {
+ reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = std::move(_ns);
+}
+
+std::optional<std::string_view> IOContext::key() const {
+ auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc;
+ if (oloc.key.empty())
+ return std::nullopt;
+ else
+ return std::string_view(oloc.key);
+}
+
+void IOContext::key(std::string_view _key) {
+ auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+ oloc.hash = -1;
+ oloc.key = _key;
+}
+
+void IOContext::key(std::string&&_key) {
+ auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+ oloc.hash = -1;
+ oloc.key = std::move(_key);
+}
+
+void IOContext::clear_key() {
+ auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+ oloc.hash = -1;
+ oloc.key.clear();
+}
+
+std::optional<std::int64_t> IOContext::hash() const {
+ auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc;
+ if (oloc.hash < 0)
+ return std::nullopt;
+ else
+ return oloc.hash;
+}
+
+void IOContext::hash(std::int64_t _hash) {
+ auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+ oloc.hash = _hash;
+ oloc.key.clear();
+}
+
+void IOContext::clear_hash() {
+ auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+ oloc.hash = -1;
+ oloc.key.clear();
+}
+
+
+std::optional<std::uint64_t> IOContext::read_snap() const {
+ auto& snap_seq = reinterpret_cast<const IOContextImpl*>(&impl)->snap_seq;
+ if (snap_seq == CEPH_NOSNAP)
+ return std::nullopt;
+ else
+ return snap_seq;
+}
+void IOContext::read_snap(std::optional<std::uint64_t> _snapid) {
+ auto& snap_seq = reinterpret_cast<IOContextImpl*>(&impl)->snap_seq;
+ snap_seq = _snapid.value_or(CEPH_NOSNAP);
+}
+
+std::optional<
+ std::pair<std::uint64_t,
+ std::vector<std::uint64_t>>> IOContext::write_snap_context() const {
+ auto& snapc = reinterpret_cast<const IOContextImpl*>(&impl)->snapc;
+ if (snapc.empty()) {
+ return std::nullopt;
+ } else {
+ std::vector<uint64_t> v(snapc.snaps.begin(), snapc.snaps.end());
+ return std::make_optional(std::make_pair(uint64_t(snapc.seq), v));
+ }
+}
+
+void IOContext::write_snap_context(
+ std::optional<std::pair<std::uint64_t, std::vector<std::uint64_t>>> _snapc) {
+ auto& snapc = reinterpret_cast<IOContextImpl*>(&impl)->snapc;
+ if (!_snapc) {
+ snapc.clear();
+ } else {
+ SnapContext n(_snapc->first, { _snapc->second.begin(), _snapc->second.end()});
+ if (!n.is_valid()) {
+ throw bs::system_error(EINVAL,
+ bs::system_category(),
+ "Invalid snap context.");
+
+ } else {
+ snapc = n;
+ }
+ }
+}
+
+bool IOContext::full_try() const {
+ const auto ioc = reinterpret_cast<const IOContextImpl*>(&impl);
+ return (ioc->extra_op_flags & CEPH_OSD_FLAG_FULL_TRY) != 0;
+}
+
+void IOContext::full_try(bool _full_try) {
+ auto ioc = reinterpret_cast<IOContextImpl*>(&impl);
+ if (_full_try) {
+ ioc->extra_op_flags |= CEPH_OSD_FLAG_FULL_TRY;
+ } else {
+ ioc->extra_op_flags &= ~CEPH_OSD_FLAG_FULL_TRY;
+ }
+}
+
+bool operator <(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator <=(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <=
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator >=(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >=
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator >(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator ==(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) ==
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator !=(const IOContext& lhs, const IOContext& rhs) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+ const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+ return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) !=
+ std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+std::ostream& operator <<(std::ostream& m, const IOContext& o) {
+ const auto l = reinterpret_cast<const IOContextImpl*>(&o.impl);
+ return (m << l->oloc.pool << ":" << l->oloc.nspace << ":" << l->oloc.key);
+}
+
+
+// Op
+
// Backing state for the public Op handle: the accumulated compound
// ObjectOperation plus an optional per-write mtime override.
// Move-only, matching ObjectOperation's semantics.
struct OpImpl {
  ObjectOperation op;
  std::optional<ceph::real_time> mtime;  // used by WriteOp::set_mtime

  OpImpl() = default;

  OpImpl(const OpImpl& rhs) = delete;
  OpImpl(OpImpl&& rhs) = default;

  OpImpl& operator =(const OpImpl& rhs) = delete;
  OpImpl& operator =(OpImpl&& rhs) = default;
};
+
// Op wraps an OpImpl in opaque aligned storage (same technique as
// Object/IOContext). Move-only: copy operations are not defined.

Op::Op() {
  static_assert(Op::impl_size >= sizeof(OpImpl));
  new (&impl) OpImpl;
}

Op::Op(Op&& rhs) {
  new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl)));
}
// Move-assign destroys the current impl before constructing the moved
// one in place (OpImpl's own move-assign is not used on raw storage).
Op& Op::operator =(Op&& rhs) {
  reinterpret_cast<OpImpl*>(&impl)->~OpImpl();
  new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl)));
  return *this;
}
Op::~Op() {
  reinterpret_cast<OpImpl*>(&impl)->~OpImpl();
}
+
+void Op::set_excl() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(CEPH_OSD_OP_FLAG_EXCL);
+}
+void Op::set_failok() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FAILOK);
+}
+void Op::set_fadvise_random() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FADVISE_RANDOM);
+}
+void Op::set_fadvise_sequential() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+}
+void Op::set_fadvise_willneed() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+}
+void Op::set_fadvise_dontneed() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+}
+void Op::set_fadvise_nocache() {
+ reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+ CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
+}
+
+void Op::cmpext(uint64_t off, bufferlist&& cmp_bl, std::size_t* s) {
+ reinterpret_cast<OpImpl*>(&impl)->op.cmpext(off, std::move(cmp_bl), nullptr,
+ s);
+}
+void Op::cmpxattr(std::string_view name, cmpxattr_op op, const bufferlist& val) {
+ reinterpret_cast<OpImpl*>(&impl)->
+ op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_STRING, val);
+}
+void Op::cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val) {
+ bufferlist bl;
+ encode(val, bl);
+ reinterpret_cast<OpImpl*>(&impl)->
+ op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_U64, bl);
+}
+
+void Op::assert_version(uint64_t ver) {
+ reinterpret_cast<OpImpl*>(&impl)->op.assert_version(ver);
+}
+void Op::assert_exists() {
+ reinterpret_cast<OpImpl*>(&impl)->op.stat(
+ nullptr,
+ static_cast<ceph::real_time*>(nullptr),
+ static_cast<bs::error_code*>(nullptr));
+}
+void Op::cmp_omap(const bc::flat_map<
+ std::string, std::pair<cb::list,
+ int>>& assertions) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_cmp(assertions, nullptr);
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+ const bufferlist& inbl,
+ cb::list* out,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, ec, out);
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+ const bufferlist& inbl,
+ fu2::unique_function<void(bs::error_code,
+ const cb::list&) &&> f) {
+ reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f));
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+ const bufferlist& inbl,
+ fu2::unique_function<void(bs::error_code, int,
+ const cb::list&) &&> f) {
+ reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f));
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+ const bufferlist& inbl, bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, ec);
+}
+
+void Op::balance_reads() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+}
+void Op::localize_reads() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_LOCALIZE_READS;
+}
+void Op::order_reads_writes() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RWORDERED;
+}
+void Op::ignore_cache() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
+}
+void Op::skiprwlocks() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_SKIPRWLOCKS;
+}
+void Op::ignore_overlay() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+}
+void Op::full_try() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_TRY;
+}
+void Op::full_force() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_FORCE;
+}
+void Op::ignore_redirect() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_REDIRECT;
+}
+void Op::ordersnap() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_ORDERSNAP;
+}
+void Op::returnvec() {
+ reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RETURNVEC;
+}
+
+std::size_t Op::size() const {
+ return reinterpret_cast<const OpImpl*>(&impl)->op.size();
+}
+
+std::ostream& operator <<(std::ostream& m, const Op& o) {
+ return m << reinterpret_cast<const OpImpl*>(&o.impl)->op;
+}
+
+
+// ---
+
+// ReadOp / WriteOp
+
+void ReadOp::read(size_t off, uint64_t len, cb::list* out,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.read(off, len, ec, out);
+}
+
+void ReadOp::get_xattr(std::string_view name, cb::list* out,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.getxattr(name, ec, out);
+}
+
+void ReadOp::get_omap_header(cb::list* out,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_get_header(ec, out);
+}
+
+void ReadOp::sparse_read(uint64_t off, uint64_t len, cb::list* out,
+ std::vector<std::pair<std::uint64_t,
+ std::uint64_t>>* extents,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.sparse_read(off, len, ec, extents, out);
+}
+
+void ReadOp::stat(std::uint64_t* size, ceph::real_time* mtime,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.stat(size, mtime, ec);
+}
+
+void ReadOp::get_omap_keys(std::optional<std::string_view> start_after,
+ std::uint64_t max_return,
+ bc::flat_set<std::string>* keys,
+ bool* done,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_get_keys(start_after, max_return,
+ ec, keys, done);
+}
+
+void ReadOp::get_xattrs(bc::flat_map<std::string,
+ cb::list>* kv,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.getxattrs(ec, kv);
+}
+
+void ReadOp::get_omap_vals(std::optional<std::string_view> start_after,
+ std::optional<std::string_view> filter_prefix,
+ uint64_t max_return,
+ bc::flat_map<std::string,
+ cb::list>* kv,
+ bool* done,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals(start_after, filter_prefix,
+ max_return, ec, kv, done);
+}
+
+void ReadOp::get_omap_vals_by_keys(
+ const bc::flat_set<std::string>& keys,
+ bc::flat_map<std::string, cb::list>* kv,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals_by_keys(keys, ec, kv);
+}
+
+void ReadOp::list_watchers(std::vector<ObjWatcher>* watchers,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)-> op.list_watchers(watchers, ec);
+}
+
+void ReadOp::list_snaps(SnapSet* snaps,
+ bs::error_code* ec) {
+ reinterpret_cast<OpImpl*>(&impl)->op.list_snaps(snaps, nullptr, ec);
+}
+
+// WriteOp
+
+void WriteOp::set_mtime(ceph::real_time t) {
+ auto o = reinterpret_cast<OpImpl*>(&impl);
+ o->mtime = t;
+}
+
+void WriteOp::create(bool exclusive) {
+ reinterpret_cast<OpImpl*>(&impl)->op.create(exclusive);
+}
+
+void WriteOp::write(uint64_t off, bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.write(off, bl);
+}
+
+void WriteOp::write_full(bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.write_full(bl);
+}
+
+void WriteOp::writesame(uint64_t off, uint64_t write_len, bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.writesame(off, write_len, bl);
+}
+
+void WriteOp::append(bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.append(bl);
+}
+
+void WriteOp::remove() {
+ reinterpret_cast<OpImpl*>(&impl)->op.remove();
+}
+
+void WriteOp::truncate(uint64_t off) {
+ reinterpret_cast<OpImpl*>(&impl)->op.truncate(off);
+}
+
+void WriteOp::zero(uint64_t off, uint64_t len) {
+ reinterpret_cast<OpImpl*>(&impl)->op.zero(off, len);
+}
+
+void WriteOp::rmxattr(std::string_view name) {
+ reinterpret_cast<OpImpl*>(&impl)->op.rmxattr(name);
+}
+
+void WriteOp::setxattr(std::string_view name,
+ bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.setxattr(name, bl);
+}
+
+void WriteOp::rollback(uint64_t snapid) {
+ reinterpret_cast<OpImpl*>(&impl)->op.rollback(snapid);
+}
+
+void WriteOp::set_omap(
+ const bc::flat_map<std::string, cb::list>& map) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_set(map);
+}
+
+void WriteOp::set_omap_header(bufferlist&& bl) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_set_header(bl);
+}
+
+void WriteOp::clear_omap() {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_clear();
+}
+
+void WriteOp::rm_omap_keys(
+ const bc::flat_set<std::string>& to_rm) {
+ reinterpret_cast<OpImpl*>(&impl)->op.omap_rm_keys(to_rm);
+}
+
// Passes allocation hints through to the OSD. The public
// alloc_hint::* constants are required (via static_assert) to be
// numerically identical to the wire-level CEPH_OSD_ALLOC_HINT_FLAG_*
// values, so `flags` can be forwarded without translation.
void WriteOp::set_alloc_hint(uint64_t expected_object_size,
                             uint64_t expected_write_size,
                             alloc_hint::alloc_hint_t flags) {
  using namespace alloc_hint;
  static_assert(sequential_write ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE));
  static_assert(random_write ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE));
  static_assert(sequential_read ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ));
  static_assert(random_read ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ));
  static_assert(append_only ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY));
  static_assert(immutable ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE));
  static_assert(shortlived ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED));
  static_assert(longlived ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED));
  static_assert(compressible ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE));
  static_assert(incompressible ==
                static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE));

  reinterpret_cast<OpImpl*>(&impl)->op.set_alloc_hint(expected_object_size,
                                                      expected_write_size,
                                                      flags);
}
+
+// RADOS
+
+RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) {
+ if (conf_files)
+ *conf_files += (", " + std::string(f));
+ else
+ conf_files = std::string(f);
+ return *this;
+}
+
+void RADOS::Builder::build(boost::asio::io_context& ioctx,
+ std::unique_ptr<BuildComp> c) {
+ constexpr auto env = CODE_ENVIRONMENT_LIBRARY;
+ CephInitParameters ci(env);
+ if (name)
+ ci.name.set(CEPH_ENTITY_TYPE_CLIENT, *name);
+ else
+ ci.name.set(CEPH_ENTITY_TYPE_CLIENT, "admin");
+ uint32_t flags = 0;
+ if (no_default_conf)
+ flags |= CINIT_FLAG_NO_DEFAULT_CONFIG_FILE;
+ if (no_mon_conf)
+ flags |= CINIT_FLAG_NO_MON_CONFIG;
+
+ CephContext *cct = common_preinit(ci, env, flags);
+ if (cluster)
+ cct->_conf->cluster = *cluster;
+
+ if (no_mon_conf)
+ cct->_conf->no_mon_config = true;
+
+ // TODO: Come up with proper error codes here. Maybe augment the
+ // functions with a default bs::error_code* parameter to
+ // pass back.
+ {
+ std::ostringstream ss;
+ auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr,
+ &ss, flags);
+ if (r < 0)
+ c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr});
+ }
+
+ cct->_conf.parse_env(cct->get_module_type());
+
+ for (const auto& [n, v] : configs) {
+ std::stringstream ss;
+ auto r = cct->_conf.set_val(n, v, &ss);
+ if (r < 0)
+ c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr});
+ }
+
+ if (!no_mon_conf) {
+ MonClient mc_bootstrap(cct, ioctx);
+ // TODO This function should return an error code.
+ auto err = mc_bootstrap.get_monmap_and_config();
+ if (err < 0)
+ c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr});
+ }
+ if (!cct->_log->is_started()) {
+ cct->_log->start();
+ }
+ common_init_finish(cct);
+
+ RADOS::make_with_cct(cct, ioctx, std::move(c));
+}
+
// Builds a RADOS handle around an already-initialized CephContext.
// Ownership of the detail::Client is transferred into the completion
// lambda and, on success, into the RADOS handle delivered to `c`.
// Completion fires only after the first OSD map has been received; a
// bs::system_error thrown during client construction is converted to
// an error completion instead of propagating.
void RADOS::make_with_cct(CephContext* cct,
                          boost::asio::io_context& ioctx,
                          std::unique_ptr<BuildComp> c) {
  try {
    auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)};
    r->objecter->wait_for_osd_map(
      [c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable {
        c->dispatch(std::move(c), bs::error_code{},
                    RADOS{std::move(r)});
      });
  } catch (const bs::system_error& err) {
    c->post(std::move(c), err.code(), RADOS{nullptr});
  }
}
+
// Wraps an existing librados client so neorados and librados can share
// one connection; construction is synchronous.
RADOS RADOS::make_with_librados(librados::Rados& rados) {
  return RADOS{std::make_unique<detail::RadosClient>(rados.client)};
}

// RADOS itself is a move-only pimpl handle; an empty (default or
// moved-from) handle holds a null impl.
RADOS::RADOS() = default;

RADOS::RADOS(std::unique_ptr<detail::Client> impl)
  : impl(std::move(impl)) {}

RADOS::RADOS(RADOS&&) = default;
RADOS& RADOS::operator =(RADOS&&) = default;

RADOS::~RADOS() = default;

// Executor / io_context of the client's event loop; requires a
// non-empty handle.
RADOS::executor_type RADOS::get_executor() const {
  return impl->ioctx.get_executor();
}

boost::asio::io_context& RADOS::get_io_context() {
  return impl->ioctx;
}
+
+void RADOS::execute(const Object& o, const IOContext& _ioc, ReadOp&& _op,
+ cb::list* bl,
+ std::unique_ptr<ReadOp::Completion> c, version_t* objver,
+ const blkin_trace_info *trace_info) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+ auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+ auto flags = op->op.flags | ioc->extra_op_flags;
+
+ ZTracer::Trace trace;
+ if (trace_info) {
+ ZTracer::Trace parent_trace("", nullptr, trace_info);
+ trace.init("rados execute", &impl->objecter->trace_endpoint, &parent_trace);
+ }
+
+ trace.event("init");
+ impl->objecter->read(
+ *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags,
+ std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace);
+
+ trace.event("submitted");
+}
+
+void RADOS::execute(const Object& o, const IOContext& _ioc, WriteOp&& _op,
+ std::unique_ptr<WriteOp::Completion> c, version_t* objver,
+ const blkin_trace_info *trace_info) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+ auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+ auto flags = op->op.flags | ioc->extra_op_flags;
+ ceph::real_time mtime;
+ if (op->mtime)
+ mtime = *op->mtime;
+ else
+ mtime = ceph::real_clock::now();
+
+ ZTracer::Trace trace;
+ if (trace_info) {
+ ZTracer::Trace parent_trace("", nullptr, trace_info);
+ trace.init("rados execute", &impl->objecter->trace_endpoint, &parent_trace);
+ }
+
+ trace.event("init");
+ impl->objecter->mutate(
+ *oid, ioc->oloc, std::move(op->op), ioc->snapc,
+ mtime, flags,
+ std::move(c), objver, osd_reqid_t{}, &trace);
+ trace.event("submitted");
+}
+
+void RADOS::execute(const Object& o, std::int64_t pool, ReadOp&& _op,
+ cb::list* bl,
+ std::unique_ptr<ReadOp::Completion> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key,
+ version_t* objver) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+ auto flags = op->op.flags;
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+
+ impl->objecter->read(
+ *oid, oloc, std::move(op->op), CEPH_NOSNAP, bl, flags,
+ std::move(c), objver);
+}
+
+void RADOS::execute(const Object& o, std::int64_t pool, WriteOp&& _op,
+ std::unique_ptr<WriteOp::Completion> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key,
+ version_t* objver) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+ auto flags = op->op.flags;
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+
+ ceph::real_time mtime;
+ if (op->mtime)
+ mtime = *op->mtime;
+ else
+ mtime = ceph::real_clock::now();
+
+ impl->objecter->mutate(
+ *oid, oloc, std::move(op->op), {},
+ mtime, flags,
+ std::move(c), objver);
+}
+
// Cluster fsid as reported by the monitor client.
boost::uuids::uuid RADOS::get_fsid() const noexcept {
  return impl->monclient.get_fsid().uuid;
}
+
+
+void RADOS::lookup_pool(std::string_view name,
+ std::unique_ptr<LookupPoolComp> c)
+{
+ // I kind of want to make lookup_pg_pool return
+ // std::optional<int64_t> since it can only return one error code.
+ int64_t ret = impl->objecter->with_osdmap(
+ std::mem_fn(&OSDMap::lookup_pg_pool_name),
+ name);
+ if (ret < 0) {
+ impl->objecter->wait_for_latest_osdmap(
+ [name = std::string(name), c = std::move(c),
+ objecter = impl->objecter]
+ (bs::error_code ec) mutable {
+ int64_t ret =
+ objecter->with_osdmap([&](const OSDMap &osdmap) {
+ return osdmap.lookup_pg_pool_name(name);
+ });
+ if (ret < 0)
+ ca::dispatch(std::move(c), osdc_errc::pool_dne,
+ std::int64_t(0));
+ else
+ ca::dispatch(std::move(c), bs::error_code{}, ret);
+ });
+ } else if (ret < 0) {
+ ca::post(std::move(c), osdc_errc::pool_dne,
+ std::int64_t(0));
+ } else {
+ ca::post(std::move(c), bs::error_code{}, ret);
+ }
+}
+
+
// Required write alignment for a pool, or nullopt when the pool does
// not require aligned appends. Throws std::system_error(ENOENT) when
// the pool id is not present in the current OSD map.
std::optional<uint64_t> RADOS::get_pool_alignment(int64_t pool_id)
{
  return impl->objecter->with_osdmap(
    [pool_id](const OSDMap &o) -> std::optional<uint64_t> {
      if (!o.have_pg_pool(pool_id)) {
	throw bs::system_error(
	  ENOENT, bs::system_category(),
	  "Cannot find pool in OSDMap.");
      } else if (o.get_pg_pool(pool_id)->requires_aligned_append()) {
	return o.get_pg_pool(pool_id)->required_alignment();
      } else {
	return std::nullopt;
      }
    });
}
+
+void RADOS::list_pools(std::unique_ptr<LSPoolsComp> c) {
+ impl->objecter->with_osdmap(
+ [&](OSDMap& o) {
+ std::vector<std::pair<std::int64_t, std::string>> v;
+ for (auto p : o.get_pools())
+ v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
+ ca::dispatch(std::move(c), std::move(v));
+ });
+}
+
+// Create a pool-level snapshot. The Objecter PoolOp's bufferlist result is
+// ignored; only the error code is forwarded to the completion.
+void RADOS::create_pool_snap(std::int64_t pool,
+ std::string_view snapName,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->create_pool_snap(
+ pool, snapName,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }));
+}
+
+// Allocate a new self-managed snapshot id in the pool; the completion
+// receives (error_code, snapid_t).
+void RADOS::allocate_selfmanaged_snap(int64_t pool,
+ std::unique_ptr<SMSnapComp> c) {
+ impl->objecter->allocate_selfmanaged_snap(
+ pool,
+ ca::Completion<void(bs::error_code, snapid_t)>::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, snapid_t snap) mutable {
+ ca::dispatch(std::move(c), e, snap);
+ }));
+}
+
+// Delete a pool-level snapshot by name.
+void RADOS::delete_pool_snap(std::int64_t pool,
+ std::string_view snapName,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->delete_pool_snap(
+ pool, snapName,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }));
+}
+
+// Delete a self-managed snapshot by id.
+void RADOS::delete_selfmanaged_snap(std::int64_t pool,
+ std::uint64_t snap,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->delete_selfmanaged_snap(
+ pool, snap,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }));
+}
+
+// Create a pool. crush_rule falls back to -1 (let the monitor choose)
+// when not supplied.
+void RADOS::create_pool(std::string_view name,
+ std::optional<int> crush_rule,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->create_pool(
+ name,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }),
+ crush_rule.value_or(-1));
+}
+
+// Delete a pool by name.
+void RADOS::delete_pool(std::string_view name,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->delete_pool(
+ name,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }));
+}
+
+// Delete a pool by id.
+void RADOS::delete_pool(std::int64_t pool,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ impl->objecter->delete_pool(
+ pool,
+ Objecter::PoolOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+ ca::dispatch(std::move(c), e);
+ }));
+}
+
+// Gather per-pool statistics for the named pools, converting the raw
+// pool_stat_t returned by the Objecter into the public PoolStats struct.
+// 'per_pool' indicates whether the OSDs reported per-pool (vs legacy
+// aggregate) allocation figures; it is forwarded to the completion.
+void RADOS::stat_pools(const std::vector<std::string>& pools,
+ std::unique_ptr<PoolStatComp> c) {
+ impl->objecter->get_pool_stats(
+ pools,
+ [c = std::move(c)]
+ (bs::error_code ec,
+ bc::flat_map<std::string, pool_stat_t> rawresult,
+ bool per_pool) mutable {
+ bc::flat_map<std::string, PoolStats> result;
+ for (auto p = rawresult.begin(); p != rawresult.end(); ++p) {
+ auto& pv = result[p->first];
+ auto& pstat = p->second;
+ store_statfs_t &statfs = pstat.store_stats;
+ // Physical bytes consumed (data + omap).
+ uint64_t allocated_bytes = pstat.get_allocated_data_bytes(per_pool) +
+ pstat.get_allocated_omap_bytes(per_pool);
+ // FIXME: raw_used_rate is unknown hence use 1.0 here
+ // meaning we keep net amount aggregated over all replicas
+ // Not a big deal so far since this field isn't exposed
+ uint64_t user_bytes = pstat.get_user_data_bytes(1.0, per_pool) +
+ pstat.get_user_omap_bytes(1.0, per_pool);
+
+ object_stat_sum_t *sum = &p->second.stats.sum;
+ pv.num_kb = shift_round_up(allocated_bytes, 10);
+ pv.num_bytes = allocated_bytes;
+ pv.num_objects = sum->num_objects;
+ pv.num_object_clones = sum->num_object_clones;
+ pv.num_object_copies = sum->num_object_copies;
+ pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary;
+ pv.num_objects_unfound = sum->num_objects_unfound;
+ pv.num_objects_degraded = sum->num_objects_degraded;
+ pv.num_rd = sum->num_rd;
+ pv.num_rd_kb = sum->num_rd_kb;
+ pv.num_wr = sum->num_wr;
+ pv.num_wr_kb = sum->num_wr_kb;
+ pv.num_user_bytes = user_bytes;
+ pv.compressed_bytes_orig = statfs.data_compressed_original;
+ pv.compressed_bytes = statfs.data_compressed;
+ pv.compressed_bytes_alloc = statfs.data_compressed_allocated;
+ }
+
+ ca::dispatch(std::move(c), ec, std::move(result), per_pool);
+ });
+}
+
+// Fetch filesystem-level statistics (kb total/used/avail, object count),
+// optionally restricted to a single pool.
+// _pool: pool id to restrict the statfs to, or nullopt for cluster-wide.
+// c: completion invoked with (error_code, FSStats).
+void RADOS::stat_fs(std::optional<std::int64_t> _pool,
+ std::unique_ptr<StatFSComp> c) {
+ boost::optional<int64_t> pool;
+ if (_pool)
+ pool = *_pool; // was '*pool': dereferenced the still-empty boost::optional
+ impl->objecter->get_fs_stats(
+ pool,
+ [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable {
+ FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects};
+ // Use ca::dispatch like every other completion in this file; the
+ // previous 'c->dispatch(std::move(c), ...)' read through c while
+ // also moving from it.
+ ca::dispatch(std::move(c), ec, std::move(fso));
+ });
+}
+
+// --- Watch/Notify
+
+// Register a watch on an object using an IOContext. The returned cookie is
+// the LingerOp's cookie (watch_check below casts it back to a LingerOp*).
+// timeout of 0 (the default) lets the OSD apply its configured timeout.
+void RADOS::watch(const Object& o, const IOContext& _ioc,
+ std::optional<std::chrono::seconds> timeout, WatchCB&& cb,
+ std::unique_ptr<WatchComp> c) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+ ObjectOperation op;
+
+ auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc,
+ ioc->extra_op_flags);
+ uint64_t cookie = linger_op->get_cookie();
+ linger_op->handle = std::move(cb);
+ op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+ bufferlist bl;
+ impl->objecter->linger_watch(
+ linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+ ca::dispatch(std::move(c), e, cookie);
+ }), nullptr);
+}
+
+// Register a watch addressed by pool id plus optional namespace/locator key
+// instead of an IOContext. Uses the default snap context and no extra flags.
+void RADOS::watch(const Object& o, std::int64_t pool,
+ std::optional<std::chrono::seconds> timeout, WatchCB&& cb,
+ std::unique_ptr<WatchComp> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+
+ ObjectOperation op;
+
+ Objecter::LingerOp *linger_op = impl->objecter->linger_register(*oid, oloc, 0);
+ uint64_t cookie = linger_op->get_cookie();
+ linger_op->handle = std::move(cb);
+ op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+ bufferlist bl;
+ impl->objecter->linger_watch(
+ linger_op, op, {}, ceph::real_clock::now(), bl,
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [c = std::move(c), cookie](bs::error_code e, bufferlist) mutable {
+ ca::dispatch(std::move(c), e, cookie);
+ }), nullptr);
+}
+
+// Acknowledge a received notify (IOContext form). Issued as a read op since
+// it does not mutate the object.
+void RADOS::notify_ack(const Object& o,
+ const IOContext& _ioc,
+ uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist&& bl,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+ ObjectOperation op;
+ op.notify_ack(notify_id, cookie, bl);
+
+ impl->objecter->read(*oid, ioc->oloc, std::move(op), ioc->snap_seq,
+ nullptr, ioc->extra_op_flags, std::move(c));
+}
+
+// Acknowledge a received notify (pool id form); reads at CEPH_NOSNAP with
+// no extra flags.
+void RADOS::notify_ack(const Object& o,
+ std::int64_t pool,
+ uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist&& bl,
+ std::unique_ptr<SimpleOpComp> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key) {
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+
+ ObjectOperation op;
+ op.notify_ack(notify_id, cookie, bl);
+ impl->objecter->read(*oid, oloc, std::move(op), CEPH_NOSNAP, nullptr, 0,
+ std::move(c));
+}
+
+// Check a watch's health. The cookie is the LingerOp pointer value handed
+// out by watch(); returns time since last confirmation or an error.
+tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check(uint64_t cookie)
+{
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ return impl->objecter->linger_check(linger_op);
+}
+
+// Remove a watch (IOContext form). The cookie is cast back to the LingerOp
+// registered by watch(); after the UNWATCH mutation completes the linger op
+// is cancelled and the completion dispatched.
+void RADOS::unwatch(uint64_t cookie, const IOContext& _ioc,
+ std::unique_ptr<SimpleOpComp> c)
+{
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+
+ ObjectOperation op;
+ op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+ impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op),
+ ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags,
+ Objecter::Op::OpComp::create(
+ get_executor(),
+ [objecter = impl->objecter,
+ linger_op, c = std::move(c)]
+ (bs::error_code ec) mutable {
+ objecter->linger_cancel(linger_op);
+ ca::dispatch(std::move(c), ec);
+ }));
+}
+
+// Remove a watch (pool id form); empty snap context, no extra flags.
+void RADOS::unwatch(uint64_t cookie, std::int64_t pool,
+ std::unique_ptr<SimpleOpComp> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key)
+{
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+
+ ObjectOperation op;
+ op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+ impl->objecter->mutate(linger_op->target.base_oid, oloc, std::move(op),
+ {}, ceph::real_clock::now(), 0,
+ Objecter::Op::OpComp::create(
+ get_executor(),
+ [objecter = impl->objecter,
+ linger_op, c = std::move(c)]
+ (bs::error_code ec) mutable {
+ objecter->linger_cancel(linger_op);
+ ca::dispatch(std::move(c), ec);
+ }));
+}
+
+// Post the completion once all pending watch/notify callbacks have been
+// flushed by the Objecter.
+void RADOS::flush_watch(std::unique_ptr<VoidOpComp> c)
+{
+ impl->objecter->linger_callback_flush([c = std::move(c)]() mutable {
+ ca::post(std::move(c));
+ });
+}
+
+// Tracks a single notify: the user completion fires only once both the OSD
+// ack and the notify-finish event have arrived (or on first error). All
+// state transitions run on the strand, so no extra locking is needed.
+struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
+ boost::asio::io_context& ioc;
+ boost::asio::io_context::strand strand; // serializes ack/finish callbacks
+ Objecter* objecter;
+ Objecter::LingerOp* op;
+ std::unique_ptr<RADOS::NotifyComp> c;
+
+ bool acked = false; // OSD accepted the notify
+ bool finished = false; // notify completed on the watchers' side
+ bs::error_code res; // first error observed, if any
+ bufferlist rbl; // reply payload handed to the completion
+
+ NotifyHandler(boost::asio::io_context& ioc,
+ Objecter* objecter,
+ Objecter::LingerOp* op,
+ std::unique_ptr<RADOS::NotifyComp> c)
+ : ioc(ioc), strand(ioc), objecter(objecter), op(op), c(std::move(c)) {}
+
+ // Use bind or a lambda to pass this in.
+ void handle_ack(bs::error_code ec,
+ bufferlist&&) {
+ boost::asio::post(
+ strand,
+ [this, ec, p = shared_from_this()]() mutable {
+ acked = true;
+ maybe_cleanup(ec);
+ });
+ }
+
+ // Notify finish callback. It can actually own the object's storage.
+
+ void operator()(bs::error_code ec,
+ bufferlist&& bl) {
+ boost::asio::post(
+ strand,
+ [this, ec, bl = std::move(bl), p = shared_from_this()]() mutable {
+ finished = true;
+ // Keep the notify reply; it was previously dropped, so the
+ // completion always saw an empty rbl.
+ rbl = std::move(bl);
+ maybe_cleanup(ec);
+ });
+ }
+
+ // Should be called from strand.
+ void maybe_cleanup(bs::error_code ec) {
+ if (!res && ec)
+ res = ec;
+ if ((acked && finished) || res) {
+ objecter->linger_cancel(op);
+ ceph_assert(c);
+ ca::dispatch(std::move(c), res, std::move(rbl));
+ }
+ }
+};
+
+// Send a notify to an object's watchers (IOContext form). A NotifyHandler
+// joins the ack path (linger_notify completion) with the finish path
+// (on_notify_finish) before dispatching the user completion. A missing
+// timeout falls back to the client_notify_timeout config value.
+void RADOS::notify(const Object& o, const IOContext& _ioc, bufferlist&& bl,
+ std::optional<std::chrono::milliseconds> timeout,
+ std::unique_ptr<NotifyComp> c)
+{
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+ auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc,
+ ioc->extra_op_flags);
+
+ auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter,
+ linger_op, std::move(c));
+ linger_op->on_notify_finish =
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
+ (*cb)(ec, std::move(bl));
+ });
+ ObjectOperation rd;
+ bufferlist inbl;
+ rd.notify(
+ linger_op->get_cookie(), 1,
+ timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout,
+ bl, &inbl);
+
+ impl->objecter->linger_notify(
+ linger_op, rd, ioc->snap_seq, inbl,
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
+ cb->handle_ack(ec, std::move(bl));
+ }), nullptr);
+}
+
+// Send a notify addressed by pool id plus optional namespace/locator key;
+// reads at CEPH_NOSNAP with no extra flags. Same ack/finish joining as the
+// IOContext overload.
+void RADOS::notify(const Object& o, std::int64_t pool, bufferlist&& bl,
+ std::optional<std::chrono::milliseconds> timeout,
+ std::unique_ptr<NotifyComp> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key)
+{
+ auto oid = reinterpret_cast<const object_t*>(&o.impl);
+ object_locator_t oloc;
+ oloc.pool = pool;
+ if (ns)
+ oloc.nspace = *ns;
+ if (key)
+ oloc.key = *key;
+ auto linger_op = impl->objecter->linger_register(*oid, oloc, 0);
+
+ auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter,
+ linger_op, std::move(c));
+ linger_op->on_notify_finish =
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [cb](bs::error_code ec, ceph::bufferlist&& bl) mutable {
+ (*cb)(ec, std::move(bl));
+ });
+ ObjectOperation rd;
+ bufferlist inbl;
+ rd.notify(
+ linger_op->get_cookie(), 1,
+ timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout,
+ bl, &inbl);
+
+ impl->objecter->linger_notify(
+ linger_op, rd, CEPH_NOSNAP, inbl,
+ Objecter::LingerOp::OpComp::create(
+ get_executor(),
+ [cb](bs::error_code ec, bufferlist&& bl) mutable {
+ cb->handle_ack(ec, std::move(bl));
+ }), nullptr);
+}
+
+// Enumeration
+
+// Cursor stores an hobject_t in opaque aligned storage (impl); every
+// special member forwards to the corresponding hobject_t operation via
+// placement new and explicit destructor calls.
+Cursor::Cursor() {
+ static_assert(impl_size >= sizeof(hobject_t));
+ new (&impl) hobject_t();
+} // note: dropped a stray ';' that followed this body
+
+Cursor::Cursor(end_magic_t) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ new (&impl) hobject_t(hobject_t::get_max());
+}
+
+// Moves from the hobject_t pointed to by p.
+Cursor::Cursor(void* p) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(p)));
+}
+
+Cursor Cursor::begin() {
+ Cursor e;
+ return e;
+}
+
+Cursor Cursor::end() {
+ Cursor e(end_magic_t{});
+ return e;
+}
+
+Cursor::Cursor(const Cursor& rhs) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+Cursor& Cursor::operator =(const Cursor& rhs) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ // Self-assignment guard: destroying impl and then copy-constructing
+ // from rhs would otherwise read a destroyed object when this == &rhs.
+ if (this != &rhs) {
+ reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+ new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl));
+ }
+ return *this;
+}
+
+Cursor::Cursor(Cursor&& rhs) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl)));
+}
+
+Cursor& Cursor::operator =(Cursor&& rhs) {
+ static_assert(impl_size >= sizeof(hobject_t));
+ // Same self-assignment guard as the copy assignment above.
+ if (this != &rhs) {
+ reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+ new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl)));
+ }
+ return *this;
+}
+
+Cursor::~Cursor() {
+ reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+}
+
+// Relational operators delegate to the underlying hobject_t comparisons.
+bool operator ==(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) ==
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator !=(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) !=
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator <(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) <
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator <=(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) <=
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator >=(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) >=
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator >(const Cursor& lhs, const Cursor& rhs) {
+ return (*reinterpret_cast<const hobject_t*>(&lhs.impl) >
+ *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+// Render the cursor as a string; the end sentinel (hobject_t::get_max())
+// prints as "MAX".
+std::string Cursor::to_str() const {
+ using namespace std::literals;
+ auto& h = *reinterpret_cast<const hobject_t*>(&impl);
+
+ return h.is_max() ? "MAX"s : h.to_str();
+}
+
+// Parse a cursor previously produced by to_str(); nullopt on parse failure.
+std::optional<Cursor>
+Cursor::from_str(const std::string& s) {
+ Cursor e;
+ auto& h = *reinterpret_cast<hobject_t*>(&e.impl);
+ if (!h.parse(s))
+ return std::nullopt;
+
+ return e;
+}
+
+// Enumerate up to 'max' objects in [begin, end), optionally filtered, and
+// deliver the entries plus a continuation cursor to the completion.
+void RADOS::enumerate_objects(const IOContext& _ioc,
+ const Cursor& begin,
+ const Cursor& end,
+ const std::uint32_t max,
+ const bufferlist& filter,
+ std::unique_ptr<EnumerateComp> c) {
+ auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+ impl->objecter->enumerate_objects<Entry>(
+ ioc->oloc.pool,
+ ioc->oloc.nspace,
+ *reinterpret_cast<const hobject_t*>(&begin.impl),
+ *reinterpret_cast<const hobject_t*>(&end.impl),
+ max,
+ filter,
+ [c = std::move(c)]
+ (bs::error_code ec, std::vector<Entry>&& v,
+ hobject_t&& n) mutable {
+ ca::dispatch(std::move(c), ec, std::move(v),
+ Cursor(static_cast<void*>(&n)));
+ });
+}
+
+// Pool-id overload of the above.
+// NOTE(review): the 'key' parameter is accepted but never used in this
+// body — enumeration ignores any locator key; confirm whether that is
+// intentional.
+void RADOS::enumerate_objects(std::int64_t pool,
+ const Cursor& begin,
+ const Cursor& end,
+ const std::uint32_t max,
+ const bufferlist& filter,
+ std::unique_ptr<EnumerateComp> c,
+ std::optional<std::string_view> ns,
+ std::optional<std::string_view> key) {
+ impl->objecter->enumerate_objects<Entry>(
+ pool,
+ ns ? *ns : std::string_view{},
+ *reinterpret_cast<const hobject_t*>(&begin.impl),
+ *reinterpret_cast<const hobject_t*>(&end.impl),
+ max,
+ filter,
+ [c = std::move(c)]
+ (bs::error_code ec, std::vector<Entry>&& v,
+ hobject_t&& n) mutable {
+ ca::dispatch(std::move(c), ec, std::move(v),
+ Cursor(static_cast<void*>(&n)));
+ });
+}
+
+
+// Send an administrative command to a specific OSD; the completion gets
+// (error_code, status string, output bufferlist).
+void RADOS::osd_command(int osd, std::vector<std::string>&& cmd,
+ ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) {
+ impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr,
+ [c = std::move(c)]
+ (bs::error_code ec,
+ std::string&& s,
+ ceph::bufferlist&& b) mutable {
+ ca::dispatch(std::move(c), ec,
+ std::move(s),
+ std::move(b));
+ });
+}
+// Send an administrative command to the primary of a specific PG.
+void RADOS::pg_command(PG pg, std::vector<std::string>&& cmd,
+ ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) {
+ impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
+ [c = std::move(c)]
+ (bs::error_code ec,
+ std::string&& s,
+ ceph::bufferlist&& b) mutable {
+ ca::dispatch(std::move(c), ec,
+ std::move(s),
+ std::move(b));
+ });
+}
+
+// Enable an application tag on a pool via a mon command. Refused with
+// EOPNOTSUPP when the monitors do not all have the Luminous feature.
+void RADOS::enable_application(std::string_view pool, std::string_view app_name,
+ bool force, std::unique_ptr<SimpleOpComp> c) {
+ // pre-Luminous clusters will return -EINVAL and application won't be
+ // preserved until Luminous is configured as minimum version.
+ if (!impl->get_required_monitor_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS)) {
+ ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP));
+ } else {
+ impl->monclient.start_mon_command(
+ { fmt::format("{{ \"prefix\": \"osd pool application enable\","
+ "\"pool\": \"{}\", \"app\": \"{}\"{}}}",
+ pool, app_name,
+ force ? " ,\"yes_i_really_mean_it\": true" : "")},
+ {}, [c = std::move(c)](bs::error_code e,
+ std::string, cb::list) mutable {
+ ca::post(std::move(c), e);
+ });
+ }
+}
+
+// Add a client address to the OSD blocklist, optionally with an expiry.
+// If the cluster rejects the "osd blocklist" command with
+// invalid_argument (older release), retry with the legacy
+// "osd blacklist" spelling.
+void RADOS::blocklist_add(std::string_view client_address,
+ std::optional<std::chrono::seconds> expire,
+ std::unique_ptr<SimpleOpComp> c) {
+ auto expire_arg = (expire ?
+ fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{});
+ impl->monclient.start_mon_command(
+ { fmt::format("{{"
+ "\"prefix\": \"osd blocklist\", "
+ "\"blocklistop\": \"add\", "
+ "\"addr\": \"{}\"{}}}",
+ client_address, expire_arg) },
+ {},
+ [this, client_address = std::string(client_address), expire_arg,
+ c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
+ if (ec != bs::errc::invalid_argument) {
+ ca::post(std::move(c), ec);
+ return;
+ }
+
+ // retry using the legacy command
+ impl->monclient.start_mon_command(
+ { fmt::format("{{"
+ "\"prefix\": \"osd blacklist\", "
+ "\"blacklistop\": \"add\", "
+ "\"addr\": \"{}\"{}}}",
+ client_address, expire_arg) },
+ {},
+ [c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
+ ca::post(std::move(c), ec);
+ });
+ });
+}
+
+// Complete once the client has the latest OSDMap.
+void RADOS::wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c) {
+ impl->objecter->wait_for_latest_osdmap(std::move(c));
+}
+
+// Run an arbitrary mon command; status string and output bufferlist are
+// copied into the caller-provided out-parameters (when non-null) before
+// the completion is posted.
+void RADOS::mon_command(std::vector<std::string> command,
+ const cb::list& bl,
+ std::string* outs, cb::list* outbl,
+ std::unique_ptr<SimpleOpComp> c) {
+
+ impl->monclient.start_mon_command(
+ command, bl,
+ [c = std::move(c), outs, outbl](bs::error_code e,
+ std::string s, cb::list bl) mutable {
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+ ca::post(std::move(c), e);
+ });
+}
+
+// Global id assigned by the monitors at connect time.
+uint64_t RADOS::instance_id() const {
+ return impl->get_instance_id();
+}
+
+// Error category for neorados-specific errc values. The non-virtual-dtor
+// warning is silenced because ceph::converting_category intentionally has
++// no virtual destructor; instances are only created with static storage.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+class category : public ceph::converting_category {
+public:
+ category() {}
+ const char* name() const noexcept override;
+ const char* message(int ev, char*, std::size_t) const noexcept override;
+ std::string message(int ev) const override;
+ bs::error_condition default_error_condition(int ev) const noexcept
+ override;
+ bool equivalent(int ev, const bs::error_condition& c) const
+ noexcept override;
+ using ceph::converting_category::equivalent;
+ int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+const char* category::name() const noexcept {
+ return "RADOS";
+}
+
+// Static message lookup; the buffer parameters are unused.
+const char* category::message(int ev, char*,
+ std::size_t) const noexcept {
+ if (ev == 0)
+ return "No error";
+
+ switch (static_cast<errc>(ev)) {
+ case errc::pool_dne:
+ return "Pool does not exist";
+
+ case errc::invalid_snapcontext:
+ return "Invalid snapcontext";
+ }
+
+ return "Unknown error";
+}
+
+std::string category::message(int ev) const {
+ return message(ev, nullptr, 0);
+}
+
+// Map our errc values onto generic conditions for portable comparisons.
+bs::error_condition category::default_error_condition(int ev) const noexcept {
+ switch (static_cast<errc>(ev)) {
+ case errc::pool_dne:
+ return ceph::errc::does_not_exist;
+ case errc::invalid_snapcontext:
+ return bs::errc::invalid_argument;
+ }
+
+ return { ev, *this };
+}
+
+// pool_dne additionally matches no_such_file_or_directory (ENOENT-style
+// comparisons); everything else defers to default_error_condition.
+bool category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+ if (static_cast<errc>(ev) == errc::pool_dne) {
+ if (c == bs::errc::no_such_file_or_directory) {
+ return true;
+ }
+ }
+
+ return default_error_condition(ev) == c;
+}
+
+// Convert to the classic negative-errno convention used by librados.
+int category::from_code(int ev) const noexcept {
+ switch (static_cast<errc>(ev)) {
+ case errc::pool_dne:
+ return -ENOENT;
+ case errc::invalid_snapcontext:
+ return -EINVAL;
+ }
+ return -EDOM;
+}
+
+// Singleton accessor for the category above.
+const bs::error_category& error_category() noexcept {
+ static const class category c;
+ return c;
+}
+
+// Raw (non-owning) pointer to the client's CephContext.
+CephContext* RADOS::cct() {
+ return impl->cct.get();
+}
+}
+
+// Hash support so Object and IOContext can be used in unordered containers.
+namespace std {
+size_t hash<neorados::Object>::operator ()(
+ const neorados::Object& r) const {
+ static constexpr const hash<object_t> H;
+ return H(*reinterpret_cast<const object_t*>(&r.impl));
+}
+
+// Combines pool id, namespace and locator key hashes with small shifts.
+size_t hash<neorados::IOContext>::operator ()(
+ const neorados::IOContext& r) const {
+ static constexpr const hash<int64_t> H;
+ static constexpr const hash<std::string> G;
+ const auto l = reinterpret_cast<const neorados::IOContextImpl*>(&r.impl);
+ return H(l->oloc.pool) ^ (G(l->oloc.nspace) << 1) ^ (G(l->oloc.key) << 2);
+}
+}
diff --git a/src/neorados/RADOSImpl.cc b/src/neorados/RADOSImpl.cc
new file mode 100644
index 000000000..6c9c210a8
--- /dev/null
+++ b/src/neorados/RADOSImpl.cc
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <boost/system/system_error.hpp>
+
+#include "common/common_init.h"
+
+#include "global/global_init.h"
+
+#include "RADOSImpl.h"
+
+namespace neorados {
+namespace detail {
+
+// Bring up a full client stack: monmap, messenger, Objecter, MonClient,
+// MgrClient. The ordering below matters: the Objecter must be initialized
+// and registered as a dispatcher before the messenger starts, and mon
+// authentication must succeed before the global id is usable. Any failure
+// is reported by throwing a system_error (or bad_alloc).
+RADOS::RADOS(boost::asio::io_context& ioctx,
+ boost::intrusive_ptr<CephContext> cct)
+ : Dispatcher(cct.get()),
+ ioctx(ioctx),
+ cct(cct),
+ monclient(cct.get(), ioctx),
+ mgrclient(cct.get(), nullptr, &monclient.monmap) {
+ auto err = monclient.build_initial_monmap();
+ if (err < 0)
+ throw std::system_error(ceph::to_error_code(err));
+
+ messenger.reset(Messenger::create_client_messenger(cct.get(), "radosclient"));
+ if (!messenger)
+ throw std::bad_alloc();
+
+ // Require OSDREPLYMUX feature. This means we will fail to talk to
+ // old servers. This is necessary because otherwise we won't know
+ // how to decompose the reply data into its constituent pieces.
+ messenger->set_default_policy(
+ Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
+
+ objecter = std::make_unique<Objecter>(cct.get(), messenger.get(), &monclient, ioctx);
+
+ objecter->set_balanced_budget();
+ monclient.set_messenger(messenger.get());
+ mgrclient.set_messenger(messenger.get());
+ objecter->init();
+ messenger->add_dispatcher_head(&mgrclient);
+ messenger->add_dispatcher_tail(objecter.get());
+ messenger->start();
+ monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
+ err = monclient.init();
+ if (err) {
+ throw boost::system::system_error(ceph::to_error_code(err));
+ }
+ // Authenticate within client_mount_timeout (converted to seconds).
+ err = monclient.authenticate(std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
+ if (err) {
+ throw boost::system::system_error(ceph::to_error_code(err));
+ }
+ messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
+ // Detect older cluster, put mgrclient into compatible mode
+ mgrclient.set_mgr_optional(
+ !get_required_monitor_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS));
+
+ // MgrClient needs this (it doesn't have MonClient reference itself)
+ monclient.sub_want("mgrmap", 0, 0);
+ monclient.renew_subs();
+
+ mgrclient.init();
+ objecter->set_client_incarnation(0);
+ objecter->start();
+
+ // Register ourselves last so we only see messages once everything above
+ // is ready.
+ messenger->add_dispatcher_tail(this);
+
+ std::unique_lock l(lock);
+ instance_id = monclient.get_global_id();
+}
+
+// Tear down in reverse order of construction: Objecter first, then the
+// mgr/mon clients, finally the messenger (shutdown + wait for its threads).
+RADOS::~RADOS() {
+ if (objecter && objecter->initialized) {
+ objecter->shutdown();
+ }
+
+ mgrclient.shutdown();
+ monclient.shutdown();
+
+ if (messenger) {
+ messenger->shutdown();
+ messenger->wait();
+ }
+}
+
+// Consume (and drop) OSD map messages; everything else is left for other
+// dispatchers by returning false.
+bool RADOS::ms_dispatch(Message *m)
+{
+ switch (m->get_type()) {
+ // OSD
+ case CEPH_MSG_OSD_MAP:
+ m->put();
+ return true;
+ }
+ return false;
+}
+
+// Connection lifecycle hooks: this client takes no action on connect,
+// reset, remote reset, or refusal.
+void RADOS::ms_handle_connect(Connection *con) {}
+bool RADOS::ms_handle_reset(Connection *con) {
+ return false;
+}
+void RADOS::ms_handle_remote_reset(Connection *con) {}
+bool RADOS::ms_handle_refused(Connection *con) {
+ return false;
+}
+
+} // namespace detail
+} // namespace neorados
diff --git a/src/neorados/RADOSImpl.h b/src/neorados/RADOSImpl.h
new file mode 100644
index 000000000..d45ca9481
--- /dev/null
+++ b/src/neorados/RADOSImpl.h
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_NEORADOS_RADOSIMPL_H
+#define CEPH_NEORADOS_RADOSIMPL_H
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include <boost/asio.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+
+#include "librados/RadosClient.h"
+
+#include "mon/MonClient.h"
+
+#include "mgr/MgrClient.h"
+
+#include "osdc/Objecter.h"
+
+namespace neorados {
+
+class RADOS;
+
+namespace detail {
+
+class NeoClient;
+
+// Owning implementation behind neorados::RADOS: holds the messenger,
+// mon/mgr clients and the Objecter, and receives messages as a Dispatcher.
+class RADOS : public Dispatcher
+{
+ friend ::neorados::RADOS;
+ friend NeoClient;
+
+ boost::asio::io_context& ioctx;
+ boost::intrusive_ptr<CephContext> cct;
+
+ ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl");
+ int instance_id = -1; // global id from the monitors; -1 until connected
+
+ std::unique_ptr<Messenger> messenger;
+
+ MonClient monclient;
+ MgrClient mgrclient;
+
+ std::unique_ptr<Objecter> objecter;
+
+public:
+
+ RADOS(boost::asio::io_context& ioctx, boost::intrusive_ptr<CephContext> cct);
+ ~RADOS();
+ bool ms_dispatch(Message *m) override;
+ void ms_handle_connect(Connection *con) override;
+ bool ms_handle_reset(Connection *con) override;
+ void ms_handle_remote_reset(Connection *con) override;
+ bool ms_handle_refused(Connection *con) override;
+ mon_feature_t get_required_monitor_features() const {
+ return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features));
+ }
+};
+
+// Abstract adapter exposing the pieces (io_context, CephContext, MonClient,
+// Objecter) that neorados::RADOS needs, regardless of who owns them.
+// Non-copyable; the references it holds must outlive it.
+class Client {
+public:
+ Client(boost::asio::io_context& ioctx,
+ boost::intrusive_ptr<CephContext> cct,
+ MonClient& monclient, Objecter* objecter)
+ : ioctx(ioctx), cct(cct), monclient(monclient), objecter(objecter) {
+ }
+ virtual ~Client() {}
+
+ Client(const Client&) = delete;
+ Client& operator=(const Client&) = delete;
+
+ boost::asio::io_context& ioctx;
+
+ boost::intrusive_ptr<CephContext> cct;
+ MonClient& monclient;
+ Objecter* objecter;
+
+ mon_feature_t get_required_monitor_features() const {
+ return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features));
+ }
+
+ virtual int get_instance_id() const = 0;
+};
+
+// Client backed by a standalone detail::RADOS instance which it owns.
+class NeoClient : public Client {
+public:
+ NeoClient(std::unique_ptr<RADOS>&& rados)
+ : Client(rados->ioctx, rados->cct, rados->monclient,
+ rados->objecter.get()),
+ rados(std::move(rados)) {
+ }
+
+ int get_instance_id() const override {
+ return rados->instance_id;
+ }
+
+private:
+ std::unique_ptr<RADOS> rados;
+};
+
+// Client borrowing the machinery of an existing librados::RadosClient
+// (non-owning; the RadosClient must outlive this adapter).
+class RadosClient : public Client {
+public:
+ RadosClient(librados::RadosClient* rados_client)
+ : Client(rados_client->poolctx, {rados_client->cct},
+ rados_client->monclient, rados_client->objecter),
+ rados_client(rados_client) {
+ }
+
+ int get_instance_id() const override {
+ return rados_client->instance_id;
+ }
+
+public:
+ librados::RadosClient* rados_client;
+};
+
+} // namespace detail
+} // namespace neorados
+
+#endif
diff --git a/src/neorados/cls/fifo.cc b/src/neorados/cls/fifo.cc
new file mode 100644
index 000000000..fa99275b2
--- /dev/null
+++ b/src/neorados/cls/fifo.cc
@@ -0,0 +1,385 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <cstdint>
+#include <numeric>
+#include <optional>
+#include <string_view>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include <boost/system/error_code.hpp>
+
+#include "include/neorados/RADOS.hpp"
+
+#include "include/buffer.h"
+
+#include "common/random_string.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/fifo/cls_fifo_ops.h"
+
+#include "fifo.h"
+
+namespace neorados::cls::fifo {
+namespace bs = boost::system;
+namespace cb = ceph::buffer;
+namespace fifo = rados::cls::fifo;
+
+// Append a CREATE_META call to op: creates the FIFO's metadata object
+// with the given size limits and (optionally) an explicit version and
+// part-oid prefix.
+void create_meta(WriteOp& op, std::string_view id,
+		 std::optional<fifo::objv> objv,
+		 std::optional<std::string_view> oid_prefix,
+		 bool exclusive,
+		 std::uint64_t max_part_size,
+		 std::uint64_t max_entry_size)
+{
+  fifo::op::create_meta meta;
+  meta.id = id;
+  meta.exclusive = exclusive;
+  meta.version = objv;
+  meta.oid_prefix = oid_prefix;
+  meta.max_part_size = max_part_size;
+  meta.max_entry_size = max_entry_size;
+
+  cb::list bl;
+  encode(meta, bl);
+  op.exec(fifo::op::CLASS, fifo::op::CREATE_META, bl);
+}
+
+// Append a GET_META call to op.  On completion, whichever of the
+// output pointers are non-null receive the decoded reply; *ec_out
+// receives the operation or decode error.
+// NOTE(review): the outputs are assigned even when ec is set, so on
+// failure they receive a default-constructed reply — confirm callers
+// expect that.
+void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
+	      bs::error_code* ec_out, fifo::info* info,
+	      std::uint32_t* part_header_size,
+	      std::uint32_t* part_entry_overhead)
+{
+  fifo::op::get_meta gm;
+  gm.version = objv;
+  cb::list in;
+  encode(gm, in);
+  op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
+	  [ec_out, info, part_header_size,
+	   part_entry_overhead](bs::error_code ec, const cb::list& bl) {
+	    fifo::op::get_meta_reply reply;
+	    if (!ec) try {
+		auto iter = bl.cbegin();
+		decode(reply, iter);
+	      } catch (const cb::error& err) {
+		ec = err.code();
+	      }
+	    if (ec_out) *ec_out = ec;
+	    if (info) *info = std::move(reply.info);
+	    if (part_header_size) *part_header_size = reply.part_header_size;
+	    if (part_entry_overhead)
+	      *part_entry_overhead = reply.part_entry_overhead;
+	  });
+} // fixed: stray ';' after the function body (empty declaration)
+
+// Append an UPDATE_META call to op: applies the changes described by
+// update to the FIFO metadata, with objv as the expected version.
+void update_meta(WriteOp& op, const fifo::objv& objv,
+		 const fifo::update& update)
+{
+  fifo::op::update_meta um;
+
+  um.version = objv;
+  um.tail_part_num = update.tail_part_num();
+  um.head_part_num = update.head_part_num();
+  um.min_push_part_num = update.min_push_part_num();
+  um.max_push_part_num = update.max_push_part_num();
+  // update is a const reference, so the std::move previously applied
+  // here was a no-op (a const rvalue still selects the copying
+  // accessor); call the accessors directly to avoid the misleading
+  // cast.
+  um.journal_entries_add = update.journal_entries_add();
+  um.journal_entries_rm = update.journal_entries_rm();
+
+  cb::list in;
+  encode(um, in);
+  op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
+}
+
+// Append an INIT_PART call to op: initializes a part object with the
+// given tag and data parameters.
+void part_init(WriteOp& op, std::string_view tag,
+	       fifo::data_params params)
+{
+  fifo::op::init_part part;
+  part.params = params;
+  part.tag = tag;
+
+  cb::list bl;
+  encode(part, bl);
+  op.exec(fifo::op::CLASS, fifo::op::INIT_PART, bl);
+}
+
+// Append a PUSH_PART call to op: appends data_bufs to a part object.
+// f receives the operation's error code and per-op return value
+// (hence the returnvec() below, which requests per-op return values).
+void push_part(WriteOp& op, std::string_view tag,
+	       std::deque<cb::list> data_bufs,
+	       fu2::unique_function<void(bs::error_code, int)> f)
+{
+  fifo::op::push_part pp;
+
+  pp.tag = tag;
+  // data_bufs arrived by value; move it into the op struct rather
+  // than copying the whole deque of buffer lists a second time.
+  pp.data_bufs = std::move(data_bufs);
+  pp.total_len = 0;
+
+  for (const auto& bl : pp.data_bufs)
+    pp.total_len += bl.length();
+
+  cb::list in;
+  encode(pp, in);
+  op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in,
+	  [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable {
+	    std::move(f)(ec, r);
+	  });
+  op.returnvec();
+}
+
+// Append a TRIM_PART call to op: trims a part object up to ofs.  If
+// exclusive, the entry at ofs itself is kept (see FIFO::trim()).
+void trim_part(WriteOp& op,
+	       std::optional<std::string_view> tag,
+	       std::uint64_t ofs, bool exclusive)
+{
+  fifo::op::trim_part tp;
+
+  tp.tag = tag;
+  tp.ofs = ofs;
+  tp.exclusive = exclusive;
+
+  // Use cb::list for consistency with the rest of this file (the
+  // original used the bare `bufferlist` alias here only).
+  cb::list in;
+  encode(tp, in);
+  op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
+}
+
+// Append a LIST_PART call to op: lists up to max_entries entries of a
+// part object starting at offset ofs.  The outputs (entries, more,
+// full_part, ptag) are written only on success; *ec_out receives any
+// operation or decode error.
+void list_part(ReadOp& op,
+	       std::optional<std::string_view> tag,
+	       std::uint64_t ofs,
+	       std::uint64_t max_entries,
+	       bs::error_code* ec_out,
+	       std::vector<fifo::part_list_entry>* entries,
+	       bool* more,
+	       bool* full_part,
+	       std::string* ptag)
+{
+  fifo::op::list_part lp;
+
+  lp.tag = tag;
+  lp.ofs = ofs;
+  lp.max_entries = max_entries;
+
+  cb::list in;
+  encode(lp, in);
+  op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
+	  [entries, more, full_part, ptag, ec_out](bs::error_code ec,
+						   const cb::list& bl) {
+	    if (ec) {
+	      if (ec_out) *ec_out = ec;
+	      return;
+	    }
+
+	    fifo::op::list_part_reply reply;
+	    auto iter = bl.cbegin();
+	    try {
+	      decode(reply, iter);
+	    } catch (const cb::error& err) {
+	      // Fixed: the original stored `ec` here, which is zero on
+	      // this path, so a corrupt reply was reported as success.
+	      if (ec_out) *ec_out = err.code();
+	      return;
+	    }
+
+	    if (entries) *entries = std::move(reply.entries);
+	    if (more) *more = reply.more;
+	    if (full_part) *full_part = reply.full_part;
+	    if (ptag) *ptag = reply.tag;
+	  });
+}
+
+// Append a GET_PART_INFO call to op: fetches a part object's header.
+// *header is written only on success; *out_ec receives any operation
+// or decode error.
+void get_part_info(ReadOp& op,
+		   bs::error_code* out_ec,
+		   fifo::part_header* header)
+{
+  fifo::op::get_part_info gpi;
+
+  cb::list in;
+  encode(gpi, in);
+  op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
+	  [out_ec, header](bs::error_code ec, const cb::list& bl) {
+	    if (ec) {
+	      if (out_ec) *out_ec = ec;
+	      // Fixed: the original fell through here and decoded an
+	      // empty buffer, relying on the decode throwing to bail.
+	      return;
+	    }
+	    fifo::op::get_part_info_reply reply;
+	    auto iter = bl.cbegin();
+	    try {
+	      decode(reply, iter);
+	    } catch (const cb::error& err) {
+	      // Fixed: storing `ec` here (zero on this path) reported
+	      // success on a corrupt reply.
+	      if (out_ec) *out_ec = err.code();
+	      return;
+	    }
+
+	    if (header) *header = std::move(reply.header);
+	  });
+}
+
+// Parse an externally-supplied marker string.  An empty string means
+// "the current tail"; otherwise the format is "<num>:<ofs>".  Returns
+// std::nullopt if the string does not parse.
+std::optional<marker> FIFO::to_marker(std::string_view s) {
+  if (s.empty()) {
+    marker tail;
+    tail.num = info.tail_part_num;
+    tail.ofs = 0;
+    return tail;
+  }
+
+  const auto colon = s.find(':');
+  if (colon == std::string_view::npos)
+    return std::nullopt;
+
+  marker m;
+  const auto num = ceph::parse<decltype(m.num)>(s.substr(0, colon));
+  const auto ofs = ceph::parse<decltype(m.ofs)>(s.substr(colon + 1));
+  if (!num || !ofs)
+    return std::nullopt;
+
+  m.num = *num;
+  m.ofs = *ofs;
+  return m;
+}
+
+// Apply `update` to the in-memory metadata copy under the lock.
+// Returns errc::raced when the version after the update no longer
+// matches objv (someone updated the metadata concurrently) and
+// errc::update_failed when the update itself is rejected.
+// NOTE(review): apply_update() runs before the version check, so on a
+// race the local copy has already been modified — presumably callers
+// re-read the metadata in that case; confirm.
+bs::error_code FIFO::apply_update(fifo::info* info,
+				  const fifo::objv& objv,
+				  const fifo::update& update) {
+  std::unique_lock l(m);
+  auto err = info->apply_update(update);
+  if (objv != info->version) {
+    ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl;
+    return errc::raced;
+  }
+  if (err) {
+    ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl;
+    return errc::update_failed;
+  }
+
+  // Success: bump the local version to match the applied update.
+  ++info->version.ver;
+
+  return {};
+}
+
+// Produce a fresh random alphanumeric tag for a new part/header.
+std::string FIFO::generate_tag() const
+{
+  static constexpr auto TAG_LEN = 16;
+  return gen_rand_alphanumeric_plain(r->cct(), TAG_LEN);
+}
+
+// ceph::converting_category has a non-virtual destructor; this
+// category is only ever constructed as a function-local static that
+// lives for the whole program (see error_category() below), so the
+// warning is silenced rather than changing the base class.
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+// Boost.System error category for the errc values declared in fifo.h.
+class error_category : public ceph::converting_category {
+public:
+  error_category(){}
+  const char* name() const noexcept override;
+  const char* message(int ev, char*, std::size_t) const noexcept override;
+  std::string message(int ev) const override;
+  bs::error_condition default_error_condition(int ev) const noexcept
+    override;
+  bool equivalent(int ev, const bs::error_condition& c) const
+    noexcept override;
+  using ceph::converting_category::equivalent;
+  int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+// Category name reported via bs::error_code::category().name().
+const char* error_category::name() const noexcept {
+  return "FIFO";
+}
+
+// Human-readable description of an errc value.  The buffer arguments
+// are unused; only static strings are returned.
+const char* error_category::message(int ev, char*, std::size_t) const noexcept {
+  switch (static_cast<errc>(ev)) {
+  case errc::raced:
+    return "Retry-race count exceeded";
+  case errc::inconsistency:
+    return "Inconsistent result! New head before old head";
+  case errc::entry_too_large:
+    return "Pushed entry too large";
+  case errc::invalid_marker:
+    return "Invalid marker string";
+  case errc::update_failed:
+    return "Update failed";
+  }
+  return ev == 0 ? "No error" : "Unknown error";
+}
+
+// std::string overload required by the error_category interface;
+// delegates to the static-string overload above.
+std::string error_category::message(int ev) const {
+  return message(ev, nullptr, 0);
+}
+
+bs::error_condition
+error_category::default_error_condition(int ev) const noexcept {
+ switch (static_cast<errc>(ev)) {
+ case errc::raced:
+ return bs::errc::operation_canceled;
+
+ case errc::inconsistency:
+ return bs::errc::io_error;
+
+ case errc::entry_too_large:
+ return bs::errc::value_too_large;
+
+ case errc::invalid_marker:
+ return bs::errc::invalid_argument;
+
+ case errc::update_failed:
+ return bs::errc::invalid_argument;
+ }
+
+ return { ev, *this };
+}
+
+// A code from this category is equivalent to exactly the condition
+// produced by default_error_condition().
+bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+  return default_error_condition(ev) == c;
+}
+
+// Translate an errc value to the negative-errno convention used by
+// the classic librados interfaces.
+int error_category::from_code(int ev) const noexcept {
+  switch (static_cast<errc>(ev)) {
+  case errc::raced:
+    return -ECANCELED;
+  case errc::inconsistency:
+    return -EIO;
+  case errc::entry_too_large:
+    return -E2BIG;
+  case errc::invalid_marker:
+  case errc::update_failed:
+    return -EINVAL;
+  }
+  return -EDOM;
+}
+
+// Singleton accessor declared in fifo.h; the function-local static
+// gives thread-safe lazy construction of the immortal instance.
+const bs::error_category& error_category() noexcept {
+  static const class error_category c;
+  return c;
+}
+
+}
diff --git a/src/neorados/cls/fifo.h b/src/neorados/cls/fifo.h
new file mode 100644
index 000000000..05865dcca
--- /dev/null
+++ b/src/neorados/cls/fifo.h
@@ -0,0 +1,1754 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_NEORADOS_CLS_FIFIO_H
+#define CEPH_NEORADOS_CLS_FIFIO_H
+
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string_view>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/neorados/RADOS.hpp"
+#include "include/buffer.h"
+
+#include "common/allocate_unique.h"
+#include "common/async/bind_handler.h"
+#include "common/async/bind_like.h"
+#include "common/async/completion.h"
+#include "common/async/forward_handler.h"
+
+#include "common/dout.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/fifo/cls_fifo_ops.h"
+
+namespace neorados::cls::fifo {
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
+namespace fifo = rados::cls::fifo;
+
+// Log subsystem used by the ldout() calls in this namespace.
+inline constexpr auto dout_subsys = ceph_subsys_rados;
+// Default size limits passed to FIFO::create()/create_meta().
+inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024;
+inline constexpr std::uint64_t default_max_entry_size = 32 * 1024;
+// Presumably the bound on retries of racing metadata updates — the
+// consumers are further down in this header; confirm.
+inline constexpr auto MAX_RACE_RETRIES = 10;
+
+
+const boost::system::error_category& error_category() noexcept;
+
+// FIFO-specific errors; see error_category in fifo.cc for the text,
+// generic-condition and errno mappings.
+enum class errc {
+  raced = 1,
+  inconsistency,
+  entry_too_large,
+  invalid_marker,
+  update_failed
+};
+}
+
+// Register errc with Boost.System: errc values convert implicitly to
+// bs::error_code, but deliberately not to bs::error_condition.
+namespace boost::system {
+template<>
+struct is_error_code_enum<::neorados::cls::fifo::errc> {
+  static const bool value = true;
+};
+template<>
+struct is_error_condition_enum<::neorados::cls::fifo::errc> {
+  static const bool value = false;
+};
+}
+
+namespace neorados::cls::fifo {
+// explicit conversion:
+inline bs::error_code make_error_code(errc e) noexcept {
+  return { static_cast<int>(e), error_category() };
+}
+
+// NOTE(review): despite the name this also builds an error_code (the
+// body is identical to make_error_code above); `make_error_condition`
+// was probably intended.  Kept as-is: it is part of the public header.
+inline bs::error_code make_error_category(errc e) noexcept {
+  return { static_cast<int>(e), error_category() };
+}
+
+// Low-level composers, defined in fifo.cc: each appends one cls_fifo
+// exec to the given Read/Write op.  Output pointers (ec_out, info,
+// entries, ...) are filled in when the operation completes.
+void create_meta(WriteOp& op, std::string_view id,
+		 std::optional<fifo::objv> objv,
+		 std::optional<std::string_view> oid_prefix,
+		 bool exclusive = false,
+		 std::uint64_t max_part_size = default_max_part_size,
+		 std::uint64_t max_entry_size = default_max_entry_size);
+void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
+	      bs::error_code* ec_out, fifo::info* info,
+	      std::uint32_t* part_header_size,
+	      std::uint32_t* part_entry_overhead);
+
+void update_meta(WriteOp& op, const fifo::objv& objv,
+		 const fifo::update& desc);
+
+void part_init(WriteOp& op, std::string_view tag,
+	       fifo::data_params params);
+
+// The function receives the operation's error code and return value.
+void push_part(WriteOp& op, std::string_view tag,
+	       std::deque<cb::list> data_bufs,
+	       fu2::unique_function<void(bs::error_code, int)>);
+void trim_part(WriteOp& op, std::optional<std::string_view> tag,
+	       std::uint64_t ofs,
+	       bool exclusive);
+void list_part(ReadOp& op,
+	       std::optional<std::string_view> tag,
+	       std::uint64_t ofs,
+	       std::uint64_t max_entries,
+	       bs::error_code* ec_out,
+	       std::vector<fifo::part_list_entry>* entries,
+	       bool* more,
+	       bool* full_part,
+	       std::string* ptag);
+void get_part_info(ReadOp& op,
+		   bs::error_code* out_ec,
+		   fifo::part_header* header);
+
+/// Position within the FIFO: part number plus byte offset within that
+/// part.  Converted to/from the opaque ordered strings handed out by
+/// list() and accepted by trim().
+struct marker {
+  std::int64_t num = 0;
+  std::uint64_t ofs = 0;
+
+  marker() = default;
+  marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
+  /// A marker comparing greater than any real position.
+  static marker max() {
+    return { std::numeric_limits<decltype(num)>::max(),
+	     std::numeric_limits<decltype(ofs)>::max() };
+  }
+
+  /// Render as fixed-width "num:ofs" so lexicographic order matches
+  /// FIFO order.  Made const (it does not modify the marker), so it
+  /// can be called on const markers; existing callers are unaffected.
+  std::string to_string() const {
+    return fmt::format("{:0>20}:{:0>20}", num, ofs);
+  }
+};
+
+/// One entry returned by FIFO::list().
+struct list_entry {
+  cb::list data; // entry payload
+  std::string marker; // opaque position of this entry within the FIFO
+  ceph::real_time mtime; // push time as recorded on the OSD
+};
+
+/// Part metadata returned by FIFO::get_part_info(); see the comment
+/// on that method for the field meanings.
+using part_info = fifo::part_header;
+
+namespace detail {
+// Forward declaration; granted friendship by FIFO below.
+template<typename Handler>
+class JournalProcessor;
+}
+
+/// Completions, Handlers, and CompletionTokens
+/// ===========================================
+///
+/// This class is based on Boost.Asio. For information, see
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html
+///
+/// As summary, Asio's design is that of functions taking completion
+/// handlers. Every handler has a signature, like
+/// (boost::system::error_code, std::string). The completion handler
+/// receives the result of the function, and the signature is the type
+/// of that result.
+///
+/// The completion handler is specified with a CompletionToken. The
+/// CompletionToken is any type that has a specialization of
+/// async_complete and async_result. See
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html
+/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html
+///
+/// The return type of a function taking a CompletionToken is
+/// async_result<CompletionToken, Signature>::return_type.
+///
+/// Functions
+/// ---------
+///
+/// The default implementations treat whatever value is described as a
+/// function, whose parameters correspond to the signature, and calls
+/// it upon completion.
+///
+/// EXAMPLE:
+/// Let f be an asynchronous function whose signature is (bs::error_code, int)
+/// Let g be an asynchronous function whose signature is
+/// (bs::error_code, int, std::string).
+///
+///
+/// f([](bs::error_code ec, int i) { ... });
+/// g([](bs::error_code ec, int i, std::string s) { ... });
+///
+/// Will schedule asynchronous tasks, and the provided lambdas will be
+/// called on completion. In this case, f and g return void.
+///
+/// There are other specializations. Commonly used ones are.
+///
+/// Futures
+/// -------
+///
+/// A CompletionToken of boost::asio::use_future will complete with a
+/// promise whose type matches (minus any initial error_code) the
+/// function's signature. The corresponding future is returned. If the
+/// error_code of the result is non-zero, the future is set with an
+/// exception of type boost::asio::system_error.
+///
+/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html
+///
+/// EXAMPLE:
+///
+/// std::future<int> fut1 = f(ba::use_future);
+/// std::future<std::tuple<int, std::string>> fut2 = g(ba::use_future);
+///
+/// Coroutines
+/// ----------
+///
+/// A CompletionToken of type spawn::yield_context suspends execution
+/// of the current coroutine until completion of the operation. See
+/// src/spawn/README.md
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html
+///
+/// Operations given this CompletionToken return their results, modulo
+/// any leading error_code. A non-zero error code will be thrown, by
+/// default, but may be bound to a variable instead with the overload
+/// of the array-subscript operator.
+///
+/// EXAMPLE:
+/// // Within a function with a yield_context parameter named y
+///
+/// try {
+/// int i = f(y);
+/// } catch (const bs::system_error& ec) { ... }
+///
+/// bs::error_code ec;
+/// auto [i, s] = g(y[ec]);
+///
+/// Blocking calls
+/// --------------
+///
+/// ceph::async::use_blocked, defined in src/common/async/blocked_completion.h
+/// Suspends the current thread of execution, returning the results of
+/// the operation on resumption. Its calling convention is analogous to
+/// that of yield_context.
+///
+/// EXAMPLE:
+/// try {
+/// int i = f(ca::use_blocked);
+/// } catch (const bs::system_error& e) { ... }
+///
+/// bs::error_code ec;
+/// auto [i, s] = g(ca::use_blocked[ec]);
+///
+/// librados Completions
+/// --------------------
+///
+/// If src/common/async/librados_completion.h is included in the
+/// current translation unit, then librados::AioCompletion* may be used
+/// as a CompletionToken. This is only permitted when the completion
+/// signature is either bs::system_error or void. The return type of
+/// functions provided a CompletionToken of AioCompletion* is void. If
+/// the signature includes an error code and the error code is set,
+/// then the error is translated to an int which is set as the result
+/// of the AioCompletion.
+///
+/// EXAMPLE:
+/// // Assume an asynchronous function h whose signature is bs::error_code.
+///
+/// AioCompletion* c = Rados::aio_create_completion();
+/// h(c);
+/// int r = c.get_return_value();
+///
+/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple
+/// example of a program using this class with coroutines.
+///
+///
+/// Markers
+/// =======
+///
+/// Markers represent a position within the FIFO. Internally, they are
+/// part/offset pairs. Externally, they are ordered but otherwise
+/// opaque strings. Markers that compare lower denote positions closer
+/// to the tail.
+///
+/// A marker is returned with every entry from a list() operation. They
+/// may be supplied to a list operation to resume from a given
+/// position, and must be supplied to a trim operation to give the
+/// position to which to trim.
+
+class FIFO {
+public:
+
+ FIFO(const FIFO&) = delete;
+ FIFO& operator =(const FIFO&) = delete;
+ FIFO(FIFO&&) = delete;
+ FIFO& operator =(FIFO&&) = delete;
+
+  /// Open an existing FIFO.
+  /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
+  /// NOTE(review): unlike create(), the handler receives a non-null
+  /// FIFO pointer even when ec is set — check ec before using it.
+  template<typename CT>
+  static auto open(RADOS& r, //< RADOS handle
+		   const IOContext& ioc, //< Context for pool, namespace, etc.
+		   Object oid, //< OID for the 'main' object of the FIFO
+		   CT&& ct, //< CompletionToken
+		   /// Fail if is not this version
+		   std::optional<fifo::objv> objv = std::nullopt,
+		   /// Default executor. By default use the one
+		   /// associated with the RADOS handle.
+		   std::optional<ba::executor> executor = std::nullopt) {
+    ba::async_completion<CT, void(bs::error_code,
+				  std::unique_ptr<FIFO>)> init(ct);
+    auto e = ba::get_associated_executor(init.completion_handler,
+					 executor.value_or(r.get_executor()));
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    // Fetch the metadata, then build the handle around it.
+    _read_meta_(
+      &r, oid, ioc, objv,
+      ca::bind_ea(
+	e, a,
+	[&r, ioc, oid, executor, handler = std::move(init.completion_handler)]
+	(bs::error_code ec, fifo::info info,
+	 std::uint32_t size, std::uint32_t over) mutable {
+	  std::unique_ptr<FIFO> f(
+	    new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
+	  f->info = info;
+	  f->part_header_size = size;
+	  f->part_entry_overhead = over;
+	  // If there are journal entries, process them, in case
+	  // someone crashed mid-transaction.
+	  if (!ec && !info.journal.empty()) {
+	    auto e = ba::get_associated_executor(handler, f->get_executor());
+	    auto a = ba::get_associated_allocator(handler);
+	    auto g = f.get();
+	    g->_process_journal(
+	      ca::bind_ea(
+		e, a,
+		[f = std::move(f),
+		 handler = std::move(handler)](bs::error_code ec) mutable {
+		  std::move(handler)(ec, std::move(f));
+		}));
+	    return;
+	  }
+	  std::move(handler)(ec, std::move(f));
+	  return;
+	}));
+    return init.result.get();
+  }
+
+  /// Open an existing or create a new FIFO.
+  /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
+  /// On failure of the create step the handler receives a null f.
+  template<typename CT>
+  static auto create(RADOS& r, /// RADOS handle
+		     const IOContext& ioc, /// Context for pool, namespace, etc.
+		     Object oid, /// OID for the 'main' object of the FIFO
+		     CT&& ct, /// CompletionToken
+		     /// Fail if FIFO exists and is not this version
+		     std::optional<fifo::objv> objv = std::nullopt,
+		     /// Custom prefix for parts
+		     std::optional<std::string_view> oid_prefix = std::nullopt,
+		     /// Fail if FIFO already exists
+		     bool exclusive = false,
+		     /// Size at which a part is considered full
+		     std::uint64_t max_part_size = default_max_part_size,
+		     /// Maximum size of any entry
+		     std::uint64_t max_entry_size = default_max_entry_size,
+		     /// Default executor. By default use the one
+		     /// associated with the RADOS handle.
+		     std::optional<ba::executor> executor = std::nullopt) {
+    ba::async_completion<CT, void(bs::error_code,
+				  std::unique_ptr<FIFO>)> init(ct);
+    // Create (or verify) the metadata object, then read it back to
+    // populate the returned handle.
+    WriteOp op;
+    create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size,
+		max_entry_size);
+    auto e = ba::get_associated_executor(init.completion_handler,
+					 executor.value_or(r.get_executor()));
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    r.execute(
+      oid, ioc, std::move(op),
+      ca::bind_ea(
+	e, a,
+	[objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)]
+	(bs::error_code ec) mutable {
+	  if (ec) {
+	    std::move(handler)(ec, nullptr);
+	    return;
+	  }
+	  auto e = ba::get_associated_executor(
+	    handler, executor.value_or(r.get_executor()));
+	  auto a = ba::get_associated_allocator(handler);
+	  FIFO::_read_meta_(
+	    &r, oid, ioc, objv,
+	    ca::bind_ea(
+	      e, a,
+	      [&r, ioc, executor, oid, handler = std::move(handler)]
+	      (bs::error_code ec, fifo::info info,
+	       std::uint32_t size, std::uint32_t over) mutable {
+		std::unique_ptr<FIFO> f(
+		  new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
+		f->info = info;
+		f->part_header_size = size;
+		f->part_entry_overhead = over;
+		// Replay any journal left by a crashed writer before
+		// handing the FIFO to the caller.
+		if (!ec && !info.journal.empty()) {
+		  auto e = ba::get_associated_executor(handler,
+						       f->get_executor());
+		  auto a = ba::get_associated_allocator(handler);
+		  auto g = f.get();
+		  g->_process_journal(
+		    ca::bind_ea(
+		      e, a,
+		      [f = std::move(f), handler = std::move(handler)]
+		      (bs::error_code ec) mutable {
+			std::move(handler)(ec, std::move(f));
+		      }));
+		  return;
+		}
+		std::move(handler)(ec, std::move(f));
+	      }));
+	}));
+    return init.result.get();
+  }
+
+  /// Force a re-read of FIFO metadata.
+  /// Signature: (bs::error_code ec)
+  /// The freshly read metadata is installed only if the local version
+  /// is still the one observed when the call started, so a concurrent
+  /// local update is never overwritten with older data.
+  template<typename CT>
+  auto read_meta(CT&& ct, //< CompletionToken
+		 /// Fail if FIFO not at this version
+		 std::optional<fifo::objv> objv = std::nullopt) {
+    std::unique_lock l(m);
+    auto version = info.version;
+    l.unlock();
+    ba::async_completion<CT, void(bs::error_code)> init(ct);
+    auto e = ba::get_associated_executor(init.completion_handler,
+					 get_executor());
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    _read_meta_(
+      r, oid, ioc, objv,
+      ca::bind_ea(
+	e, a,
+	[this, version, handler = std::move(init.completion_handler)]
+	(bs::error_code ec, fifo::info newinfo,
+	 std::uint32_t size, std::uint32_t over) mutable {
+	  std::unique_lock l(m);
+	  if (version == info.version) {
+	    info = newinfo;
+	    part_header_size = size;
+	    part_entry_overhead = over;
+	  }
+	  l.unlock();
+	  return std::move(handler)(ec);
+	}));
+    return init.result.get();
+  }
+
+  /// Return a reference to the currently cached metadata.
+  /// NOTE(review): read without taking the mutex, unlike the updates —
+  /// confirm callers serialize against concurrent metadata changes.
+  const fifo::info& meta() const {
+    return info;
+  }
+
+  /// Return header size and entry overhead of partitions, as read
+  /// from the OSD when the FIFO was opened/created.
+  std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() {
+    return {part_header_size, part_entry_overhead};
+  }
+
+  /// Push a single entry to the FIFO.
+  /// Signature: (bs::error_code)
+  /// Convenience overload: wraps the entry and delegates to the
+  /// vector overload below.
+  template<typename CT>
+  auto push(const cb::list& bl, //< Bufferlist holding entry to push
+	    CT&& ct //< CompletionToken
+    ) {
+    return push(std::vector{ bl }, std::forward<CT>(ct));
+  }
+
+  /// Push many entries to the FIFO.
+  /// Signature: (bs::error_code)
+  /// Fails with errc::entry_too_large (converted implicitly to an
+  /// error_code via make_error_code) if any entry exceeds the FIFO's
+  /// max_entry_size; an empty batch completes immediately with
+  /// success.
+  template<typename CT>
+  auto push(const std::vector<cb::list>& data_bufs, //< Entries to push
+	    CT&& ct //< CompletionToken
+    ) {
+    ba::async_completion<CT, void(bs::error_code)> init(ct);
+    std::unique_lock l(m);
+    auto max_entry_size = info.params.max_entry_size;
+    auto need_new_head = info.need_new_head();
+    l.unlock();
+    auto e = ba::get_associated_executor(init.completion_handler,
+					 get_executor());
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    if (data_bufs.empty() ) {
+      // Can't fail if you don't try.
+      e.post(ca::bind_handler(std::move(init.completion_handler),
+			      bs::error_code{}), a);
+      return init.result.get();
+    }
+
+    // Validate sizes
+    for (const auto& bl : data_bufs) {
+      if (bl.length() > max_entry_size) {
+	ldout(r->cct(), 10) << __func__ << "(): entry too large: "
+			    << bl.length() << " > "
+			    << info.params.max_entry_size << dendl;
+	e.post(ca::bind_handler(std::move(init.completion_handler),
+				errc::entry_too_large), a);
+	return init.result.get();
+      }
+    }
+
+    // The Pusher continuation owns the remaining entries and the
+    // caller's handler; it drives the pushes to completion.
+    auto p = ca::bind_ea(e, a,
+			 Pusher(this, {data_bufs.begin(), data_bufs.end()},
+				{}, 0, std::move(init.completion_handler)));
+
+    // Prepare a new head part first if the current one cannot accept
+    // more entries; otherwise start pushing immediately.
+    if (need_new_head) {
+      _prepare_new_head(std::move(p));
+    } else {
+      e.dispatch(std::move(p), a);
+    }
+    return init.result.get();
+  }
+
+  /// List the entries in a FIFO
+  /// Signature: (bs::error_code ec, std::vector<list_entry> entries, bool more)
+  ///
+  /// More is true if entries beyond the last exist.
+  /// The list entries are of the form:
+  /// data - Contents of the entry
+  /// marker - String representing the position of this entry within the FIFO.
+  /// mtime - Time (on the OSD) at which the entry was pushed.
+  template<typename CT>
+  auto list(int max_entries, //< Maximum number of entries to fetch
+	    /// Optionally, a marker indicating the position after
+	    /// which to begin listing. If null, begin at the tail.
+	    std::optional<std::string_view> markstr,
+	    CT&& ct //< CompletionToken
+    ) {
+    ba::async_completion<CT, void(bs::error_code,
+				  std::vector<list_entry>, bool)> init(ct);
+    std::unique_lock l(m);
+    std::int64_t part_num = info.tail_part_num;
+    l.unlock();
+    std::uint64_t ofs = 0;
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    // NOTE(review): no fallback executor is supplied here, unlike
+    // push()/get_part_info() which pass get_executor() — confirm
+    // whether that is intentional.
+    auto e = ba::get_associated_executor(init.completion_handler);
+
+    // A supplied marker overrides the tail as the starting position.
+    if (markstr) {
+      auto marker = to_marker(*markstr);
+      if (!marker) {
+	ldout(r->cct(), 0) << __func__
+			   << "(): failed to parse marker (" << *markstr
+			   << ")" << dendl;
+	e.post(ca::bind_handler(std::move(init.completion_handler),
+				errc::invalid_marker,
+				std::vector<list_entry>{}, false), a);
+	return init.result.get();
+      }
+      part_num = marker->num;
+      ofs = marker->ofs;
+    }
+
+    // The Lister is allocated from the handler's allocator and
+    // released here; it owns itself for the duration of the listing.
+    using handler_type = decltype(init.completion_handler);
+    auto ls = ceph::allocate_unique<Lister<handler_type>>(
+      a, this, part_num, ofs, max_entries,
+      std::move(init.completion_handler));
+    ls.release()->list();
+    return init.result.get();
+  }
+
+  /// Trim entries from the tail to the given position
+  /// Signature: (bs::error_code)
+  template<typename CT>
+  auto trim(std::string_view markstr, //< Position to which to trim, inclusive
+	    bool exclusive, //< If true, trim markers up to but NOT INCLUDING
+	                    //< markstr, otherwise trim markstr as well.
+	    CT&& ct //< CompletionToken
+    ) {
+    auto m = to_marker(markstr);
+    ba::async_completion<CT, void(bs::error_code)> init(ct);
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    auto e = ba::get_associated_executor(init.completion_handler);
+    if (!m) {
+      // Unparseable marker: fail immediately without issuing any op.
+      ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker="
+			 << markstr << dendl;
+      e.post(ca::bind_handler(std::move(init.completion_handler),
+			      errc::invalid_marker), a);
+      return init.result.get();
+    } else {
+      // The Trimmer is allocated from the handler's allocator and
+      // released here; it owns itself until the trim completes.
+      using handler_type = decltype(init.completion_handler);
+      auto t = ceph::allocate_unique<Trimmer<handler_type>>(
+	a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
+      t.release()->trim();
+    }
+    return init.result.get();
+  }
+
+  /// Get information about a specific partition
+  /// Signature: (bs::error_code, part_info)
+  ///
+  /// part_info has the following entries
+  /// tag - A random string identifying this partition. Used internally
+  /// as a sanity check to make sure operations haven't been misdirected
+  /// params - Data parameters, identical for every partition within a
+  /// FIFO and the same as what is returned from get_part_layout()
+  /// magic - A random magic number, used internally as a prefix to
+  /// every entry stored on the OSD to ensure sync
+  /// min_ofs - Offset of the first entry
+  /// max_ofs - Offset of the highest entry
+  /// min_index - Minimum entry index
+  /// max_index - Maximum entry index
+  /// max_time - Time of the latest push
+  ///
+  /// The difference between ofs and index is that ofs is a byte
+  /// offset. Index is a count. Nothing really uses indices, but
+  /// they're tracked and sanity-checked as an invariant on the OSD.
+  ///
+  /// max_ofs and max_time are the two that have been used externally
+  /// so far.
+  template<typename CT>
+  auto get_part_info(int64_t part_num, // The number of the partition
+		     CT&& ct // CompletionToken
+    ) {
+
+    ba::async_completion<CT, void(bs::error_code, part_info)> init(ct);
+    // Composes the GET_PART_INFO exec inline (rather than via the
+    // free get_part_info()) so the reply decoder can live in
+    // handler-allocated memory.
+    fifo::op::get_part_info gpi;
+    cb::list in;
+    encode(gpi, in);
+    ReadOp op;
+    auto e = ba::get_associated_executor(init.completion_handler,
+					 get_executor());
+    auto a = ba::get_associated_allocator(init.completion_handler);
+    auto reply = ceph::allocate_unique<
+      ExecDecodeCB<fifo::op::get_part_info_reply>>(a);
+
+    op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
+	    std::ref(*reply));
+    std::unique_lock l(m);
+    auto part_oid = info.part_oid(part_num);
+    l.unlock();
+    r->execute(part_oid, ioc, std::move(op), nullptr,
+	       ca::bind_ea(e, a,
+			   PartInfoGetter(std::move(init.completion_handler),
+					  std::move(reply))));
+    return init.result.get();
+  }
+
+  // Executor type alias, following the Asio I/O-object convention.
+  using executor_type = ba::executor;
+
+  /// Return the default executor, as specified at creation.
+  ba::executor get_executor() const {
+    return executor;
+  }
+
+private:
+  template<typename Handler>
+  friend class detail::JournalProcessor;
+  RADOS* const r; // non-owning; set once at construction
+  const IOContext ioc;
+  const Object oid;
+  std::mutex m; // guards info, part_header_size, part_entry_overhead
+
+  // Locally cached FIFO metadata; refreshed by read_meta()/_read_meta().
+  fifo::info info;
+
+  // Sentinel values; overwritten with the values returned by the OSD
+  // during open()/create().
+  std::uint32_t part_header_size = 0xdeadbeef;
+  std::uint32_t part_entry_overhead = 0xdeadbeef;
+
+  ba::executor executor;
+
+  // Parse an external marker string; defined in fifo.cc.
+  std::optional<marker> to_marker(std::string_view s);
+
+  // Destroy and free *t through the allocator associated with
+  // handler.  Goes through std::allocator_traits rather than calling
+  // the allocator's destroy()/deallocate() members directly: the
+  // member destroy() was deprecated in C++17 and removed in C++20, and
+  // the traits work with minimal allocators that omit it.
+  template<typename Handler, typename T>
+  static void assoc_delete(const Handler& handler, T* t) {
+    using alloc_t = typename std::allocator_traits<
+      typename ba::associated_allocator<Handler>::type>
+        ::template rebind_alloc<T>;
+    alloc_t a(ba::get_associated_allocator(handler));
+    std::allocator_traits<alloc_t>::destroy(a, t);
+    std::allocator_traits<alloc_t>::deallocate(a, t, 1);
+  }
+
+  // Private: instances are only created through open()/create(),
+  // which populate info and the layout sizes before handing the
+  // pointer to the caller.
+  FIFO(RADOS& r,
+       IOContext ioc,
+       Object oid,
+       ba::executor executor)
+    : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {}
+
+  // Random tag for new parts/headers; defined in fifo.cc.
+  std::string generate_tag() const;
+
+  // Exec callback that decodes the reply bufferlist into `result`,
+  // stashing any operation or decode error in `ec` for the completion
+  // to inspect later.
+  template <typename T>
+  struct ExecDecodeCB {
+    bs::error_code ec;
+    T result;
+    void operator()(bs::error_code e, const cb::list& r) {
+      if (e) {
+	ec = e;
+	return;
+      }
+      try {
+	auto p = r.begin();
+	using ceph::decode;
+	decode(result, p);
+      } catch (const cb::error& err) {
+	ec = err.code();
+      }
+    }
+  };
+
+  // Completion for _read_meta_: unpacks the decoded get_meta reply
+  // (or the decode error) and forwards it to the wrapped handler.
+  template<typename Handler>
+  class MetaReader {
+    Handler handler;
+    using allocator_type = boost::asio::associated_allocator_t<Handler>;
+    using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>;
+    using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
+    decoder_ptr decoder;
+  public:
+    MetaReader(Handler&& handler, decoder_ptr&& decoder)
+      : handler(std::move(handler)), decoder(std::move(decoder)) {}
+
+    void operator ()(bs::error_code ec) {
+      // Prefer the operation error; otherwise surface any decode error.
+      if (!ec) {
+	ec = decoder->ec;
+      }
+      auto reply = std::move(decoder->result);
+      decoder.reset(); // free handler-allocated memory before dispatching
+
+      std::move(handler)(ec, std::move(reply.info),
+			 std::move(reply.part_header_size),
+			 std::move(reply.part_entry_overhead));
+    }
+  };
+
  // Renamed to get around a compiler bug in Bionic that kept
  // complaining we weren't capturing 'this' to make a static function call.
  //
  // Fetch the FIFO metadata object 'oid' and deliver
  // (error_code, fifo::info, part_header_size, part_entry_overhead)
  // to the handler.  Static so the factory functions can use it too.
  // NOTE(review): the trailing 'executor' parameter is accepted but
  // never used in this body — confirm whether it is vestigial.
  template<typename Handler>
  static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc,
                          std::optional<fifo::objv> objv,
                          Handler&& handler, /* error_code, info, uint64,
                                                uint64 */
                          std::optional<ba::executor> executor = std::nullopt){
    fifo::op::get_meta gm;

    gm.version = objv;

    cb::list in;
    encode(gm, in);
    ReadOp op;

    // The reply decoder is allocated from the handler's associated
    // allocator so its lifetime spans the asynchronous call.
    auto a = ba::get_associated_allocator(handler);
    auto reply =
      ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a);

    auto e = ba::get_associated_executor(handler);
    op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply));
    r->execute(oid, ioc, std::move(op), nullptr,
               ca::bind_ea(e, a, MetaReader(std::move(handler),
                                            std::move(reply))));
  };
+
  /// Refresh the cached metadata ('info', 'part_header_size',
  /// 'part_entry_overhead') from the cluster, keeping whichever
  /// version is newer.  Handler receives just an error_code.
  template<typename Handler>
  void _read_meta(Handler&& handler /* error_code */) {
    auto e = ba::get_associated_executor(handler, get_executor());
    auto a = ba::get_associated_allocator(handler);
    _read_meta_(r, oid, ioc,
                nullopt,
                ca::bind_ea(
                  e, a,
                  [this,
                   handler = std::move(handler)](bs::error_code ec,
                                                 fifo::info&& info,
                                                 std::uint64_t phs,
                                                 std::uint64_t peo) mutable {
                    std::unique_lock l(m);
                    if (ec) {
                      l.unlock();
                      std::move(handler)(ec);
                      return;
                    }
                    // We have a newer version already!
                    if (!info.version.same_or_later(this->info.version)) {
                      l.unlock();
                      // Not an error: the cache is already newer.
                      std::move(handler)(bs::error_code{});
                      return;
                    }
                    this->info = std::move(info);
                    part_header_size = phs;
                    part_entry_overhead = peo;
                    l.unlock();
                    std::move(handler)(bs::error_code{});
                  }), get_executor());
  }
+
  // Locally apply 'update' to '*info' provided 'objv' still matches;
  // defined in RADOS.cc.  Presumably returns a non-zero error code
  // on version mismatch — confirm against the implementation.
  bs::error_code apply_update(fifo::info* info,
                              const fifo::objv& objv,
                              const fifo::update& update);
+
+
  /// Apply 'update' to the metadata object, then mirror it into the
  /// local cache.  Handler receives (error_code, canceled): canceled
  /// is true when we raced another writer and re-read the metadata
  /// instead of applying our local update.
  ///
  /// NOTE(review): the remote op is conditioned on the current
  /// (unlocked) info.version, while the 'version' parameter is only
  /// used for the local apply_update — confirm this is intended.
  template<typename Handler>
  void _update_meta(const fifo::update& update,
                    fifo::objv version,
                    Handler&& handler /* error_code, bool */) {
    WriteOp op;

    cls::fifo::update_meta(op, info.version, update);

    auto a = ba::get_associated_allocator(handler);
    auto e = ba::get_associated_executor(handler, get_executor());

    r->execute(
      oid, ioc, std::move(op),
      ca::bind_ea(
        e, a,
        [this, e, a, version, update,
         handler = std::move(handler)](bs::error_code ec) mutable {
          if (ec && ec != bs::errc::operation_canceled) {
            std::move(handler)(ec, bool{});
            return;
          }

          // operation_canceled signals a version mismatch (raced).
          auto canceled = (ec == bs::errc::operation_canceled);

          if (!canceled) {
            // Remote update succeeded; apply it locally too.
            ec = apply_update(&info,
                              version,
                              update);
            if (ec) {
              // Local state diverged: treat as a race and re-read.
              canceled = true;
            }
          }

          if (canceled) {
            _read_meta(
              ca::bind_ea(
                e, a,
                [handler = std::move(handler)](bs::error_code ec) mutable {
                  std::move(handler)(ec, ec ? false : true);
                }));
            return;
          }
          std::move(handler)(ec, false);
          return;
        }));
  }
+
  /// Kick off asynchronous processing of the metadata journal.
  /// The JournalProcessor is allocated from the handler's associated
  /// allocator and owns itself: release() hands the raw pointer to
  /// process(), and the processor frees itself (via assoc_delete in
  /// its handle()) when the chain completes.
  template<typename Handler>
  auto _process_journal(Handler&& handler /* error_code */) {
    auto a = ba::get_associated_allocator(std::ref(handler));
    auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>(
      a, this, std::move(handler));
    auto p = j.release();
    p->process();
  }
+
  /// Continuation for _prepare_new_part: receives the result of the
  /// journal-entries-add metadata update and, on a race, decides
  /// whether the work was already done, should be retried (up to
  /// MAX_RACE_RETRIES), or whether the journal still needs running.
  template<typename Handler>
  class NewPartPreparer {
    FIFO* f;
    Handler handler;
    std::vector<fifo::journal_entry> jentries; // entries we tried to add
    int i;                                     // race-retry counter
    std::int64_t new_head_part_num;

  public:

    void operator ()(bs::error_code ec, bool canceled) {
      if (ec) {
        std::move(handler)(ec);
        return;
      }

      if (canceled) {
        std::unique_lock l(f->m);
        auto iter = f->info.journal.find(jentries.front().part_num);
        auto max_push_part_num = f->info.max_push_part_num;
        auto head_part_num = f->info.head_part_num;
        auto version = f->info.version;
        auto found = (iter != f->info.journal.end());
        l.unlock();
        if ((max_push_part_num >= jentries.front().part_num &&
             head_part_num >= new_head_part_num)) {
          /* raced, but new part was already written */
          std::move(handler)(bs::error_code{});
          return;
        }
        if (i >= MAX_RACE_RETRIES) {
          std::move(handler)(errc::raced);
          return;
        }
        if (!found) {
          // Our journal entries vanished in the race; re-add them
          // and come back here with the retry counter bumped.
          auto e = ba::get_associated_executor(handler, f->get_executor());
          auto a = ba::get_associated_allocator(handler);
          f->_update_meta(fifo::update{}
                          .journal_entries_add(jentries),
                          version,
                          ca::bind_ea(
                            e, a,
                            NewPartPreparer(f, std::move(handler),
                                            jentries,
                                            i + 1, new_head_part_num)));
          return;
        }
        // Fall through. We still need to process the journal.
      }
      f->_process_journal(std::move(handler));
      return;
    }

    NewPartPreparer(FIFO* f,
                    Handler&& handler,
                    std::vector<fifo::journal_entry> jentries,
                    int i, std::int64_t new_head_part_num)
      : f(f), handler(std::move(handler)), jentries(std::move(jentries)),
        i(i), new_head_part_num(new_head_part_num) {}
  };
+
  /// Journal creation of the next part (and, if is_head, a set_head
  /// entry for it), push the journal entries into the metadata, and
  /// run the journal via NewPartPreparer.  Handler gets error_code.
  template<typename Handler>
  void _prepare_new_part(bool is_head,
                         Handler&& handler /* error_code */) {
    std::unique_lock l(m);
    std::vector jentries = { info.next_journal_entry(generate_tag()) };
    std::int64_t new_head_part_num = info.head_part_num;
    auto version = info.version;

    if (is_head) {
      // Add a second entry marking the new part as the head.
      auto new_head_jentry = jentries.front();
      new_head_jentry.op = fifo::journal_entry::Op::set_head;
      new_head_part_num = jentries.front().part_num;
      jentries.push_back(std::move(new_head_jentry));
    }
    l.unlock();

    auto e = ba::get_associated_executor(handler, get_executor());
    auto a = ba::get_associated_allocator(handler);
    _update_meta(fifo::update{}.journal_entries_add(jentries),
                 version,
                 ca::bind_ea(
                   e, a,
                   NewPartPreparer(this, std::move(handler),
                                   jentries, 0, new_head_part_num)));
  }
+
  /// Continuation for _prepare_new_head's head_part_num update:
  /// retries on races (up to MAX_RACE_RETRIES) until either we or a
  /// competing writer has advanced the head to new_head_num.
  template<typename Handler>
  class NewHeadPreparer {
    FIFO* f;
    Handler handler;
    int i;                     // race-retry counter
    std::int64_t new_head_num; // head part number we are moving to

  public:

    void operator ()(bs::error_code ec, bool canceled) {
      std::unique_lock l(f->m);
      auto head_part_num = f->info.head_part_num;
      auto version = f->info.version;
      l.unlock();

      if (ec) {
        std::move(handler)(ec);
        return;
      }
      if (canceled) {
        if (i >= MAX_RACE_RETRIES) {
          std::move(handler)(errc::raced);
          return;
        }

        // Raced, but there's still work to do!
        if (head_part_num < new_head_num) {
          auto e = ba::get_associated_executor(handler, f->get_executor());
          auto a = ba::get_associated_allocator(handler);
          f->_update_meta(fifo::update{}.head_part_num(new_head_num),
                          version,
                          ca::bind_ea(
                            e, a,
                            NewHeadPreparer(f, std::move(handler),
                                            i + 1,
                                            new_head_num)));
          return;
        }
      }
      // Either we succeeded, or we were raced by someone who did it for us.
      std::move(handler)(bs::error_code{});
      return;
    }

    NewHeadPreparer(FIFO* f,
                    Handler&& handler,
                    int i, std::int64_t new_head_num)
      : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {}
  };
+
+ template<typename Handler>
+ void _prepare_new_head(Handler&& handler /* error_code */) {
+ std::unique_lock l(m);
+ int64_t new_head_num = info.head_part_num + 1;
+ auto max_push_part_num = info.max_push_part_num;
+ auto version = info.version;
+ l.unlock();
+
+ if (max_push_part_num < new_head_num) {
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _prepare_new_part(
+ true,
+ ca::bind_ea(
+ e, a,
+ [this, new_head_num,
+ handler = std::move(handler)](bs::error_code ec) mutable {
+ if (ec) {
+ handler(ec);
+ return;
+ }
+ std::unique_lock l(m);
+ if (info.max_push_part_num < new_head_num) {
+ l.unlock();
+ ldout(r->cct(), 0)
+ << "ERROR: " << __func__
+ << ": after new part creation: meta_info.max_push_part_num="
+ << info.max_push_part_num << " new_head_num="
+ << info.max_push_part_num << dendl;
+ std::move(handler)(errc::inconsistency);
+ } else {
+ l.unlock();
+ std::move(handler)(bs::error_code{});
+ }
+ }));
+ return;
+ }
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _update_meta(fifo::update{}.head_part_num(new_head_num),
+ version,
+ ca::bind_ea(
+ e, a,
+ NewHeadPreparer(this, std::move(handler), 0,
+ new_head_num)));
+ }
+
+ template<typename T>
+ struct ExecHandleCB {
+ bs::error_code ec;
+ T result;
+ void operator()(bs::error_code e, const T& t) {
+ if (e) {
+ ec = e;
+ return;
+ }
+ result = t;
+ }
+ };
+
  /// Completion adapter for push_entries: unpacks the int reply
  /// (count of entries accepted) and forwards (error_code, int) to
  /// the user handler.
  template<typename Handler>
  class EntryPusher {
    Handler handler;
    using allocator_type = boost::asio::associated_allocator_t<Handler>;
    using decoder_type = ExecHandleCB<int>;
    using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
    decoder_ptr decoder;

  public:

    EntryPusher(Handler&& handler, decoder_ptr&& decoder)
      : handler(std::move(handler)), decoder(std::move(decoder)) {}

    void operator ()(bs::error_code ec) {
      // Prefer the transport error; fall back to the op's own error.
      if (!ec) {
        ec = decoder->ec;
      }
      auto reply = std::move(decoder->result);
      decoder.reset(); // free handler-allocated memory before dispatching

      std::move(handler)(ec, std::move(reply));
    }
  };
+
  /// Push a batch of entries onto the current head part.  Handler
  /// receives (error_code, int) where the int is the cls reply
  /// captured by ExecHandleCB (see Pusher for its interpretation).
  template<typename Handler>
  auto push_entries(const std::deque<cb::list>& data_bufs,
                    Handler&& handler /* error_code, int */) {
    WriteOp op;
    // Snapshot head part/tag under the lock.
    std::unique_lock l(m);
    auto head_part_num = info.head_part_num;
    auto tag = info.head_tag;
    auto oid = info.part_oid(head_part_num);
    l.unlock();

    auto a = ba::get_associated_allocator(handler);
    auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a);

    auto e = ba::get_associated_executor(handler, get_executor());
    push_part(op, tag, data_bufs, std::ref(*reply));
    return r->execute(oid, ioc, std::move(op),
                      ca::bind_ea(e, a, EntryPusher(std::move(handler),
                                                    std::move(reply))));
  }
+
+ template<typename CT>
+ auto trim_part(int64_t part_num,
+ uint64_t ofs,
+ std::optional<std::string_view> tag,
+ bool exclusive,
+ CT&& ct) {
+ WriteOp op;
+ cls::fifo::trim_part(op, tag, ofs, exclusive);
+ return r->execute(info.part_oid(part_num), ioc, std::move(op),
+ std::forward<CT>(ct));
+ }
+
+
  /// State machine for an asynchronous multi-entry push.  Entries
  /// are drained from 'remaining' into 'batch' (bounded by the
  /// part's max size, accounting per-entry overhead), pushed via
  /// push_entries(), and when the head part fills up a new head is
  /// prepared and the rest of the batch retried.  'i' counts
  /// consecutive races while preparing a new head.
  template<typename Handler>
  class Pusher {
    FIFO* f;
    std::deque<cb::list> remaining; // entries not yet batched
    std::deque<cb::list> batch;     // entries in the in-flight push
    int i;                          // race-retry counter
    Handler handler;

    // Drop 'successes' already-accepted entries from the front of
    // the batch, top the batch back up from 'remaining', then push
    // (or complete if nothing is left).
    void prep_then_push(const unsigned successes) {
      std::unique_lock l(f->m);
      auto max_part_size = f->info.params.max_part_size;
      auto part_entry_overhead = f->part_entry_overhead;
      l.unlock();

      uint64_t batch_len = 0;
      if (successes > 0) {
        if (successes == batch.size()) {
          batch.clear();
        } else {
          batch.erase(batch.begin(), batch.begin() + successes);
          for (const auto& b : batch) {
            batch_len += b.length() + part_entry_overhead;
          }
        }
      }

      if (batch.empty() && remaining.empty()) {
        std::move(handler)(bs::error_code{});
        return;
      }

      while (!remaining.empty() &&
             (remaining.front().length() + batch_len <= max_part_size)) {

        /* We can send entries with data_len up to max_entry_size,
           however, we want to also account the overhead when
           dealing with multiple entries. Previous check doesn't
           account for overhead on purpose. */
        batch_len += remaining.front().length() + part_entry_overhead;
        batch.push_back(std::move(remaining.front()));
        remaining.pop_front();
      }
      push();
    }

    // Send the current batch; the continuation is a fresh Pusher
    // carrying the leftover state (invoked via operator()(ec, int)).
    void push() {
      auto e = ba::get_associated_executor(handler, f->get_executor());
      auto a = ba::get_associated_allocator(handler);
      f->push_entries(batch,
                      ca::bind_ea(e, a,
                                  Pusher(f, std::move(remaining),
                                         batch, i,
                                         std::move(handler))));
    }

  public:

    // Initial call!
    void operator ()() {
      prep_then_push(0);
    }

    // Called with response to push_entries
    void operator ()(bs::error_code ec, int r) {
      if (ec == bs::errc::result_out_of_range) {
        // Head part is full: prepare a new head, then retry the
        // batch (continuation is operator()(ec) below).
        auto e = ba::get_associated_executor(handler, f->get_executor());
        auto a = ba::get_associated_allocator(handler);
        f->_prepare_new_head(
          ca::bind_ea(e, a,
                      Pusher(f, std::move(remaining),
                             std::move(batch), i,
                             std::move(handler))));
        return;
      }
      if (ec) {
        std::move(handler)(ec);
        return;
      }
      i = 0; // We've made forward progress, so reset the race counter!
      prep_then_push(r);
    }

    // Called with response to prepare_new_head
    void operator ()(bs::error_code ec) {
      if (ec == bs::errc::operation_canceled) {
        if (i == MAX_RACE_RETRIES) {
          ldout(f->r->cct(), 0)
            << "ERROR: " << __func__
            << "(): race check failed too many times, likely a bug" << dendl;
          std::move(handler)(make_error_code(errc::raced));
          return;
        }
        ++i;
      } else if (ec) {
        std::move(handler)(ec);
        return;
      }

      if (batch.empty()) {
        prep_then_push(0);
        return;
      } else {
        push();
        return;
      }
    }

    Pusher(FIFO* f, std::deque<cb::list>&& remaining,
           std::deque<cb::list> batch, int i,
           Handler&& handler)
      : f(f), remaining(std::move(remaining)),
        batch(std::move(batch)), i(i),
        handler(std::move(handler)) {}
  };
+
+ template<typename Handler>
+ class Lister {
+ FIFO* f;
+ std::vector<list_entry> result;
+ bool more = false;
+ std::int64_t part_num;
+ std::uint64_t ofs;
+ int max_entries;
+ bs::error_code ec_out;
+ std::vector<fifo::part_list_entry> entries;
+ bool part_more = false;
+ bool part_full = false;
+ Handler handler;
+
+ void handle(bs::error_code ec) {
+ auto h = std::move(handler);
+ auto m = more;
+ auto r = std::move(result);
+
+ FIFO::assoc_delete(h, this);
+ std::move(h)(ec, std::move(r), m);
+ }
+
+ public:
+ Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
+ Handler&& handler)
+ : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
+ handler(std::move(handler)) {
+ result.reserve(max_entries);
+ }
+
+
+ Lister(const Lister&) = delete;
+ Lister& operator =(const Lister&) = delete;
+ Lister(Lister&&) = delete;
+ Lister& operator =(Lister&&) = delete;
+
+ void list() {
+ if (max_entries > 0) {
+ ReadOp op;
+ ec_out.clear();
+ part_more = false;
+ part_full = false;
+ entries.clear();
+
+ std::unique_lock l(f->m);
+ auto part_oid = f->info.part_oid(part_num);
+ l.unlock();
+
+ list_part(op,
+ {},
+ ofs,
+ max_entries,
+ &ec_out,
+ &entries,
+ &part_more,
+ &part_full,
+ nullptr);
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->r->execute(
+ part_oid,
+ f->ioc,
+ std::move(op),
+ nullptr,
+ ca::bind_ea(
+ e, a,
+ [t = std::unique_ptr<Lister>(this), this,
+ part_oid](bs::error_code ec) mutable {
+ t.release();
+ if (ec == bs::errc::no_such_file_or_directory) {
+ auto e = ba::get_associated_executor(handler,
+ f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->_read_meta(
+ ca::bind_ea(
+ e, a,
+ [this](bs::error_code ec) mutable {
+ if (ec) {
+ handle(ec);
+ return;
+ }
+
+ if (part_num < f->info.tail_part_num) {
+ /* raced with trim? restart */
+ max_entries += result.size();
+ result.clear();
+ part_num = f->info.tail_part_num;
+ ofs = 0;
+ list();
+ }
+ /* assuming part was not written yet, so end of data */
+ more = false;
+ handle({});
+ return;
+ }));
+ return;
+ }
+ if (ec) {
+ ldout(f->r->cct(), 0)
+ << __func__
+ << "(): list_part() on oid=" << part_oid
+ << " returned ec=" << ec.message() << dendl;
+ handle(ec);
+ return;
+ }
+ if (ec_out) {
+ ldout(f->r->cct(), 0)
+ << __func__
+ << "(): list_part() on oid=" << f->info.part_oid(part_num)
+ << " returned ec=" << ec_out.message() << dendl;
+ handle(ec_out);
+ return;
+ }
+
+ more = part_full || part_more;
+ for (auto& entry : entries) {
+ list_entry e;
+ e.data = std::move(entry.data);
+ e.marker = marker{part_num, entry.ofs}.to_string();
+ e.mtime = entry.mtime;
+ result.push_back(std::move(e));
+ }
+ max_entries -= entries.size();
+ entries.clear();
+ if (max_entries > 0 &&
+ part_more) {
+ list();
+ return;
+ }
+
+ if (!part_full) { /* head part is not full */
+ handle({});
+ return;
+ }
+ ++part_num;
+ ofs = 0;
+ list();
+ }));
+ } else {
+ handle({});
+ return;
+ }
+ }
+ };
+
  /// Self-owning state machine for trimming up to a marker: trims
  /// every whole part below 'part_num', then trims 'part_num' up to
  /// 'ofs', then advances tail_part_num in the metadata.  Freed
  /// exactly once by handle() via assoc_delete.
  template<typename Handler>
  class Trimmer {
    FIFO* f;
    std::int64_t part_num; // target part of the trim marker
    std::uint64_t ofs;     // offset within that part
    bool exclusive;
    Handler handler;
    std::int64_t pn;       // part currently being trimmed
    int i = 0;             // race-retry counter for update()

    // Complete exactly once: deliver ec and free *this.
    void handle(bs::error_code ec) {
      auto h = std::move(handler);

      FIFO::assoc_delete(h, this);
      return std::move(h)(ec);
    }

    // Advance tail_part_num in the metadata, retrying on races.
    void update() {
      std::unique_lock l(f->m);
      auto objv = f->info.version;
      l.unlock();
      auto a = ba::get_associated_allocator(handler);
      auto e = ba::get_associated_executor(handler, f->get_executor());
      f->_update_meta(
        fifo::update{}.tail_part_num(part_num),
        objv,
        ca::bind_ea(
          e, a,
          [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec,
                                                     bool canceled) mutable {
            t.release();
            // NOTE(review): the 'if (canceled)' below guards ONLY
            // the retry-limit check; despite the indentation, the
            // lock/retry code after it runs whether or not the
            // update was canceled, and 'ec' is never examined in
            // this lambda — confirm both are intended.
            if (canceled)
              if (i >= MAX_RACE_RETRIES) {
                ldout(f->r->cct(), 0)
                  << "ERROR: " << __func__
                  << "(): race check failed too many times, likely a bug"
                  << dendl;
                handle(errc::raced);
                return;
              }
            std::unique_lock l(f->m);
            auto tail_part_num = f->info.tail_part_num;
            l.unlock();
            if (tail_part_num < part_num) {
              ++i;
              update();
              return;
            }
            handle({});
            return;
          }));
    }

  public:
    Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
            bool exclusive, Handler&& handler)
      : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
        handler(std::move(handler)) {
      // NOTE(review): this lock is held only for the read of
      // tail_part_num; 'l' unlocks at end of constructor.
      std::unique_lock l(f->m);
      pn = f->info.tail_part_num;
    }

    // Trim parts from 'pn' forward; ENOENT on a part is tolerated
    // (already removed).
    void trim() {
      auto a = ba::get_associated_allocator(handler);
      auto e = ba::get_associated_executor(handler, f->get_executor());
      if (pn < part_num) {
        // Whole parts before the target: trim to max_part_size.
        std::unique_lock l(f->m);
        auto max_part_size = f->info.params.max_part_size;
        l.unlock();
        f->trim_part(
          pn, max_part_size, std::nullopt,
          false,
          ca::bind_ea(
            e, a,
            [t = std::unique_ptr<Trimmer>(this),
             this](bs::error_code ec) mutable {
              t.release();
              if (ec && ec != bs::errc::no_such_file_or_directory) {
                ldout(f->r->cct(), 0)
                  << __func__ << "(): ERROR: trim_part() on part="
                  << pn << " returned ec=" << ec.message() << dendl;
                handle(ec);
                return;
              }
              ++pn;
              trim();
            }));
        return;
      }
      // Final (target) part: trim up to 'ofs'.
      f->trim_part(
        part_num, ofs, std::nullopt, exclusive,
        ca::bind_ea(
          e, a,
          [t = std::unique_ptr<Trimmer>(this),
           this](bs::error_code ec) mutable {
            t.release();
            if (ec && ec != bs::errc::no_such_file_or_directory) {
              ldout(f->r->cct(), 0)
                << __func__ << "(): ERROR: trim_part() on part=" << part_num
                << " returned ec=" << ec.message() << dendl;
              handle(ec);
              return;
            }
            std::unique_lock l(f->m);
            auto tail_part_num = f->info.tail_part_num;
            l.unlock();
            if (part_num <= tail_part_num) {
              /* don't need to modify meta info */
              handle({});
              return;
            }
            update();
          }));
    }
  };
+
  /// Completion adapter for get_part_info: unpacks the decoded
  /// reply and dispatches (error_code, part_header) to the handler.
  template<typename Handler>
  class PartInfoGetter {
    Handler handler;
    using allocator_type = boost::asio::associated_allocator_t<Handler>;
    using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>;
    using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
    decoder_ptr decoder;
  public:
    PartInfoGetter(Handler&& handler, decoder_ptr&& decoder)
      : handler(std::move(handler)), decoder(std::move(decoder)) {}

    void operator ()(bs::error_code ec) {
      // Prefer the transport error; fall back to the decode error.
      if (!ec) {
        ec = decoder->ec;
      }
      auto reply = std::move(decoder->result);
      decoder.reset(); // free handler-allocated memory before dispatching

      auto p = ca::bind_handler(std::move(handler),
                                ec, std::move(reply.header));
      std::move(p)();
    }
  };
+
+
+};
+
+namespace detail {
/// Self-owning processor for the FIFO metadata journal: replays each
/// journal entry (create part / set head / remove part), then writes
/// the resulting tail/head/max part numbers back to the metadata and
/// removes the processed entries, retrying on races.  Allocated via
/// FIFO::_process_journal and freed exactly once by handle() through
/// FIFO::assoc_delete.
template<typename Handler>
class JournalProcessor {
private:
  FIFO* const fifo;
  Handler handler;

  std::vector<fifo::journal_entry> processed; // entries replayed so far
  // Snapshot of the journal taken at construction (under fifo->m).
  std::multimap<std::int64_t, fifo::journal_entry> journal;
  std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
  // Candidate new metadata values, raised as entries are replayed.
  std::int64_t new_tail;
  std::int64_t new_head;
  std::int64_t new_max;
  int race_retries = 0;

  // Create (non-exclusively) and initialize a part object.
  template<typename CT>
  auto create_part(int64_t part_num, std::string_view tag, CT&& ct) {
    WriteOp op;
    op.create(false); /* We don't need exclusivity, part_init ensures
                         we're creating from the same journal entry. */
    std::unique_lock l(fifo->m);
    part_init(op, tag, fifo->info.params);
    auto oid = fifo->info.part_oid(part_num);
    l.unlock();
    return fifo->r->execute(oid, fifo->ioc,
                            std::move(op), std::forward<CT>(ct));
  }

  // Remove a part object.  NOTE(review): 'tag' is accepted but
  // unused here — confirm whether that is intentional.
  template<typename CT>
  auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) {
    WriteOp op;
    op.remove();
    std::unique_lock l(fifo->m);
    auto oid = fifo->info.part_oid(part_num);
    l.unlock();
    return fifo->r->execute(oid, fifo->ioc,
                            std::move(op), std::forward<CT>(ct));
  }

  // Dispatch one journal entry to its asynchronous action; 'pp' is
  // the continuation (journal_entry_finisher).  set_head needs no
  // I/O, so its completion is posted to keep the call asynchronous.
  template<typename PP>
  void process_journal_entry(const fifo::journal_entry& entry,
                             PP&& pp) {
    switch (entry.op) {
    case fifo::journal_entry::Op::unknown:
      std::move(pp)(errc::inconsistency);
      return;
      break;

    case fifo::journal_entry::Op::create:
      create_part(entry.part_num, entry.part_tag, std::move(pp));
      return;
      break;
    case fifo::journal_entry::Op::set_head:
      ba::post(ba::get_associated_executor(handler, fifo->get_executor()),
               [pp = std::move(pp)]() mutable {
                 std::move(pp)(bs::error_code{});
               });
      return;
      break;
    case fifo::journal_entry::Op::remove:
      remove_part(entry.part_num, entry.part_tag, std::move(pp));
      return;
      break;
    }
    std::move(pp)(errc::inconsistency);
    return;
  }

  // Build the continuation for one entry: record its effect on the
  // candidate tail/head/max values, mark it processed, advance the
  // iterator and continue with process().
  auto journal_entry_finisher(const fifo::journal_entry& entry) {
    auto a = ba::get_associated_allocator(handler);
    auto e = ba::get_associated_executor(handler, fifo->get_executor());
    return
      ca::bind_ea(
        e, a,
        // unique_ptr guards *this against a dropped completion;
        // release() on entry returns ownership to the processor.
        [t = std::unique_ptr<JournalProcessor>(this), this,
         entry](bs::error_code ec) mutable {
          t.release();
          // Removing an already-removed part is fine.
          if (entry.op == fifo::journal_entry::Op::remove &&
              ec == bs::errc::no_such_file_or_directory)
            ec.clear();

          if (ec) {
            ldout(fifo->r->cct(), 0)
              << __func__
              << "(): ERROR: failed processing journal entry for part="
              << entry.part_num << " with error " << ec.message()
              << " Bug or inconsistency." << dendl;
            handle(errc::inconsistency);
            return;
          } else {
            switch (entry.op) {
            case fifo::journal_entry::Op::unknown:
              // Can't happen. Filtered out in process_journal_entry.
              abort();
              break;

            case fifo::journal_entry::Op::create:
              if (entry.part_num > new_max) {
                new_max = entry.part_num;
              }
              break;
            case fifo::journal_entry::Op::set_head:
              if (entry.part_num > new_head) {
                new_head = entry.part_num;
              }
              break;
            case fifo::journal_entry::Op::remove:
              if (entry.part_num >= new_tail) {
                new_tail = entry.part_num + 1;
              }
              break;
            }
            processed.push_back(entry);
          }
          ++iter;
          process();
        });
  }

  // Writes the accumulated results back to the metadata, retrying
  // (up to MAX_RACE_RETRIES) when a concurrent writer cancels our
  // update.  'first' distinguishes the initial synchronous call from
  // _update_meta completions.
  struct JournalPostprocessor {
    std::unique_ptr<JournalProcessor> j_;
    bool first;
    void operator ()(bs::error_code ec, bool canceled) {
      std::optional<int64_t> tail_part_num;
      std::optional<int64_t> head_part_num;
      std::optional<int64_t> max_part_num;

      // Ownership returns to the raw pointer for the rest of this
      // call; it is re-wrapped when another postprocessor is made.
      auto j = j_.release();

      if (!first && !ec && !canceled) {
        j->handle({});
        return;
      }

      if (canceled) {
        if (j->race_retries >= MAX_RACE_RETRIES) {
          ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ <<
            "(): race check failed too many times, likely a bug" << dendl;
          j->handle(errc::raced);
          return;
        }

        ++j->race_retries;

        // Keep only entries still present (unchanged) in the
        // freshly re-read journal; others were handled elsewhere.
        std::vector<fifo::journal_entry> new_processed;
        std::unique_lock l(j->fifo->m);
        for (auto& e : j->processed) {
          auto jiter = j->fifo->info.journal.find(e.part_num);
          /* journal entry was already processed */
          if (jiter == j->fifo->info.journal.end() ||
              !(jiter->second == e)) {
            continue;
          }
          new_processed.push_back(e);
        }
        j->processed = std::move(new_processed);
      }

      std::unique_lock l(j->fifo->m);
      auto objv = j->fifo->info.version;
      if (j->new_tail > j->fifo->info.tail_part_num) {
        tail_part_num = j->new_tail;
      }

      if (j->new_head > j->fifo->info.head_part_num) {
        head_part_num = j->new_head;
      }

      if (j->new_max > j->fifo->info.max_push_part_num) {
        max_part_num = j->new_max;
      }
      l.unlock();

      // NOTE(review): head_part_num is not part of this emptiness
      // check — confirm whether a head-only update should still be
      // written here.
      if (j->processed.empty() &&
          !tail_part_num &&
          !max_part_num) {
        /* nothing to update anymore */
        j->handle({});
        return;
      }
      auto a = ba::get_associated_allocator(j->handler);
      auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor());
      j->fifo->_update_meta(fifo::update{}
                            .tail_part_num(tail_part_num)
                            .head_part_num(head_part_num)
                            .max_push_part_num(max_part_num)
                            .journal_entries_rm(j->processed),
                            objv,
                            ca::bind_ea(
                              e, a,
                              JournalPostprocessor{j, false}));
      return;
    }

    JournalPostprocessor(JournalProcessor* j, bool first)
      : j_(j), first(first) {}
  };

  // Entry point into postprocessing once the journal is replayed.
  void postprocess() {
    if (processed.empty()) {
      handle({});
      return;
    }
    JournalPostprocessor(this, true)({}, false);
  }

  // Complete exactly once: free *this and dispatch the handler.
  void handle(bs::error_code ec) {
    auto e = ba::get_associated_executor(handler, fifo->get_executor());
    auto a = ba::get_associated_allocator(handler);
    auto h = std::move(handler);
    FIFO::assoc_delete(h, this);
    e.dispatch(ca::bind_handler(std::move(h), ec), a);
    return;
  }

public:

  JournalProcessor(FIFO* fifo, Handler&& handler)
    : fifo(fifo), handler(std::move(handler)) {
    // Snapshot the journal and current part numbers under the lock.
    std::unique_lock l(fifo->m);
    journal = fifo->info.journal;
    iter = journal.begin();
    new_tail = fifo->info.tail_part_num;
    new_head = fifo->info.head_part_num;
    new_max = fifo->info.max_push_part_num;
  }

  JournalProcessor(const JournalProcessor&) = delete;
  JournalProcessor& operator =(const JournalProcessor&) = delete;
  JournalProcessor(JournalProcessor&&) = delete;
  JournalProcessor& operator =(JournalProcessor&&) = delete;

  // Replay the next journal entry, or postprocess when done.
  void process() {
    if (iter != journal.end()) {
      const auto entry = iter->second;
      process_journal_entry(entry,
                            journal_entry_finisher(entry));
      return;
    } else {
      postprocess();
      return;
    }
  }
};
+}
+}
+
+#endif // CEPH_RADOS_CLS_FIFIO_H