author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-07 18:45:59 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-07 18:45:59 +0000
commit    19fcec84d8d7d21e796c7624e521b60d28ee21ed
tree      42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/neorados/cls/fifo.h
parent    Initial commit.
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
-rw-r--r--  src/neorados/cls/fifo.h  1754
1 file changed, 1754 insertions, 0 deletions
diff --git a/src/neorados/cls/fifo.h b/src/neorados/cls/fifo.h
new file mode 100644
index 000000000..05865dcca
--- /dev/null
+++ b/src/neorados/cls/fifo.h
@@ -0,0 +1,1754 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_NEORADOS_CLS_FIFO_H
+#define CEPH_NEORADOS_CLS_FIFO_H
+
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string_view>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/neorados/RADOS.hpp"
+#include "include/buffer.h"
+
+#include "common/allocate_unique.h"
+#include "common/async/bind_handler.h"
+#include "common/async/bind_like.h"
+#include "common/async/completion.h"
+#include "common/async/forward_handler.h"
+
+#include "common/dout.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/fifo/cls_fifo_ops.h"
+
+namespace neorados::cls::fifo {
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
+namespace fifo = rados::cls::fifo;
+
+inline constexpr auto dout_subsys = ceph_subsys_rados;
+inline constexpr std::uint64_t default_max_part_size = 4 * 1024 * 1024;
+inline constexpr std::uint64_t default_max_entry_size = 32 * 1024;
+inline constexpr auto MAX_RACE_RETRIES = 10;
+
+
+const boost::system::error_category& error_category() noexcept;
+
+enum class errc {
+ raced = 1,
+ inconsistency,
+ entry_too_large,
+ invalid_marker,
+ update_failed
+};
+}
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::neorados::cls::fifo::errc> {
+ static const bool value = true;
+};
+template<>
+struct is_error_condition_enum<::neorados::cls::fifo::errc> {
+ static const bool value = false;
+};
+}
+
+namespace neorados::cls::fifo {
+// explicit conversion:
+inline bs::error_code make_error_code(errc e) noexcept {
+ return { static_cast<int>(e), error_category() };
+}
+
+// implicit conversion:
+inline bs::error_condition make_error_condition(errc e) noexcept {
+  return { static_cast<int>(e), error_category() };
+}
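+
+// EXAMPLE (illustrative, not part of the interface): because
+// is_error_code_enum is specialized above, errc values compare directly
+// against the bs::error_code produced by a FIFO operation, e.g.
+//
+//   if (ec == errc::entry_too_large) {
+//     // the entry exceeded max_entry_size; split it or reject it
+//   }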
+
+void create_meta(WriteOp& op, std::string_view id,
+ std::optional<fifo::objv> objv,
+ std::optional<std::string_view> oid_prefix,
+ bool exclusive = false,
+ std::uint64_t max_part_size = default_max_part_size,
+ std::uint64_t max_entry_size = default_max_entry_size);
+void get_meta(ReadOp& op, std::optional<fifo::objv> objv,
+ bs::error_code* ec_out, fifo::info* info,
+ std::uint32_t* part_header_size,
+ std::uint32_t* part_entry_overhead);
+
+void update_meta(WriteOp& op, const fifo::objv& objv,
+ const fifo::update& desc);
+
+void part_init(WriteOp& op, std::string_view tag,
+ fifo::data_params params);
+
+void push_part(WriteOp& op, std::string_view tag,
+ std::deque<cb::list> data_bufs,
+ fu2::unique_function<void(bs::error_code, int)>);
+void trim_part(WriteOp& op, std::optional<std::string_view> tag,
+ std::uint64_t ofs,
+ bool exclusive);
+void list_part(ReadOp& op,
+ std::optional<std::string_view> tag,
+ std::uint64_t ofs,
+ std::uint64_t max_entries,
+ bs::error_code* ec_out,
+ std::vector<fifo::part_list_entry>* entries,
+ bool* more,
+ bool* full_part,
+ std::string* ptag);
+void get_part_info(ReadOp& op,
+ bs::error_code* out_ec,
+ fifo::part_header* header);
+
+struct marker {
+ std::int64_t num = 0;
+ std::uint64_t ofs = 0;
+
+ marker() = default;
+ marker(std::int64_t num, std::uint64_t ofs) : num(num), ofs(ofs) {}
+ static marker max() {
+ return { std::numeric_limits<decltype(num)>::max(),
+ std::numeric_limits<decltype(ofs)>::max() };
+ }
+
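+  // Renders as two zero-padded 20-digit decimal fields, so lexicographic
+  // order matches (num, ofs) order for non-negative values, e.g.
+  // marker{2, 1024}.to_string() ==
+  // "00000000000000000002:00000000000000001024".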
+ std::string to_string() {
+ return fmt::format("{:0>20}:{:0>20}", num, ofs);
+ }
+};
+
+struct list_entry {
+ cb::list data;
+ std::string marker;
+ ceph::real_time mtime;
+};
+
+using part_info = fifo::part_header;
+
+namespace detail {
+template<typename Handler>
+class JournalProcessor;
+}
+
+/// Completions, Handlers, and CompletionTokens
+/// ===========================================
+///
+/// This class is based on Boost.Asio. For information, see
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio.html
+///
+/// In summary, Asio's design centers on functions that take completion
+/// handlers. Every handler has a signature, such as
+/// (boost::system::error_code, std::string). The completion handler
+/// receives the result of the operation, and the signature describes the
+/// type of that result.
+///
+/// The completion handler is specified with a CompletionToken. A
+/// CompletionToken is any type for which async_completion and
+/// async_result are specialized. See
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_completion.html
+/// and https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/async_result.html
+///
+/// The return type of a function taking a CompletionToken is
+/// async_result<CompletionToken, Signature>::return_type.
+///
+/// Functions
+/// ---------
+///
+/// The default implementation treats whatever value is supplied as a
+/// function object whose parameters correspond to the signature, and
+/// calls it upon completion.
+///
+/// EXAMPLE:
+/// Let f be an asynchronous function whose signature is (bs::error_code, int)
+/// Let g be an asynchronous function whose signature is
+/// (bs::error_code, int, std::string).
+///
+///
+/// f([](bs::error_code ec, int i) { ... });
+/// g([](bs::error_code ec, int i, std::string s) { ... });
+///
+/// Will schedule asynchronous tasks, and the provided lambdas will be
+/// called on completion. In this case, f and g return void.
+///
+/// There are other specializations. Commonly used ones are described below.
+///
+/// Futures
+/// -------
+///
+/// A CompletionToken of boost::asio::use_future will complete with a
+/// promise whose type matches (minus any initial error_code) the
+/// function's signature. The corresponding future is returned. If the
+/// error_code of the result is non-zero, the future is set with an
+/// exception of type boost::system::system_error.
+///
+/// See https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/use_future_t.html
+///
+/// EXAMPLE:
+///
+///   std::future<int> i = f(ba::use_future);
+///   std::future<std::tuple<int, std::string>> is = g(ba::use_future);
+///
+/// Coroutines
+/// ----------
+///
+/// A CompletionToken of type spawn::yield_context suspends execution
+/// of the current coroutine until completion of the operation. See
+/// src/spawn/README.md
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/spawn.html and
+/// https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/yield_context.html
+///
+/// Operations given this CompletionToken return their results, minus
+/// any leading error_code. By default a non-zero error code is thrown
+/// as an exception, but it may be bound to a variable instead with the
+/// overload of the array-subscript operator.
+///
+/// EXAMPLE:
+/// // Within a function with a yield_context parameter named y
+///
+/// try {
+/// int i = f(y);
+/// } catch (const bs::system_error& ec) { ... }
+///
+/// bs::error_code ec;
+/// auto [i, s] = g(y[ec]);
+///
+/// Blocking calls
+/// --------------
+///
+/// ceph::async::use_blocked, defined in
+/// src/common/async/blocked_completion.h, suspends the calling thread
+/// until the operation completes, then returns its results. Its calling
+/// convention is analogous to that of yield_context.
+///
+/// EXAMPLE:
+/// try {
+/// int i = f(ca::use_blocked);
+/// } catch (const bs::system_error& e) { ... }
+///
+/// bs::error_code ec;
+/// auto [i, s] = g(ca::use_blocked[ec]);
+///
+/// librados Completions
+/// --------------------
+///
+/// If src/common/async/librados_completion.h is included in the
+/// current translation unit, then librados::AioCompletion* may be used
+/// as a CompletionToken. This is only permitted when the completion
+/// signature is either (bs::error_code) or (). Functions given an
+/// AioCompletion* as the CompletionToken return void. If
+/// the signature includes an error code and the error code is set,
+/// then the error is translated to an int which is set as the result
+/// of the AioCompletion.
+///
+/// EXAMPLE:
+/// // Assume an asynchronous function h whose signature is bs::error_code.
+///
+///   AioCompletion* c = Rados::aio_create_completion();
+///   h(c);
+///   c->wait_for_complete();
+///   int r = c->get_return_value();
+///
+/// See also src/test/cls_fifo/bench_cls_fifo.cc for a full, simple
+/// example of a program using this class with coroutines.
+///
+///
+/// Markers
+/// =======
+///
+/// Markers represent a position within the FIFO. Internally, they are
+/// part/offset pairs. Externally, they are ordered but otherwise
+/// opaque strings. Markers that compare lower denote positions closer
+/// to the tail.
+///
+/// A marker is returned with every entry from a list() operation. It
+/// may be supplied to a subsequent list() call to resume from that
+/// position, and must be supplied to trim() to give the position to
+/// which to trim.
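+///
+/// EXAMPLE:
+/// An illustrative end-to-end sketch (not part of this header), assuming
+/// a connected RADOS handle `rados`, an IOContext `ioc`, blocked
+/// completions from src/common/async/blocked_completion.h, and code
+/// using the neorados::cls::fifo namespace; error handling is omitted.
+///
+///   auto f = FIFO::create(rados, ioc, "my_queue", ca::use_blocked);
+///   cb::list bl;
+///   bl.append("hello");
+///   f->push(bl, ca::use_blocked);
+///   auto [entries, more] = f->list(100, std::nullopt, ca::use_blocked);
+///   // more == true means further entries remain past the last returned.
+///   if (!entries.empty())
+///     f->trim(entries.back().marker, false, ca::use_blocked);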
+
+class FIFO {
+public:
+
+ FIFO(const FIFO&) = delete;
+ FIFO& operator =(const FIFO&) = delete;
+ FIFO(FIFO&&) = delete;
+ FIFO& operator =(FIFO&&) = delete;
+
+ /// Open an existing FIFO.
+ /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
+ template<typename CT>
+ static auto open(RADOS& r, //< RADOS handle
+ const IOContext& ioc, //< Context for pool, namespace, etc.
+ Object oid, //< OID for the 'main' object of the FIFO
+ CT&& ct, //< CompletionToken
+		   /// Fail if the FIFO is not at this version
+ std::optional<fifo::objv> objv = std::nullopt,
+ /// Default executor. By default use the one
+ /// associated with the RADOS handle.
+ std::optional<ba::executor> executor = std::nullopt) {
+ ba::async_completion<CT, void(bs::error_code,
+ std::unique_ptr<FIFO>)> init(ct);
+ auto e = ba::get_associated_executor(init.completion_handler,
+ executor.value_or(r.get_executor()));
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ _read_meta_(
+ &r, oid, ioc, objv,
+ ca::bind_ea(
+ e, a,
+ [&r, ioc, oid, executor, handler = std::move(init.completion_handler)]
+ (bs::error_code ec, fifo::info info,
+ std::uint32_t size, std::uint32_t over) mutable {
+ std::unique_ptr<FIFO> f(
+ new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
+ f->info = info;
+ f->part_header_size = size;
+ f->part_entry_overhead = over;
+ // If there are journal entries, process them, in case
+ // someone crashed mid-transaction.
+ if (!ec && !info.journal.empty()) {
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ auto g = f.get();
+ g->_process_journal(
+ ca::bind_ea(
+ e, a,
+ [f = std::move(f),
+ handler = std::move(handler)](bs::error_code ec) mutable {
+ std::move(handler)(ec, std::move(f));
+ }));
+ return;
+ }
+ std::move(handler)(ec, std::move(f));
+ return;
+ }));
+ return init.result.get();
+ }
+
+ /// Open an existing or create a new FIFO.
+ /// Signature: (bs::error_code ec, std::unique_ptr<FIFO> f)
+ template<typename CT>
+ static auto create(RADOS& r, /// RADOS handle
+ const IOContext& ioc, /// Context for pool, namespace, etc.
+ Object oid, /// OID for the 'main' object of the FIFO
+ CT&& ct, /// CompletionToken
+ /// Fail if FIFO exists and is not this version
+ std::optional<fifo::objv> objv = std::nullopt,
+ /// Custom prefix for parts
+ std::optional<std::string_view> oid_prefix = std::nullopt,
+ /// Fail if FIFO already exists
+ bool exclusive = false,
+ /// Size at which a part is considered full
+ std::uint64_t max_part_size = default_max_part_size,
+ /// Maximum size of any entry
+ std::uint64_t max_entry_size = default_max_entry_size,
+ /// Default executor. By default use the one
+ /// associated with the RADOS handle.
+ std::optional<ba::executor> executor = std::nullopt) {
+ ba::async_completion<CT, void(bs::error_code,
+ std::unique_ptr<FIFO>)> init(ct);
+ WriteOp op;
+ create_meta(op, oid, objv, oid_prefix, exclusive, max_part_size,
+ max_entry_size);
+ auto e = ba::get_associated_executor(init.completion_handler,
+ executor.value_or(r.get_executor()));
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ r.execute(
+ oid, ioc, std::move(op),
+ ca::bind_ea(
+ e, a,
+ [objv, &r, ioc, oid, executor, handler = std::move(init.completion_handler)]
+ (bs::error_code ec) mutable {
+ if (ec) {
+ std::move(handler)(ec, nullptr);
+ return;
+ }
+ auto e = ba::get_associated_executor(
+ handler, executor.value_or(r.get_executor()));
+ auto a = ba::get_associated_allocator(handler);
+ FIFO::_read_meta_(
+ &r, oid, ioc, objv,
+ ca::bind_ea(
+ e, a,
+ [&r, ioc, executor, oid, handler = std::move(handler)]
+ (bs::error_code ec, fifo::info info,
+ std::uint32_t size, std::uint32_t over) mutable {
+ std::unique_ptr<FIFO> f(
+ new FIFO(r, ioc, oid, executor.value_or(r.get_executor())));
+ f->info = info;
+ f->part_header_size = size;
+ f->part_entry_overhead = over;
+ if (!ec && !info.journal.empty()) {
+ auto e = ba::get_associated_executor(handler,
+ f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ auto g = f.get();
+ g->_process_journal(
+ ca::bind_ea(
+ e, a,
+ [f = std::move(f), handler = std::move(handler)]
+ (bs::error_code ec) mutable {
+ std::move(handler)(ec, std::move(f));
+ }));
+ return;
+ }
+ std::move(handler)(ec, std::move(f));
+ }));
+ }));
+ return init.result.get();
+ }
+
+ /// Force a re-read of FIFO metadata.
+ /// Signature: (bs::error_code ec)
+ template<typename CT>
+ auto read_meta(CT&& ct, //< CompletionToken
+ /// Fail if FIFO not at this version
+ std::optional<fifo::objv> objv = std::nullopt) {
+ std::unique_lock l(m);
+ auto version = info.version;
+ l.unlock();
+ ba::async_completion<CT, void(bs::error_code)> init(ct);
+ auto e = ba::get_associated_executor(init.completion_handler,
+ get_executor());
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ _read_meta_(
+ r, oid, ioc, objv,
+ ca::bind_ea(
+ e, a,
+ [this, version, handler = std::move(init.completion_handler)]
+ (bs::error_code ec, fifo::info newinfo,
+ std::uint32_t size, std::uint32_t over) mutable {
+ std::unique_lock l(m);
+ if (version == info.version) {
+ info = newinfo;
+ part_header_size = size;
+ part_entry_overhead = over;
+ }
+ l.unlock();
+ return std::move(handler)(ec);
+ }));
+ return init.result.get();
+ }
+
+ /// Return a reference to currently known metadata
+ const fifo::info& meta() const {
+ return info;
+ }
+
+ /// Return header size and entry overhead of partitions.
+ std::pair<std::uint32_t, std::uint32_t> get_part_layout_info() {
+ return {part_header_size, part_entry_overhead};
+ }
+
+ /// Push a single entry to the FIFO.
+ /// Signature: (bs::error_code)
+ template<typename CT>
+ auto push(const cb::list& bl, //< Bufferlist holding entry to push
+ CT&& ct //< CompletionToken
+ ) {
+ return push(std::vector{ bl }, std::forward<CT>(ct));
+ }
+
+  /// Push many entries to the FIFO.
+ /// Signature: (bs::error_code)
+ template<typename CT>
+ auto push(const std::vector<cb::list>& data_bufs, //< Entries to push
+ CT&& ct //< CompletionToken
+ ) {
+ ba::async_completion<CT, void(bs::error_code)> init(ct);
+ std::unique_lock l(m);
+ auto max_entry_size = info.params.max_entry_size;
+ auto need_new_head = info.need_new_head();
+ l.unlock();
+ auto e = ba::get_associated_executor(init.completion_handler,
+ get_executor());
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ if (data_bufs.empty() ) {
+ // Can't fail if you don't try.
+ e.post(ca::bind_handler(std::move(init.completion_handler),
+ bs::error_code{}), a);
+ return init.result.get();
+ }
+
+ // Validate sizes
+ for (const auto& bl : data_bufs) {
+ if (bl.length() > max_entry_size) {
+ ldout(r->cct(), 10) << __func__ << "(): entry too large: "
+ << bl.length() << " > "
+ << info.params.max_entry_size << dendl;
+ e.post(ca::bind_handler(std::move(init.completion_handler),
+ errc::entry_too_large), a);
+ return init.result.get();
+ }
+ }
+
+ auto p = ca::bind_ea(e, a,
+ Pusher(this, {data_bufs.begin(), data_bufs.end()},
+ {}, 0, std::move(init.completion_handler)));
+
+ if (need_new_head) {
+ _prepare_new_head(std::move(p));
+ } else {
+ e.dispatch(std::move(p), a);
+ }
+ return init.result.get();
+ }
+
+ /// List the entries in a FIFO
+  /// Signature: (bs::error_code ec, std::vector<list_entry> entries, bool more)
+  ///
+  /// more is true if entries remain beyond the last one returned.
+ /// The list entries are of the form:
+ /// data - Contents of the entry
+ /// marker - String representing the position of this entry within the FIFO.
+ /// mtime - Time (on the OSD) at which the entry was pushed.
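+  ///
+  /// EXAMPLE (an illustrative sketch, not part of this header; assumes a
+  /// spawn::yield_context `y` and an open FIFO `f`):
+  ///
+  ///   std::optional<std::string> marker;
+  ///   bool more = true;
+  ///   while (more) {
+  ///     auto [entries, m] = f->list(100, marker, y);
+  ///     more = m;
+  ///     for (auto& entry : entries) {
+  ///       // consume entry.data, then remember how far we got
+  ///       marker = entry.marker;
+  ///     }
+  ///   }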
+ template<typename CT>
+ auto list(int max_entries, //< Maximum number of entries to fetch
+ /// Optionally, a marker indicating the position after
+ /// which to begin listing. If null, begin at the tail.
+ std::optional<std::string_view> markstr,
+ CT&& ct //< CompletionToken
+ ) {
+ ba::async_completion<CT, void(bs::error_code,
+ std::vector<list_entry>, bool)> init(ct);
+ std::unique_lock l(m);
+ std::int64_t part_num = info.tail_part_num;
+ l.unlock();
+ std::uint64_t ofs = 0;
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ auto e = ba::get_associated_executor(init.completion_handler);
+
+ if (markstr) {
+ auto marker = to_marker(*markstr);
+ if (!marker) {
+ ldout(r->cct(), 0) << __func__
+ << "(): failed to parse marker (" << *markstr
+ << ")" << dendl;
+ e.post(ca::bind_handler(std::move(init.completion_handler),
+ errc::invalid_marker,
+ std::vector<list_entry>{}, false), a);
+ return init.result.get();
+ }
+ part_num = marker->num;
+ ofs = marker->ofs;
+ }
+
+ using handler_type = decltype(init.completion_handler);
+ auto ls = ceph::allocate_unique<Lister<handler_type>>(
+ a, this, part_num, ofs, max_entries,
+ std::move(init.completion_handler));
+ ls.release()->list();
+ return init.result.get();
+ }
+
+ /// Trim entries from the tail to the given position
+ /// Signature: (bs::error_code)
+ template<typename CT>
+ auto trim(std::string_view markstr, //< Position to which to trim, inclusive
+ bool exclusive, //< If true, trim markers up to but NOT INCLUDING
+ //< markstr, otherwise trim markstr as well.
+ CT&& ct //< CompletionToken
+ ) {
+ auto m = to_marker(markstr);
+ ba::async_completion<CT, void(bs::error_code)> init(ct);
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ auto e = ba::get_associated_executor(init.completion_handler);
+ if (!m) {
+ ldout(r->cct(), 0) << __func__ << "(): failed to parse marker: marker="
+ << markstr << dendl;
+ e.post(ca::bind_handler(std::move(init.completion_handler),
+ errc::invalid_marker), a);
+ return init.result.get();
+ } else {
+ using handler_type = decltype(init.completion_handler);
+ auto t = ceph::allocate_unique<Trimmer<handler_type>>(
+ a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
+ t.release()->trim();
+ }
+ return init.result.get();
+ }
+
+ /// Get information about a specific partition
+ /// Signature: (bs::error_code, part_info)
+ ///
+ /// part_info has the following entries
+ /// tag - A random string identifying this partition. Used internally
+ /// as a sanity check to make sure operations haven't been misdirected
+ /// params - Data parameters, identical for every partition within a
+ /// FIFO and the same as what is returned from get_part_layout()
+ /// magic - A random magic number, used internally as a prefix to
+  ///     every entry stored on the OSD, as a consistency check
+ /// min_ofs - Offset of the first entry
+ /// max_ofs - Offset of the highest entry
+ /// min_index - Minimum entry index
+ /// max_index - Maximum entry index
+ /// max_time - Time of the latest push
+ ///
+ /// The difference between ofs and index is that ofs is a byte
+ /// offset. Index is a count. Nothing really uses indices, but
+ /// they're tracked and sanity-checked as an invariant on the OSD.
+ ///
+ /// max_ofs and max_time are the two that have been used externally
+ /// so far.
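+  ///
+  /// EXAMPLE (an illustrative sketch, not part of this header; assumes
+  /// blocked completions and an open FIFO `f`):
+  ///
+  ///   auto head = f->meta().head_part_num;
+  ///   part_info pi = f->get_part_info(head, ca::use_blocked);
+  ///   // pi.max_ofs is the byte offset of the newest entry in that part;
+  ///   // pi.max_time is the time of the most recent push to it.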
+ template<typename CT>
+ auto get_part_info(int64_t part_num, // The number of the partition
+ CT&& ct // CompletionToken
+ ) {
+
+ ba::async_completion<CT, void(bs::error_code, part_info)> init(ct);
+ fifo::op::get_part_info gpi;
+ cb::list in;
+ encode(gpi, in);
+ ReadOp op;
+ auto e = ba::get_associated_executor(init.completion_handler,
+ get_executor());
+ auto a = ba::get_associated_allocator(init.completion_handler);
+ auto reply = ceph::allocate_unique<
+ ExecDecodeCB<fifo::op::get_part_info_reply>>(a);
+
+ op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
+ std::ref(*reply));
+ std::unique_lock l(m);
+ auto part_oid = info.part_oid(part_num);
+ l.unlock();
+ r->execute(part_oid, ioc, std::move(op), nullptr,
+ ca::bind_ea(e, a,
+ PartInfoGetter(std::move(init.completion_handler),
+ std::move(reply))));
+ return init.result.get();
+ }
+
+ using executor_type = ba::executor;
+
+ /// Return the default executor, as specified at creation.
+ ba::executor get_executor() const {
+ return executor;
+ }
+
+private:
+ template<typename Handler>
+ friend class detail::JournalProcessor;
+ RADOS* const r;
+ const IOContext ioc;
+ const Object oid;
+ std::mutex m;
+
+ fifo::info info;
+
+ std::uint32_t part_header_size = 0xdeadbeef;
+ std::uint32_t part_entry_overhead = 0xdeadbeef;
+
+ ba::executor executor;
+
+ std::optional<marker> to_marker(std::string_view s);
+
+ template<typename Handler, typename T>
+ static void assoc_delete(const Handler& handler, T* t) {
+ typename std::allocator_traits<typename ba::associated_allocator<Handler>::type>
+ ::template rebind_alloc<T> a(
+ ba::get_associated_allocator(handler));
+ a.destroy(t);
+ a.deallocate(t, 1);
+ }
+
+ FIFO(RADOS& r,
+ IOContext ioc,
+ Object oid,
+ ba::executor executor)
+ : r(&r), ioc(std::move(ioc)), oid(oid), executor(executor) {}
+
+ std::string generate_tag() const;
+
+ template <typename T>
+ struct ExecDecodeCB {
+ bs::error_code ec;
+ T result;
+ void operator()(bs::error_code e, const cb::list& r) {
+ if (e) {
+ ec = e;
+ return;
+ }
+ try {
+ auto p = r.begin();
+ using ceph::decode;
+ decode(result, p);
+ } catch (const cb::error& err) {
+ ec = err.code();
+ }
+ }
+ };
+
+ template<typename Handler>
+ class MetaReader {
+ Handler handler;
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ using decoder_type = ExecDecodeCB<fifo::op::get_meta_reply>;
+ using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
+ decoder_ptr decoder;
+ public:
+ MetaReader(Handler&& handler, decoder_ptr&& decoder)
+ : handler(std::move(handler)), decoder(std::move(decoder)) {}
+
+ void operator ()(bs::error_code ec) {
+ if (!ec) {
+ ec = decoder->ec;
+ }
+ auto reply = std::move(decoder->result);
+ decoder.reset(); // free handler-allocated memory before dispatching
+
+ std::move(handler)(ec, std::move(reply.info),
+ std::move(reply.part_header_size),
+ std::move(reply.part_entry_overhead));
+ }
+ };
+
+ // Renamed to get around a compiler bug in Bionic that kept
+ // complaining we weren't capturing 'this' to make a static function call.
+ template<typename Handler>
+ static void _read_meta_(RADOS* r, const Object& oid, const IOContext& ioc,
+ std::optional<fifo::objv> objv,
+			  Handler&& handler, /* error_code, info, uint32,
+						uint32 */
+ std::optional<ba::executor> executor = std::nullopt){
+ fifo::op::get_meta gm;
+
+ gm.version = objv;
+
+ cb::list in;
+ encode(gm, in);
+ ReadOp op;
+
+ auto a = ba::get_associated_allocator(handler);
+ auto reply =
+ ceph::allocate_unique<ExecDecodeCB<fifo::op::get_meta_reply>>(a);
+
+ auto e = ba::get_associated_executor(handler);
+ op.exec(fifo::op::CLASS, fifo::op::GET_META, in, std::ref(*reply));
+ r->execute(oid, ioc, std::move(op), nullptr,
+ ca::bind_ea(e, a, MetaReader(std::move(handler),
+ std::move(reply))));
+ };
+
+ template<typename Handler>
+ void _read_meta(Handler&& handler /* error_code */) {
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _read_meta_(r, oid, ioc,
+ nullopt,
+ ca::bind_ea(
+ e, a,
+ [this,
+ handler = std::move(handler)](bs::error_code ec,
+ fifo::info&& info,
+ std::uint64_t phs,
+ std::uint64_t peo) mutable {
+ std::unique_lock l(m);
+ if (ec) {
+ l.unlock();
+ std::move(handler)(ec);
+ return;
+ }
+ // We have a newer version already!
+ if (!info.version.same_or_later(this->info.version)) {
+ l.unlock();
+ std::move(handler)(bs::error_code{});
+ return;
+ }
+ this->info = std::move(info);
+ part_header_size = phs;
+ part_entry_overhead = peo;
+ l.unlock();
+ std::move(handler)(bs::error_code{});
+ }), get_executor());
+ }
+
+ bs::error_code apply_update(fifo::info* info,
+ const fifo::objv& objv,
+ const fifo::update& update);
+
+
+ template<typename Handler>
+ void _update_meta(const fifo::update& update,
+ fifo::objv version,
+ Handler&& handler /* error_code, bool */) {
+ WriteOp op;
+
+ cls::fifo::update_meta(op, info.version, update);
+
+ auto a = ba::get_associated_allocator(handler);
+ auto e = ba::get_associated_executor(handler, get_executor());
+
+ r->execute(
+ oid, ioc, std::move(op),
+ ca::bind_ea(
+ e, a,
+ [this, e, a, version, update,
+ handler = std::move(handler)](bs::error_code ec) mutable {
+ if (ec && ec != bs::errc::operation_canceled) {
+ std::move(handler)(ec, bool{});
+ return;
+ }
+
+ auto canceled = (ec == bs::errc::operation_canceled);
+
+ if (!canceled) {
+ ec = apply_update(&info,
+ version,
+ update);
+ if (ec) {
+ canceled = true;
+ }
+ }
+
+ if (canceled) {
+ _read_meta(
+ ca::bind_ea(
+ e, a,
+ [handler = std::move(handler)](bs::error_code ec) mutable {
+ std::move(handler)(ec, ec ? false : true);
+ }));
+ return;
+ }
+ std::move(handler)(ec, false);
+ return;
+ }));
+ }
+
+ template<typename Handler>
+ auto _process_journal(Handler&& handler /* error_code */) {
+ auto a = ba::get_associated_allocator(std::ref(handler));
+ auto j = ceph::allocate_unique<detail::JournalProcessor<Handler>>(
+ a, this, std::move(handler));
+ auto p = j.release();
+ p->process();
+ }
+
+ template<typename Handler>
+ class NewPartPreparer {
+ FIFO* f;
+ Handler handler;
+ std::vector<fifo::journal_entry> jentries;
+ int i;
+ std::int64_t new_head_part_num;
+
+ public:
+
+ void operator ()(bs::error_code ec, bool canceled) {
+ if (ec) {
+ std::move(handler)(ec);
+ return;
+ }
+
+ if (canceled) {
+ std::unique_lock l(f->m);
+ auto iter = f->info.journal.find(jentries.front().part_num);
+ auto max_push_part_num = f->info.max_push_part_num;
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+ auto found = (iter != f->info.journal.end());
+ l.unlock();
+ if ((max_push_part_num >= jentries.front().part_num &&
+ head_part_num >= new_head_part_num)) {
+ /* raced, but new part was already written */
+ std::move(handler)(bs::error_code{});
+ return;
+ }
+ if (i >= MAX_RACE_RETRIES) {
+ std::move(handler)(errc::raced);
+ return;
+ }
+ if (!found) {
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->_update_meta(fifo::update{}
+ .journal_entries_add(jentries),
+ version,
+ ca::bind_ea(
+ e, a,
+ NewPartPreparer(f, std::move(handler),
+ jentries,
+ i + 1, new_head_part_num)));
+ return;
+ }
+ // Fall through. We still need to process the journal.
+ }
+ f->_process_journal(std::move(handler));
+ return;
+ }
+
+ NewPartPreparer(FIFO* f,
+ Handler&& handler,
+ std::vector<fifo::journal_entry> jentries,
+ int i, std::int64_t new_head_part_num)
+ : f(f), handler(std::move(handler)), jentries(std::move(jentries)),
+ i(i), new_head_part_num(new_head_part_num) {}
+ };
+
+ template<typename Handler>
+ void _prepare_new_part(bool is_head,
+ Handler&& handler /* error_code */) {
+ std::unique_lock l(m);
+ std::vector jentries = { info.next_journal_entry(generate_tag()) };
+ std::int64_t new_head_part_num = info.head_part_num;
+ auto version = info.version;
+
+ if (is_head) {
+ auto new_head_jentry = jentries.front();
+ new_head_jentry.op = fifo::journal_entry::Op::set_head;
+ new_head_part_num = jentries.front().part_num;
+ jentries.push_back(std::move(new_head_jentry));
+ }
+ l.unlock();
+
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _update_meta(fifo::update{}.journal_entries_add(jentries),
+ version,
+ ca::bind_ea(
+ e, a,
+ NewPartPreparer(this, std::move(handler),
+ jentries, 0, new_head_part_num)));
+ }
+
+ template<typename Handler>
+ class NewHeadPreparer {
+ FIFO* f;
+ Handler handler;
+ int i;
+ std::int64_t new_head_num;
+
+ public:
+
+ void operator ()(bs::error_code ec, bool canceled) {
+ std::unique_lock l(f->m);
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+ l.unlock();
+
+ if (ec) {
+ std::move(handler)(ec);
+ return;
+ }
+ if (canceled) {
+ if (i >= MAX_RACE_RETRIES) {
+ std::move(handler)(errc::raced);
+ return;
+ }
+
+ // Raced, but there's still work to do!
+ if (head_part_num < new_head_num) {
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->_update_meta(fifo::update{}.head_part_num(new_head_num),
+ version,
+ ca::bind_ea(
+ e, a,
+ NewHeadPreparer(f, std::move(handler),
+ i + 1,
+ new_head_num)));
+ return;
+ }
+ }
+ // Either we succeeded, or we were raced by someone who did it for us.
+ std::move(handler)(bs::error_code{});
+ return;
+ }
+
+ NewHeadPreparer(FIFO* f,
+ Handler&& handler,
+ int i, std::int64_t new_head_num)
+ : f(f), handler(std::move(handler)), i(i), new_head_num(new_head_num) {}
+ };
+
+ template<typename Handler>
+ void _prepare_new_head(Handler&& handler /* error_code */) {
+ std::unique_lock l(m);
+ int64_t new_head_num = info.head_part_num + 1;
+ auto max_push_part_num = info.max_push_part_num;
+ auto version = info.version;
+ l.unlock();
+
+ if (max_push_part_num < new_head_num) {
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _prepare_new_part(
+ true,
+ ca::bind_ea(
+ e, a,
+ [this, new_head_num,
+ handler = std::move(handler)](bs::error_code ec) mutable {
+ if (ec) {
+ handler(ec);
+ return;
+ }
+ std::unique_lock l(m);
+ if (info.max_push_part_num < new_head_num) {
+ l.unlock();
+ ldout(r->cct(), 0)
+ << "ERROR: " << __func__
+ << ": after new part creation: meta_info.max_push_part_num="
+ << info.max_push_part_num << " new_head_num="
+		    << new_head_num << dendl;
+ std::move(handler)(errc::inconsistency);
+ } else {
+ l.unlock();
+ std::move(handler)(bs::error_code{});
+ }
+ }));
+ return;
+ }
+ auto e = ba::get_associated_executor(handler, get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ _update_meta(fifo::update{}.head_part_num(new_head_num),
+ version,
+ ca::bind_ea(
+ e, a,
+ NewHeadPreparer(this, std::move(handler), 0,
+ new_head_num)));
+ }
+
+ template<typename T>
+ struct ExecHandleCB {
+ bs::error_code ec;
+ T result;
+ void operator()(bs::error_code e, const T& t) {
+ if (e) {
+ ec = e;
+ return;
+ }
+ result = t;
+ }
+ };
+
+ template<typename Handler>
+ class EntryPusher {
+ Handler handler;
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ using decoder_type = ExecHandleCB<int>;
+ using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
+ decoder_ptr decoder;
+
+ public:
+
+ EntryPusher(Handler&& handler, decoder_ptr&& decoder)
+ : handler(std::move(handler)), decoder(std::move(decoder)) {}
+
+ void operator ()(bs::error_code ec) {
+ if (!ec) {
+ ec = decoder->ec;
+ }
+ auto reply = std::move(decoder->result);
+ decoder.reset(); // free handler-allocated memory before dispatching
+
+ std::move(handler)(ec, std::move(reply));
+ }
+ };
+
+ template<typename Handler>
+ auto push_entries(const std::deque<cb::list>& data_bufs,
+ Handler&& handler /* error_code, int */) {
+ WriteOp op;
+ std::unique_lock l(m);
+ auto head_part_num = info.head_part_num;
+ auto tag = info.head_tag;
+ auto oid = info.part_oid(head_part_num);
+ l.unlock();
+
+ auto a = ba::get_associated_allocator(handler);
+ auto reply = ceph::allocate_unique<ExecHandleCB<int>>(a);
+
+ auto e = ba::get_associated_executor(handler, get_executor());
+ push_part(op, tag, data_bufs, std::ref(*reply));
+ return r->execute(oid, ioc, std::move(op),
+ ca::bind_ea(e, a, EntryPusher(std::move(handler),
+ std::move(reply))));
+ }
+
+ template<typename CT>
+ auto trim_part(int64_t part_num,
+ uint64_t ofs,
+ std::optional<std::string_view> tag,
+ bool exclusive,
+ CT&& ct) {
+ WriteOp op;
+ cls::fifo::trim_part(op, tag, ofs, exclusive);
+ return r->execute(info.part_oid(part_num), ioc, std::move(op),
+ std::forward<CT>(ct));
+ }
+
+
+ template<typename Handler>
+ class Pusher {
+ FIFO* f;
+ std::deque<cb::list> remaining;
+ std::deque<cb::list> batch;
+ int i;
+ Handler handler;
+
+ void prep_then_push(const unsigned successes) {
+ std::unique_lock l(f->m);
+ auto max_part_size = f->info.params.max_part_size;
+ auto part_entry_overhead = f->part_entry_overhead;
+ l.unlock();
+
+ uint64_t batch_len = 0;
+ if (successes > 0) {
+ if (successes == batch.size()) {
+ batch.clear();
+ } else {
+ batch.erase(batch.begin(), batch.begin() + successes);
+ for (const auto& b : batch) {
+ batch_len += b.length() + part_entry_overhead;
+ }
+ }
+ }
+
+ if (batch.empty() && remaining.empty()) {
+ std::move(handler)(bs::error_code{});
+ return;
+ }
+
+ while (!remaining.empty() &&
+ (remaining.front().length() + batch_len <= max_part_size)) {
+
+ /* We can send entries with data_len up to max_entry_size,
+ however, we want to also account the overhead when
+ dealing with multiple entries. Previous check doesn't
+ account for overhead on purpose. */
+ batch_len += remaining.front().length() + part_entry_overhead;
+ batch.push_back(std::move(remaining.front()));
+ remaining.pop_front();
+ }
+ push();
+ }
+
+ void push() {
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->push_entries(batch,
+ ca::bind_ea(e, a,
+ Pusher(f, std::move(remaining),
+ batch, i,
+ std::move(handler))));
+ }
+
+ public:
+
+ // Initial call!
+ void operator ()() {
+ prep_then_push(0);
+ }
+
+ // Called with response to push_entries
+ void operator ()(bs::error_code ec, int r) {
+ if (ec == bs::errc::result_out_of_range) {
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->_prepare_new_head(
+ ca::bind_ea(e, a,
+ Pusher(f, std::move(remaining),
+ std::move(batch), i,
+ std::move(handler))));
+ return;
+ }
+ if (ec) {
+ std::move(handler)(ec);
+ return;
+ }
+ i = 0; // We've made forward progress, so reset the race counter!
+ prep_then_push(r);
+ }
+
+ // Called with response to prepare_new_head
+ void operator ()(bs::error_code ec) {
+ if (ec == bs::errc::operation_canceled) {
+ if (i == MAX_RACE_RETRIES) {
+ ldout(f->r->cct(), 0)
+ << "ERROR: " << __func__
+ << "(): race check failed too many times, likely a bug" << dendl;
+ std::move(handler)(make_error_code(errc::raced));
+ return;
+ }
+ ++i;
+ } else if (ec) {
+ std::move(handler)(ec);
+ return;
+ }
+
+ if (batch.empty()) {
+ prep_then_push(0);
+ return;
+ } else {
+ push();
+ return;
+ }
+ }
+
+ Pusher(FIFO* f, std::deque<cb::list>&& remaining,
+ std::deque<cb::list> batch, int i,
+ Handler&& handler)
+ : f(f), remaining(std::move(remaining)),
+ batch(std::move(batch)), i(i),
+ handler(std::move(handler)) {}
+ };
+
+ template<typename Handler>
+ class Lister {
+ FIFO* f;
+ std::vector<list_entry> result;
+ bool more = false;
+ std::int64_t part_num;
+ std::uint64_t ofs;
+ int max_entries;
+ bs::error_code ec_out;
+ std::vector<fifo::part_list_entry> entries;
+ bool part_more = false;
+ bool part_full = false;
+ Handler handler;
+
+ void handle(bs::error_code ec) {
+ auto h = std::move(handler);
+ auto m = more;
+ auto r = std::move(result);
+
+ FIFO::assoc_delete(h, this);
+ std::move(h)(ec, std::move(r), m);
+ }
+
+ public:
+ Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
+ Handler&& handler)
+ : f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
+ handler(std::move(handler)) {
+ result.reserve(max_entries);
+ }
+
+
+ Lister(const Lister&) = delete;
+ Lister& operator =(const Lister&) = delete;
+ Lister(Lister&&) = delete;
+ Lister& operator =(Lister&&) = delete;
+
+ void list() {
+ if (max_entries > 0) {
+ ReadOp op;
+ ec_out.clear();
+ part_more = false;
+ part_full = false;
+ entries.clear();
+
+ std::unique_lock l(f->m);
+ auto part_oid = f->info.part_oid(part_num);
+ l.unlock();
+
+ list_part(op,
+ {},
+ ofs,
+ max_entries,
+ &ec_out,
+ &entries,
+ &part_more,
+ &part_full,
+ nullptr);
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->r->execute(
+ part_oid,
+ f->ioc,
+ std::move(op),
+ nullptr,
+ ca::bind_ea(
+ e, a,
+ [t = std::unique_ptr<Lister>(this), this,
+ part_oid](bs::error_code ec) mutable {
+ t.release();
+ if (ec == bs::errc::no_such_file_or_directory) {
+ auto e = ba::get_associated_executor(handler,
+ f->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ f->_read_meta(
+ ca::bind_ea(
+ e, a,
+ [this](bs::error_code ec) mutable {
+ if (ec) {
+ handle(ec);
+ return;
+ }
+
+ if (part_num < f->info.tail_part_num) {
+ /* raced with trim? restart */
+ max_entries += result.size();
+ result.clear();
+ part_num = f->info.tail_part_num;
+ ofs = 0;
+			    list();
+			    return;
+ }
+ /* assuming part was not written yet, so end of data */
+ more = false;
+ handle({});
+ return;
+ }));
+ return;
+ }
+ if (ec) {
+ ldout(f->r->cct(), 0)
+ << __func__
+ << "(): list_part() on oid=" << part_oid
+ << " returned ec=" << ec.message() << dendl;
+ handle(ec);
+ return;
+ }
+ if (ec_out) {
+ ldout(f->r->cct(), 0)
+ << __func__
+ << "(): list_part() on oid=" << f->info.part_oid(part_num)
+ << " returned ec=" << ec_out.message() << dendl;
+ handle(ec_out);
+ return;
+ }
+
+ more = part_full || part_more;
+ for (auto& entry : entries) {
+ list_entry e;
+ e.data = std::move(entry.data);
+ e.marker = marker{part_num, entry.ofs}.to_string();
+ e.mtime = entry.mtime;
+ result.push_back(std::move(e));
+ }
+ max_entries -= entries.size();
+ entries.clear();
+ if (max_entries > 0 &&
+ part_more) {
+ list();
+ return;
+ }
+
+ if (!part_full) { /* head part is not full */
+ handle({});
+ return;
+ }
+ ++part_num;
+ ofs = 0;
+ list();
+ }));
+ } else {
+ handle({});
+ return;
+ }
+ }
+ };
+
+ template<typename Handler>
+ class Trimmer {
+ FIFO* f;
+ std::int64_t part_num;
+ std::uint64_t ofs;
+ bool exclusive;
+ Handler handler;
+ std::int64_t pn;
+ int i = 0;
+
+ void handle(bs::error_code ec) {
+ auto h = std::move(handler);
+
+ FIFO::assoc_delete(h, this);
+ return std::move(h)(ec);
+ }
+
+ void update() {
+ std::unique_lock l(f->m);
+ auto objv = f->info.version;
+ l.unlock();
+ auto a = ba::get_associated_allocator(handler);
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ f->_update_meta(
+ fifo::update{}.tail_part_num(part_num),
+ objv,
+ ca::bind_ea(
+ e, a,
+ [this, t = std::unique_ptr<Trimmer>(this)](bs::error_code ec,
+ bool canceled) mutable {
+	      t.release();
+	      if (ec) {
+		handle(ec);
+		return;
+	      }
+	      if (canceled)
+ if (i >= MAX_RACE_RETRIES) {
+ ldout(f->r->cct(), 0)
+ << "ERROR: " << __func__
+ << "(): race check failed too many times, likely a bug"
+ << dendl;
+ handle(errc::raced);
+ return;
+ }
+ std::unique_lock l(f->m);
+ auto tail_part_num = f->info.tail_part_num;
+ l.unlock();
+ if (tail_part_num < part_num) {
+ ++i;
+ update();
+ return;
+ }
+ handle({});
+ return;
+ }));
+ }
+
+ public:
+ Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
+ bool exclusive, Handler&& handler)
+ : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
+ handler(std::move(handler)) {
+ std::unique_lock l(f->m);
+ pn = f->info.tail_part_num;
+ }
+
+ void trim() {
+ auto a = ba::get_associated_allocator(handler);
+ auto e = ba::get_associated_executor(handler, f->get_executor());
+ if (pn < part_num) {
+ std::unique_lock l(f->m);
+ auto max_part_size = f->info.params.max_part_size;
+ l.unlock();
+ f->trim_part(
+ pn, max_part_size, std::nullopt,
+ false,
+ ca::bind_ea(
+ e, a,
+ [t = std::unique_ptr<Trimmer>(this),
+ this](bs::error_code ec) mutable {
+ t.release();
+ if (ec && ec != bs::errc::no_such_file_or_directory) {
+ ldout(f->r->cct(), 0)
+ << __func__ << "(): ERROR: trim_part() on part="
+ << pn << " returned ec=" << ec.message() << dendl;
+ handle(ec);
+ return;
+ }
+ ++pn;
+ trim();
+ }));
+ return;
+ }
+ f->trim_part(
+ part_num, ofs, std::nullopt, exclusive,
+ ca::bind_ea(
+ e, a,
+ [t = std::unique_ptr<Trimmer>(this),
+ this](bs::error_code ec) mutable {
+ t.release();
+ if (ec && ec != bs::errc::no_such_file_or_directory) {
+ ldout(f->r->cct(), 0)
+ << __func__ << "(): ERROR: trim_part() on part=" << part_num
+ << " returned ec=" << ec.message() << dendl;
+ handle(ec);
+ return;
+ }
+ std::unique_lock l(f->m);
+ auto tail_part_num = f->info.tail_part_num;
+ l.unlock();
+ if (part_num <= tail_part_num) {
+ /* don't need to modify meta info */
+ handle({});
+ return;
+ }
+ update();
+ }));
+ }
+ };
+
+ template<typename Handler>
+ class PartInfoGetter {
+ Handler handler;
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ using decoder_type = ExecDecodeCB<fifo::op::get_part_info_reply>;
+ using decoder_ptr = ceph::allocated_unique_ptr<decoder_type, allocator_type>;
+ decoder_ptr decoder;
+ public:
+ PartInfoGetter(Handler&& handler, decoder_ptr&& decoder)
+ : handler(std::move(handler)), decoder(std::move(decoder)) {}
+
+ void operator ()(bs::error_code ec) {
+ if (!ec) {
+ ec = decoder->ec;
+ }
+ auto reply = std::move(decoder->result);
+ decoder.reset(); // free handler-allocated memory before dispatching
+
+ auto p = ca::bind_handler(std::move(handler),
+ ec, std::move(reply.header));
+ std::move(p)();
+ }
+ };
+
+
+};
+
+namespace detail {
+template<typename Handler>
+class JournalProcessor {
+private:
+ FIFO* const fifo;
+ Handler handler;
+
+ std::vector<fifo::journal_entry> processed;
+ std::multimap<std::int64_t, fifo::journal_entry> journal;
+ std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+ std::int64_t new_tail;
+ std::int64_t new_head;
+ std::int64_t new_max;
+ int race_retries = 0;
+
+ template<typename CT>
+ auto create_part(int64_t part_num, std::string_view tag, CT&& ct) {
+ WriteOp op;
+ op.create(false); /* We don't need exclusivity, part_init ensures
+ we're creating from the same journal entry. */
+ std::unique_lock l(fifo->m);
+ part_init(op, tag, fifo->info.params);
+ auto oid = fifo->info.part_oid(part_num);
+ l.unlock();
+ return fifo->r->execute(oid, fifo->ioc,
+ std::move(op), std::forward<CT>(ct));
+ }
+
+ template<typename CT>
+ auto remove_part(int64_t part_num, std::string_view tag, CT&& ct) {
+ WriteOp op;
+ op.remove();
+ std::unique_lock l(fifo->m);
+ auto oid = fifo->info.part_oid(part_num);
+ l.unlock();
+ return fifo->r->execute(oid, fifo->ioc,
+ std::move(op), std::forward<CT>(ct));
+ }
+
+ template<typename PP>
+ void process_journal_entry(const fifo::journal_entry& entry,
+ PP&& pp) {
+ switch (entry.op) {
+ case fifo::journal_entry::Op::unknown:
+ std::move(pp)(errc::inconsistency);
+ return;
+ break;
+
+ case fifo::journal_entry::Op::create:
+ create_part(entry.part_num, entry.part_tag, std::move(pp));
+ return;
+ break;
+ case fifo::journal_entry::Op::set_head:
+ ba::post(ba::get_associated_executor(handler, fifo->get_executor()),
+ [pp = std::move(pp)]() mutable {
+ std::move(pp)(bs::error_code{});
+ });
+ return;
+ break;
+ case fifo::journal_entry::Op::remove:
+ remove_part(entry.part_num, entry.part_tag, std::move(pp));
+ return;
+ break;
+ }
+ std::move(pp)(errc::inconsistency);
+ return;
+ }
+
+ auto journal_entry_finisher(const fifo::journal_entry& entry) {
+ auto a = ba::get_associated_allocator(handler);
+ auto e = ba::get_associated_executor(handler, fifo->get_executor());
+ return
+ ca::bind_ea(
+ e, a,
+ [t = std::unique_ptr<JournalProcessor>(this), this,
+ entry](bs::error_code ec) mutable {
+ t.release();
+ if (entry.op == fifo::journal_entry::Op::remove &&
+ ec == bs::errc::no_such_file_or_directory)
+ ec.clear();
+
+ if (ec) {
+ ldout(fifo->r->cct(), 0)
+ << __func__
+ << "(): ERROR: failed processing journal entry for part="
+ << entry.part_num << " with error " << ec.message()
+ << " Bug or inconsistency." << dendl;
+ handle(errc::inconsistency);
+ return;
+ } else {
+ switch (entry.op) {
+ case fifo::journal_entry::Op::unknown:
+ // Can't happen. Filtered out in process_journal_entry.
+ abort();
+ break;
+
+ case fifo::journal_entry::Op::create:
+ if (entry.part_num > new_max) {
+ new_max = entry.part_num;
+ }
+ break;
+ case fifo::journal_entry::Op::set_head:
+ if (entry.part_num > new_head) {
+ new_head = entry.part_num;
+ }
+ break;
+ case fifo::journal_entry::Op::remove:
+ if (entry.part_num >= new_tail) {
+ new_tail = entry.part_num + 1;
+ }
+ break;
+ }
+ processed.push_back(entry);
+ }
+ ++iter;
+ process();
+ });
+ }
+
+ struct JournalPostprocessor {
+ std::unique_ptr<JournalProcessor> j_;
+ bool first;
+ void operator ()(bs::error_code ec, bool canceled) {
+ std::optional<int64_t> tail_part_num;
+ std::optional<int64_t> head_part_num;
+ std::optional<int64_t> max_part_num;
+
+ auto j = j_.release();
+
+ if (!first && !ec && !canceled) {
+ j->handle({});
+ return;
+ }
+
+ if (canceled) {
+ if (j->race_retries >= MAX_RACE_RETRIES) {
+ ldout(j->fifo->r->cct(), 0) << "ERROR: " << __func__ <<
+ "(): race check failed too many times, likely a bug" << dendl;
+ j->handle(errc::raced);
+ return;
+ }
+
+ ++j->race_retries;
+
+ std::vector<fifo::journal_entry> new_processed;
+ std::unique_lock l(j->fifo->m);
+ for (auto& e : j->processed) {
+ auto jiter = j->fifo->info.journal.find(e.part_num);
+ /* journal entry was already processed */
+ if (jiter == j->fifo->info.journal.end() ||
+ !(jiter->second == e)) {
+ continue;
+ }
+ new_processed.push_back(e);
+ }
+ j->processed = std::move(new_processed);
+ }
+
+ std::unique_lock l(j->fifo->m);
+ auto objv = j->fifo->info.version;
+ if (j->new_tail > j->fifo->info.tail_part_num) {
+ tail_part_num = j->new_tail;
+ }
+
+ if (j->new_head > j->fifo->info.head_part_num) {
+ head_part_num = j->new_head;
+ }
+
+ if (j->new_max > j->fifo->info.max_push_part_num) {
+ max_part_num = j->new_max;
+ }
+ l.unlock();
+
+ if (j->processed.empty() &&
+ !tail_part_num &&
+ !max_part_num) {
+ /* nothing to update anymore */
+ j->handle({});
+ return;
+ }
+ auto a = ba::get_associated_allocator(j->handler);
+ auto e = ba::get_associated_executor(j->handler, j->fifo->get_executor());
+ j->fifo->_update_meta(fifo::update{}
+ .tail_part_num(tail_part_num)
+ .head_part_num(head_part_num)
+ .max_push_part_num(max_part_num)
+ .journal_entries_rm(j->processed),
+ objv,
+ ca::bind_ea(
+ e, a,
+ JournalPostprocessor{j, false}));
+ return;
+ }
+
+ JournalPostprocessor(JournalProcessor* j, bool first)
+ : j_(j), first(first) {}
+ };
+
+ void postprocess() {
+ if (processed.empty()) {
+ handle({});
+ return;
+ }
+ JournalPostprocessor(this, true)({}, false);
+ }
+
+ void handle(bs::error_code ec) {
+ auto e = ba::get_associated_executor(handler, fifo->get_executor());
+ auto a = ba::get_associated_allocator(handler);
+ auto h = std::move(handler);
+ FIFO::assoc_delete(h, this);
+ e.dispatch(ca::bind_handler(std::move(h), ec), a);
+ return;
+ }
+
+public:
+
+ JournalProcessor(FIFO* fifo, Handler&& handler)
+ : fifo(fifo), handler(std::move(handler)) {
+ std::unique_lock l(fifo->m);
+ journal = fifo->info.journal;
+ iter = journal.begin();
+ new_tail = fifo->info.tail_part_num;
+ new_head = fifo->info.head_part_num;
+ new_max = fifo->info.max_push_part_num;
+ }
+
+ JournalProcessor(const JournalProcessor&) = delete;
+ JournalProcessor& operator =(const JournalProcessor&) = delete;
+ JournalProcessor(JournalProcessor&&) = delete;
+ JournalProcessor& operator =(JournalProcessor&&) = delete;
+
+ void process() {
+ if (iter != journal.end()) {
+ const auto entry = iter->second;
+ process_journal_entry(entry,
+ journal_entry_finisher(entry));
+ return;
+ } else {
+ postprocess();
+ return;
+ }
+ }
+};
+}
+}
+
+#endif // CEPH_NEORADOS_CLS_FIFO_H