summaryrefslogtreecommitdiffstats
path: root/src/rgw/cls_fifo_legacy.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rgw/cls_fifo_legacy.cc2484
1 files changed, 2484 insertions, 0 deletions
diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc
new file mode 100644
index 000000000..80af90055
--- /dev/null
+++ b/src/rgw/cls_fifo_legacy.cc
@@ -0,0 +1,2484 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <cstdint>
+#include <numeric>
+#include <optional>
+#include <string_view>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/rados/librados.hpp"
+
+#include "include/buffer.h"
+
+#include "common/async/yield_context.h"
+#include "common/random_string.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/fifo/cls_fifo_ops.h"
+
+#include "cls_fifo_legacy.h"
+
+namespace rgw::cls::fifo {
+static constexpr auto dout_subsys = ceph_subsys_objclass;
+namespace cb = ceph::buffer;
+namespace fifo = rados::cls::fifo;
+
+using ceph::from_error_code;
+
+inline constexpr auto MAX_RACE_RETRIES = 10;
+
+void create_meta(lr::ObjectWriteOperation* op,
+ std::string_view id,
+ std::optional<fifo::objv> objv,
+ std::optional<std::string_view> oid_prefix,
+ bool exclusive,
+ std::uint64_t max_part_size,
+ std::uint64_t max_entry_size)
+{
+ fifo::op::create_meta cm;
+
+ cm.id = id;
+ cm.version = objv;
+ cm.oid_prefix = oid_prefix;
+ cm.max_part_size = max_part_size;
+ cm.max_entry_size = max_entry_size;
+ cm.exclusive = exclusive;
+
+ cb::list in;
+ encode(cm, in);
+ op->exec(fifo::op::CLASS, fifo::op::CREATE_META, in);
+}
+
+int get_meta(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
+ std::optional<fifo::objv> objv, fifo::info* info,
+ std::uint32_t* part_header_size,
+ std::uint32_t* part_entry_overhead,
+ uint64_t tid, optional_yield y,
+ bool probe)
+{
+ lr::ObjectReadOperation op;
+ fifo::op::get_meta gm;
+ gm.version = objv;
+ cb::list in;
+ encode(gm, in);
+ cb::list bl;
+
+ op.exec(fifo::op::CLASS, fifo::op::GET_META, in,
+ &bl, nullptr);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
+ if (r >= 0) try {
+ fifo::op::get_meta_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ if (info) *info = std::move(reply.info);
+ if (part_header_size) *part_header_size = reply.part_header_size;
+ if (part_entry_overhead)
+ *part_entry_overhead = reply.part_entry_overhead;
+ } catch (const cb::error& err) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
+ r = from_error_code(err.code());
+ } else if (!(probe && (r == -ENOENT || r == -ENODATA))) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::GET_META failed r=" << r << " tid=" << tid
+ << dendl;
+ }
+ return r;
+};
+
+namespace {
+void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
+ const fifo::update& update)
+{
+ fifo::op::update_meta um;
+
+ um.version = objv;
+ um.tail_part_num = update.tail_part_num();
+ um.head_part_num = update.head_part_num();
+ um.min_push_part_num = update.min_push_part_num();
+ um.max_push_part_num = update.max_push_part_num();
+ um.journal_entries_add = std::move(update).journal_entries_add();
+ um.journal_entries_rm = std::move(update).journal_entries_rm();
+
+ cb::list in;
+ encode(um, in);
+ op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
+}
+
+void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
+ fifo::data_params params)
+{
+ fifo::op::init_part ip;
+
+ ip.tag = tag;
+ ip.params = params;
+
+ cb::list in;
+ encode(ip, in);
+ op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
+}
+
+int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+ std::deque<cb::list> data_bufs, std::uint64_t tid,
+ optional_yield y)
+{
+ lr::ObjectWriteOperation op;
+ fifo::op::push_part pp;
+
+ pp.tag = tag;
+ pp.data_bufs = data_bufs;
+ pp.total_len = 0;
+
+ for (const auto& bl : data_bufs)
+ pp.total_len += bl.length();
+
+ cb::list in;
+ encode(pp, in);
+ auto retval = 0;
+ op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, nullptr, &retval);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y, lr::OPERATION_RETURNVEC);
+ if (r < 0) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::PUSH_PART failed r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ if (retval < 0) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " error handling response retval=" << retval
+ << " tid=" << tid << dendl;
+ }
+ return retval;
+}
+
+void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+ std::deque<cb::list> data_bufs, std::uint64_t tid,
+ lr::AioCompletion* c)
+{
+ lr::ObjectWriteOperation op;
+ fifo::op::push_part pp;
+
+ pp.tag = tag;
+ pp.data_bufs = data_bufs;
+ pp.total_len = 0;
+
+ for (const auto& bl : data_bufs)
+ pp.total_len += bl.length();
+
+ cb::list in;
+ encode(pp, in);
+ op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
+ auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
+ ceph_assert(r >= 0);
+}
+
+void trim_part(lr::ObjectWriteOperation* op,
+ std::optional<std::string_view> tag,
+ std::uint64_t ofs, bool exclusive)
+{
+ fifo::op::trim_part tp;
+
+ tp.tag = tag;
+ tp.ofs = ofs;
+ tp.exclusive = exclusive;
+
+ cb::list in;
+ encode(tp, in);
+ op->exec(fifo::op::CLASS, fifo::op::TRIM_PART, in);
+}
+
+int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
+ std::optional<std::string_view> tag, std::uint64_t ofs,
+ std::uint64_t max_entries,
+ std::vector<fifo::part_list_entry>* entries,
+ bool* more, bool* full_part, std::string* ptag,
+ std::uint64_t tid, optional_yield y)
+{
+ lr::ObjectReadOperation op;
+ fifo::op::list_part lp;
+
+ lp.tag = tag;
+ lp.ofs = ofs;
+ lp.max_entries = max_entries;
+
+ cb::list in;
+ encode(lp, in);
+ cb::list bl;
+ op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, &bl, nullptr);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
+ if (r >= 0) try {
+ fifo::op::list_part_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ if (entries) *entries = std::move(reply.entries);
+ if (more) *more = reply.more;
+ if (full_part) *full_part = reply.full_part;
+ if (ptag) *ptag = reply.tag;
+ } catch (const cb::error& err) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
+ r = from_error_code(err.code());
+ } else if (r != -ENOENT) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
+ << dendl;
+ }
+ return r;
+}
+
+struct list_entry_completion : public lr::ObjectOperationCompletion {
+ CephContext* cct;
+ int* r_out;
+ std::vector<fifo::part_list_entry>* entries;
+ bool* more;
+ bool* full_part;
+ std::string* ptag;
+ std::uint64_t tid;
+
+ list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
+ bool* more, bool* full_part, std::string* ptag,
+ std::uint64_t tid)
+ : cct(cct), r_out(r_out), entries(entries), more(more),
+ full_part(full_part), ptag(ptag), tid(tid) {}
+ virtual ~list_entry_completion() = default;
+ void handle_completion(int r, bufferlist& bl) override {
+ if (r >= 0) try {
+ fifo::op::list_part_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ if (entries) *entries = std::move(reply.entries);
+ if (more) *more = reply.more;
+ if (full_part) *full_part = reply.full_part;
+ if (ptag) *ptag = reply.tag;
+ } catch (const cb::error& err) {
+ lderr(cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
+ r = from_error_code(err.code());
+ } else if (r < 0) {
+ lderr(cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
+ << dendl;
+ }
+ if (r_out) *r_out = r;
+ }
+};
+
+lr::ObjectReadOperation list_part(CephContext* cct,
+ std::optional<std::string_view> tag,
+ std::uint64_t ofs,
+ std::uint64_t max_entries,
+ int* r_out,
+ std::vector<fifo::part_list_entry>* entries,
+ bool* more, bool* full_part,
+ std::string* ptag, std::uint64_t tid)
+{
+ lr::ObjectReadOperation op;
+ fifo::op::list_part lp;
+
+ lp.tag = tag;
+ lp.ofs = ofs;
+ lp.max_entries = max_entries;
+
+ cb::list in;
+ encode(lp, in);
+ op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
+ new list_entry_completion(cct, r_out, entries, more, full_part,
+ ptag, tid));
+ return op;
+}
+
+int get_part_info(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
+ fifo::part_header* header,
+ std::uint64_t tid, optional_yield y)
+{
+ lr::ObjectReadOperation op;
+ fifo::op::get_part_info gpi;
+
+ cb::list in;
+ cb::list bl;
+ encode(gpi, in);
+ op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, &bl, nullptr);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
+ if (r >= 0) try {
+ fifo::op::get_part_info_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ if (header) *header = std::move(reply.header);
+ } catch (const cb::error& err) {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
+ r = from_error_code(err.code());
+ } else {
+ ldpp_dout(dpp, -1)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
+ << dendl;
+ }
+ return r;
+}
+
+struct partinfo_completion : public lr::ObjectOperationCompletion {
+ CephContext* cct;
+ int* rp;
+ fifo::part_header* h;
+ std::uint64_t tid;
+ partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
+ std::uint64_t tid) :
+ cct(cct), rp(rp), h(h), tid(tid) {
+ }
+ virtual ~partinfo_completion() = default;
+ void handle_completion(int r, bufferlist& bl) override {
+ if (r >= 0) try {
+ fifo::op::get_part_info_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ if (h) *h = std::move(reply.header);
+ } catch (const cb::error& err) {
+ r = from_error_code(err.code());
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " decode failed: " << err.what()
+ << " tid=" << tid << dendl;
+ } else {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
+ << dendl;
+ }
+ if (rp) {
+ *rp = r;
+ }
+ }
+};
+
+lr::ObjectReadOperation get_part_info(CephContext* cct,
+ fifo::part_header* header,
+ std::uint64_t tid, int* r = 0)
+{
+ lr::ObjectReadOperation op;
+ fifo::op::get_part_info gpi;
+
+ cb::list in;
+ cb::list bl;
+ encode(gpi, in);
+ op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
+ new partinfo_completion(cct, r, header, tid));
+ return op;
+}
+}
+
+std::optional<marker> FIFO::to_marker(std::string_view s)
+{
+ marker m;
+ if (s.empty()) {
+ m.num = info.tail_part_num;
+ m.ofs = 0;
+ return m;
+ }
+
+ auto pos = s.find(':');
+ if (pos == string::npos) {
+ return std::nullopt;
+ }
+
+ auto num = s.substr(0, pos);
+ auto ofs = s.substr(pos + 1);
+
+ auto n = ceph::parse<decltype(m.num)>(num);
+ if (!n) {
+ return std::nullopt;
+ }
+ m.num = *n;
+ auto o = ceph::parse<decltype(m.ofs)>(ofs);
+ if (!o) {
+ return std::nullopt;
+ }
+ m.ofs = *o;
+ return m;
+}
+
+std::string FIFO::generate_tag() const
+{
+ static constexpr auto HEADER_TAG_SIZE = 16;
+ return gen_rand_alphanumeric_plain(static_cast<CephContext*>(ioctx.cct()),
+ HEADER_TAG_SIZE);
+}
+
+
+int FIFO::apply_update(fifo::info* info,
+ const fifo::objv& objv,
+ const fifo::update& update,
+ std::uint64_t tid)
+{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::unique_lock l(m);
+ if (objv != info->version) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " version mismatch, canceling: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ auto err = info->apply_update(update);
+ if (err) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " error applying update: " << *err << " tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+
+ ++info->version.ver;
+
+ return {};
+}
+
+int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
+ fifo::objv version, bool* pcanceled,
+ std::uint64_t tid, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ bool canceled = false;
+ update_meta(&op, info.version, update);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
+ if (r >= 0 || r == -ECANCELED) {
+ canceled = (r == -ECANCELED);
+ if (!canceled) {
+ r = apply_update(&info, version, update, tid);
+ if (r < 0) canceled = true;
+ }
+ if (canceled) {
+ r = read_meta(dpp, tid, y);
+ canceled = r < 0 ? false : true;
+ }
+ }
+ if (pcanceled) *pcanceled = canceled;
+ if (canceled) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled: tid=" << tid << dendl;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " returning error: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
+}
+
+struct Updater : public Completion<Updater> {
+ FIFO* fifo;
+ fifo::update update;
+ fifo::objv version;
+ bool reread = false;
+ bool* pcanceled = nullptr;
+ std::uint64_t tid;
+ Updater(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super,
+ const fifo::update& update, fifo::objv version,
+ bool* pcanceled, std::uint64_t tid)
+ : Completion(dpp, super), fifo(fifo), update(update), version(version),
+ pcanceled(pcanceled) {}
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ if (reread)
+ handle_reread(std::move(p), r);
+ else
+ handle_update(dpp, std::move(p), r);
+ }
+
+ void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " handling async update_meta: tid="
+ << tid << dendl;
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update failed: r=" << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ bool canceled = (r == -ECANCELED);
+ if (!canceled) {
+ int r = fifo->apply_update(&fifo->info, version, update, tid);
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update failed, marking canceled: r=" << r
+ << " tid=" << tid << dendl;
+ canceled = true;
+ }
+ }
+ if (canceled) {
+ reread = true;
+ fifo->read_meta(dpp, tid, call(std::move(p)));
+ return;
+ }
+ if (pcanceled)
+ *pcanceled = false;
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " completing: tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ }
+
+ void handle_reread(Ptr&& p, int r) {
+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " handling async read_meta: tid="
+ << tid << dendl;
+ if (r < 0 && pcanceled) {
+ *pcanceled = false;
+ } else if (r >= 0 && pcanceled) {
+ *pcanceled = true;
+ }
+ if (r < 0) {
+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed dispatching read_meta: r=" << r << " tid="
+ << tid << dendl;
+ } else {
+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " completing: tid=" << tid << dendl;
+ }
+ complete(std::move(p), r);
+ }
+};
+
+void FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update,
+ fifo::objv version, bool* pcanceled,
+ std::uint64_t tid, lr::AioCompletion* c)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ update_meta(&op, info.version, update);
+ auto updater = std::make_unique<Updater>(dpp, this, c, update, version, pcanceled,
+ tid);
+ auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
+ assert(r >= 0);
+}
+
+int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+ optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ op.create(false); /* We don't need exclusivity, part_init ensures
+ we're creating from the same journal entry. */
+ std::unique_lock l(m);
+ part_init(&op, tag, info.params);
+ auto oid = info.part_oid(part_num);
+ l.unlock();
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " part_init failed: r=" << r << " tid="
+ << tid << dendl;
+ }
+ return r;
+}
+
+int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+ optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ op.remove();
+ std::unique_lock l(m);
+ auto oid = info.part_oid(part_num);
+ l.unlock();
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " remove failed: r=" << r << " tid="
+ << tid << dendl;
+ }
+ return r;
+}
+
+int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::vector<fifo::journal_entry> processed;
+
+ std::unique_lock l(m);
+ auto tmpjournal = info.journal;
+ auto new_tail = info.tail_part_num;
+ auto new_head = info.head_part_num;
+ auto new_max = info.max_push_part_num;
+ l.unlock();
+
+ int r = 0;
+ for (auto& [n, entry] : tmpjournal) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry: entry=" << entry << " tid=" << tid
+ << dendl;
+ switch (entry.op) {
+ case fifo::journal_entry::Op::create:
+ r = create_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ if (entry.part_num > new_max) {
+ new_max = entry.part_num;
+ }
+ break;
+ case fifo::journal_entry::Op::set_head:
+ r = 0;
+ if (entry.part_num > new_head) {
+ new_head = entry.part_num;
+ }
+ break;
+ case fifo::journal_entry::Op::remove:
+ r = remove_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ if (r == -ENOENT) r = 0;
+ if (entry.part_num >= new_tail) {
+ new_tail = entry.part_num + 1;
+ }
+ break;
+ default:
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " unknown journaled op: entry=" << entry << " tid="
+ << tid << dendl;
+ return -EIO;
+ }
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry failed: entry=" << entry
+ << " r=" << r << " tid=" << tid << dendl;
+ return -r;
+ }
+
+ processed.push_back(std::move(entry));
+ }
+
+ // Postprocess
+ bool canceled = true;
+
+ for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " postprocessing: i=" << i << " tid=" << tid << dendl;
+
+ std::optional<int64_t> tail_part_num;
+ std::optional<int64_t> head_part_num;
+ std::optional<int64_t> max_part_num;
+
+ std::unique_lock l(m);
+ auto objv = info.version;
+ if (new_tail > tail_part_num) tail_part_num = new_tail;
+ if (new_head > info.head_part_num) head_part_num = new_head;
+ if (new_max > info.max_push_part_num) max_part_num = new_max;
+ l.unlock();
+
+ if (processed.empty() &&
+ !tail_part_num &&
+ !max_part_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " nothing to update any more: i=" << i << " tid="
+ << tid << dendl;
+ canceled = false;
+ break;
+ }
+ auto u = fifo::update().tail_part_num(tail_part_num)
+ .head_part_num(head_part_num).max_push_part_num(max_part_num)
+ .journal_entries_rm(processed);
+ r = _update_meta(dpp, u, objv, &canceled, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u
+ << " r=" << r << " tid=" << tid << dendl;
+ break;
+ }
+
+ if (canceled) {
+ std::vector<fifo::journal_entry> new_processed;
+ std::unique_lock l(m);
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update canceled, retrying: i=" << i << " tid="
+ << tid << dendl;
+ for (auto& e : processed) {
+ auto jiter = info.journal.find(e.part_num);
+ /* journal entry was already processed */
+ if (jiter == info.journal.end() ||
+ !(jiter->second == e)) {
+ continue;
+ }
+ new_processed.push_back(e);
+ }
+ processed = std::move(new_processed);
+ }
+ }
+ if (r == 0 && canceled) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ r = -ECANCELED;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed, r=: " << r << " tid=" << tid << dendl;
+ }
+ return r;
+}
+
+int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::unique_lock l(m);
+ std::vector jentries = { info.next_journal_entry(generate_tag()) };
+ if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ l.unlock();
+ ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " new part journaled, but not processed: tid="
+ << tid << dendl;
+ auto r = process_journal(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
+ }
+ std::int64_t new_head_part_num = info.head_part_num;
+ auto version = info.version;
+
+ if (is_head) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " needs new head: tid=" << tid << dendl;
+ auto new_head_jentry = jentries.front();
+ new_head_jentry.op = fifo::journal_entry::Op::set_head;
+ new_head_part_num = jentries.front().part_num;
+ jentries.push_back(std::move(new_head_jentry));
+ }
+ l.unlock();
+
+ int r = 0;
+ bool canceled = true;
+ for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ canceled = false;
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating metadata: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.journal_entries_add(jentries);
+ r = _update_meta(dpp, u, version, &canceled, tid, y);
+ if (r >= 0 && canceled) {
+ std::unique_lock l(m);
+ auto found = (info.journal.find(jentries.front().part_num) !=
+ info.journal.end());
+ if ((info.max_push_part_num >= jentries.front().part_num &&
+ info.head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ return 0;
+ }
+ if (found) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
+ }
+ l.unlock();
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u << " r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ }
+ if (canceled) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ r = process_journal(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
+}
+
+int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::unique_lock l(m);
+ std::int64_t new_head_num = info.head_part_num + 1;
+ auto max_push_part_num = info.max_push_part_num;
+ auto version = info.version;
+ l.unlock();
+
+ int r = 0;
+ if (max_push_part_num < new_head_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new part: tid=" << tid << dendl;
+ r = _prepare_new_part(dpp, true, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_part failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ std::unique_lock l(m);
+ if (info.max_push_part_num < new_head_num) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " inconsistency, push part less than head part: "
+ << " tid=" << tid << dendl;
+ return -EIO;
+ }
+ l.unlock();
+ return 0;
+ }
+
+ bool canceled = true;
+ for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating head: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.head_part_num(new_head_num);
+ r = _update_meta(dpp, u, version, &canceled, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: update=" << u << " r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ std::unique_lock l(m);
+ auto head_part_num = info.head_part_num;
+ version = info.version;
+ l.unlock();
+ if (canceled && (head_part_num >= new_head_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but completed by the other caller: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
+ }
+ }
+ if (canceled) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ return 0;
+}
+
+struct NewPartPreparer : public Completion<NewPartPreparer> {
+ FIFO* f;
+ std::vector<fifo::journal_entry> jentries;
+ int i = 0;
+ std::int64_t new_head_part_num;
+ bool canceled = false;
+ uint64_t tid;
+
+ NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
+ std::vector<fifo::journal_entry> jentries,
+ std::int64_t new_head_part_num,
+ std::uint64_t tid)
+ : Completion(dpp, super), f(f), jentries(std::move(jentries)),
+ new_head_part_num(new_head_part_num), tid(tid) {}
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+
+ if (canceled) {
+ std::unique_lock l(f->m);
+ auto iter = f->info.journal.find(jentries.front().part_num);
+ auto max_push_part_num = f->info.max_push_part_num;
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+ auto found = (iter != f->info.journal.end());
+ l.unlock();
+ if ((max_push_part_num >= jentries.front().part_num &&
+ head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
+ if (i >= MAX_RACE_RETRIES) {
+ complete(std::move(p), -ECANCELED);
+ return;
+ }
+ if (!found) {
+ ++i;
+ f->_update_meta(dpp, fifo::update{}
+ .journal_entries_add(jentries),
+ version, &canceled, tid, call(std::move(p)));
+ return;
+ } else {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
+ }
+ // Fall through. We still need to process the journal.
+ }
+ f->process_journal(dpp, tid, super());
+ return;
+ }
+};
+
+void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
+ lr::AioCompletion* c)
+{
+ std::unique_lock l(m);
+ std::vector jentries = { info.next_journal_entry(generate_tag()) };
+ if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ l.unlock();
+ ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " new part journaled, but not processed: tid="
+ << tid << dendl;
+ process_journal(dpp, tid, c);
+ return;
+ }
+ std::int64_t new_head_part_num = info.head_part_num;
+ auto version = info.version;
+
+ if (is_head) {
+ auto new_head_jentry = jentries.front();
+ new_head_jentry.op = fifo::journal_entry::Op::set_head;
+ new_head_part_num = jentries.front().part_num;
+ jentries.push_back(std::move(new_head_jentry));
+ }
+ l.unlock();
+
+ auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
+ new_head_part_num, tid);
+ auto np = n.get();
+ _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
+ &np->canceled, tid, NewPartPreparer::call(std::move(n)));
+}
+
+struct NewHeadPreparer : public Completion<NewHeadPreparer> {
+ FIFO* f;
+ int i = 0;
+ bool newpart;
+ std::int64_t new_head_num;
+ bool canceled = false;
+ std::uint64_t tid;
+
+ NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
+ bool newpart, std::int64_t new_head_num, std::uint64_t tid)
+ : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
+ tid(tid) {}
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ if (newpart)
+ handle_newpart(std::move(p), r);
+ else
+ handle_update(dpp, std::move(p), r);
+ }
+
+ void handle_newpart(Ptr&& p, int r) {
+ if (r < 0) {
+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_part failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ std::unique_lock l(f->m);
+ if (f->info.max_push_part_num < new_head_num) {
+ l.unlock();
+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_part failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), -EIO);
+ } else {
+ l.unlock();
+ complete(std::move(p), 0);
+ }
+ }
+
+ void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ std::unique_lock l(f->m);
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+ l.unlock();
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ if (canceled) {
+ if (i >= MAX_RACE_RETRIES) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ complete(std::move(p), -ECANCELED);
+ return;
+ }
+
+ // Raced, but there's still work to do!
+ if (head_part_num < new_head_num) {
+ canceled = false;
+ ++i;
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating head: i=" << i << " tid=" << tid << dendl;
+ f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
+ version, &this->canceled, tid, call(std::move(p)));
+ return;
+ }
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " succeeded : i=" << i << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
+};
+
+void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::unique_lock l(m);
+ int64_t new_head_num = info.head_part_num + 1;
+ auto max_push_part_num = info.max_push_part_num;
+ auto version = info.version;
+ l.unlock();
+
+ if (max_push_part_num < new_head_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new part: tid=" << tid << dendl;
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
+ tid);
+ _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
+ } else {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " updating head: tid=" << tid << dendl;
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
+ tid);
+ auto np = n.get();
+ _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
+ &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
+ }
+}
+
+int FIFO::push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
+ std::uint64_t tid, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::unique_lock l(m);
+ auto head_part_num = info.head_part_num;
+ auto tag = info.head_tag;
+ const auto part_oid = info.part_oid(head_part_num);
+ l.unlock();
+
+ auto r = push_part(dpp, ioctx, part_oid, tag, data_bufs, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " push_part failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
+}
+
+void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
+ std::uint64_t tid, lr::AioCompletion* c)
+{
+ std::unique_lock l(m);
+ auto head_part_num = info.head_part_num;
+ auto tag = info.head_tag;
+ const auto part_oid = info.part_oid(head_part_num);
+ l.unlock();
+
+ push_part(ioctx, part_oid, tag, data_bufs, tid, c);
+}
+
+int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
+ std::optional<std::string_view> tag,
+ bool exclusive, std::uint64_t tid,
+ optional_yield y)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ std::unique_lock l(m);
+ const auto part_oid = info.part_oid(part_num);
+ l.unlock();
+ rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return 0;
+}
+
+void FIFO::trim_part(int64_t part_num, uint64_t ofs,
+ std::optional<std::string_view> tag,
+ bool exclusive, std::uint64_t tid,
+ lr::AioCompletion* c)
+{
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectWriteOperation op;
+ std::unique_lock l(m);
+ const auto part_oid = info.part_oid(part_num);
+ l.unlock();
+ rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ auto r = ioctx.aio_operate(part_oid, c, &op);
+ ceph_assert(r >= 0);
+}
+
+int FIFO::open(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
+ optional_yield y, std::optional<fifo::objv> objv,
+ bool probe)
+{
+ ldpp_dout(dpp, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering" << dendl;
+ fifo::info info;
+ std::uint32_t size;
+ std::uint32_t over;
+ int r = get_meta(dpp, ioctx, std::move(oid), objv, &info, &size, &over, 0, y,
+ probe);
+ if (r < 0) {
+ if (!(probe && (r == -ENOENT || r == -ENODATA))) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_meta failed: r=" << r << dendl;
+ }
+ return r;
+ }
+ std::unique_ptr<FIFO> f(new FIFO(std::move(ioctx), oid));
+ f->info = info;
+ f->part_header_size = size;
+ f->part_entry_overhead = over;
+ // If there are journal entries, process them, in case
+ // someone crashed mid-transaction.
+ if (!info.journal.empty()) {
+ ldpp_dout(dpp, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing leftover journal" << dendl;
+ r = f->process_journal(dpp, 0, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << dendl;
+ return r;
+ }
+ }
+ *fifo = std::move(f);
+ return 0;
+}
+
+int FIFO::create(const DoutPrefixProvider *dpp, lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
+ optional_yield y, std::optional<fifo::objv> objv,
+ std::optional<std::string_view> oid_prefix,
+ bool exclusive, std::uint64_t max_part_size,
+ std::uint64_t max_entry_size)
+{
+ ldpp_dout(dpp, 20)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering" << dendl;
+ lr::ObjectWriteOperation op;
+ create_meta(&op, oid, objv, oid_prefix, exclusive, max_part_size,
+ max_entry_size);
+ auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " create_meta failed: r=" << r << dendl;
+ return r;
+ }
+ r = open(dpp, std::move(ioctx), std::move(oid), fifo, y, objv);
+ return r;
+}
+
+int FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ fifo::info _info;
+ std::uint32_t _phs;
+ std::uint32_t _peo;
+
+ auto r = get_meta(dpp, ioctx, oid, nullopt, &_info, &_phs, &_peo, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_meta failed: r=" << r << " tid=" << tid << dendl;
+ return r;
+ }
+ std::unique_lock l(m);
+ // We have a newer version already!
+ if (_info.version.same_or_later(this->info.version)) {
+ info = std::move(_info);
+ part_header_size = _phs;
+ part_entry_overhead = _peo;
+ }
+ return 0;
+}
+
+int FIFO::read_meta(const DoutPrefixProvider *dpp, optional_yield y) {
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ l.unlock();
+ return read_meta(dpp, tid, y);
+}
+
+struct Reader : public Completion<Reader> {
+ FIFO* fifo;
+ cb::list bl;
+ std::uint64_t tid;
+ Reader(const DoutPrefixProvider *dpp, FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
+ : Completion(dpp, super), fifo(fifo), tid(tid) {}
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ if (r >= 0) try {
+ fifo::op::get_meta_reply reply;
+ auto iter = bl.cbegin();
+ decode(reply, iter);
+ std::unique_lock l(fifo->m);
+ if (reply.info.version.same_or_later(fifo->info.version)) {
+ fifo->info = std::move(reply.info);
+ fifo->part_header_size = reply.part_header_size;
+ fifo->part_entry_overhead = reply.part_entry_overhead;
+ }
+ } catch (const cb::error& err) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed to decode response err=" << err.what()
+ << " tid=" << tid << dendl;
+ r = from_error_code(err.code());
+ } else {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed r=" << r
+ << " tid=" << tid << dendl;
+ }
+ complete(std::move(p), r);
+ }
+};
+
+void FIFO::read_meta(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+{
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ lr::ObjectReadOperation op;
+ fifo::op::get_meta gm;
+ cb::list in;
+ encode(gm, in);
+ auto reader = std::make_unique<Reader>(dpp, this, c, tid);
+ auto rp = reader.get();
+ auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
+ fifo::op::GET_META, in, &rp->bl);
+ assert(r >= 0);
+}
+
+const fifo::info& FIFO::meta() const {
+ return info;
+}
+
+std::pair<std::uint32_t, std::uint32_t> FIFO::get_part_layout_info() const {
+ return {part_header_size, part_entry_overhead};
+}
+
+int FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, optional_yield y) {
+ return push(dpp, std::vector{ bl }, y);
+}
+
+void FIFO::push(const DoutPrefixProvider *dpp, const cb::list& bl, lr::AioCompletion* c) {
+ push(dpp, std::vector{ bl }, c);
+}
+
+int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs, optional_yield y)
+{
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ auto max_entry_size = info.params.max_entry_size;
+ auto need_new_head = info.need_new_head();
+ l.unlock();
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ if (data_bufs.empty()) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " empty push, returning success tid=" << tid << dendl;
+ return 0;
+ }
+
+ // Validate sizes
+ for (const auto& bl : data_bufs) {
+ if (bl.length() > max_entry_size) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entry bigger than max_entry_size tid=" << tid << dendl;
+ return -E2BIG;
+ }
+ }
+
+ int r = 0;
+ if (need_new_head) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ r = _prepare_new_head(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _prepare_new_head failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ }
+
+ std::deque<cb::list> remaining(data_bufs.begin(), data_bufs.end());
+ std::deque<cb::list> batch;
+
+ uint64_t batch_len = 0;
+ auto retries = 0;
+ bool canceled = true;
+ while ((!remaining.empty() || !batch.empty()) &&
+ (retries <= MAX_RACE_RETRIES)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " preparing push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " retries=" << retries
+ << " tid=" << tid << dendl;
+ std::unique_lock l(m);
+ auto max_part_size = info.params.max_part_size;
+ auto overhead = part_entry_overhead;
+ l.unlock();
+
+ while (!remaining.empty() &&
+ (remaining.front().length() + batch_len <= max_part_size)) {
+ /* We can send entries with data_len up to max_entry_size,
+ however, we want to also account the overhead when
+ dealing with multiple entries. Previous check doesn't
+ account for overhead on purpose. */
+ batch_len += remaining.front().length() + overhead;
+ batch.push_back(std::move(remaining.front()));
+ remaining.pop_front();
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepared push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " retries=" << retries
+ << " batch_len=" << batch_len
+ << " tid=" << tid << dendl;
+
+ auto r = push_entries(dpp, batch, tid, y);
+ if (r == -ERANGE) {
+ canceled = true;
+ ++retries;
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ r = _prepare_new_head(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepare_new_head failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ r = 0;
+ continue;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " push_entries failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ // Made forward progress!
+ canceled = false;
+ retries = 0;
+ batch_len = 0;
+ if (static_cast<unsigned>(r) == batch.size()) {
+ batch.clear();
+ } else {
+ batch.erase(batch.begin(), batch.begin() + r);
+ for (const auto& b : batch) {
+ batch_len += b.length() + part_entry_overhead;
+ }
+ }
+ }
+ if (canceled) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -ECANCELED;
+ }
+ return 0;
+}
+
+struct Pusher : public Completion<Pusher> {
+ FIFO* f;
+ std::deque<cb::list> remaining;
+ std::deque<cb::list> batch;
+ int i = 0;
+ std::uint64_t tid;
+ bool new_heading = false;
+
+ void prep_then_push(Ptr&& p, const unsigned successes) {
+ std::unique_lock l(f->m);
+ auto max_part_size = f->info.params.max_part_size;
+ auto part_entry_overhead = f->part_entry_overhead;
+ l.unlock();
+
+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " preparing push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " i=" << i
+ << " tid=" << tid << dendl;
+
+ uint64_t batch_len = 0;
+ if (successes > 0) {
+ if (successes == batch.size()) {
+ batch.clear();
+ } else {
+ batch.erase(batch.begin(), batch.begin() + successes);
+ for (const auto& b : batch) {
+ batch_len += b.length() + part_entry_overhead;
+ }
+ }
+ }
+
+ if (batch.empty() && remaining.empty()) {
+ complete(std::move(p), 0);
+ return;
+ }
+
+ while (!remaining.empty() &&
+ (remaining.front().length() + batch_len <= max_part_size)) {
+
+ /* We can send entries with data_len up to max_entry_size,
+ however, we want to also account the overhead when
+ dealing with multiple entries. Previous check doesn't
+ account for overhead on purpose. */
+ batch_len += remaining.front().length() + part_entry_overhead;
+ batch.push_back(std::move(remaining.front()));
+ remaining.pop_front();
+ }
+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepared push: remaining=" << remaining.size()
+ << " batch=" << batch.size() << " i=" << i
+ << " batch_len=" << batch_len
+ << " tid=" << tid << dendl;
+ push(std::move(p));
+ }
+
+ void push(Ptr&& p) {
+ f->push_entries(batch, tid, call(std::move(p)));
+ }
+
+ void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
+ new_heading = true;
+ f->_prepare_new_head(dpp, tid, call(std::move(p)));
+ }
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ if (!new_heading) {
+ if (r == -ERANGE) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ new_head(dpp, std::move(p));
+ return;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " push_entries failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ i = 0; // We've made forward progress, so reset the race counter!
+ prep_then_push(std::move(p), r);
+ } else {
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " prepare_new_head failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ new_heading = false;
+ handle_new_head(std::move(p), r);
+ }
+ }
+
+ void handle_new_head(Ptr&& p, int r) {
+ if (r == -ECANCELED) {
+ if (p->i == MAX_RACE_RETRIES) {
+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ complete(std::move(p), -ECANCELED);
+ return;
+ }
+ ++p->i;
+ } else if (r) {
+ complete(std::move(p), r);
+ return;
+ }
+
+ if (p->batch.empty()) {
+ prep_then_push(std::move(p), 0);
+ return;
+ } else {
+ push(std::move(p));
+ return;
+ }
+ }
+
+ Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
+ std::uint64_t tid, lr::AioCompletion* super)
+ : Completion(dpp, super), f(f), remaining(std::move(remaining)),
+ tid(tid) {}
+};
+
+void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
+ lr::AioCompletion* c)
+{
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ auto max_entry_size = info.params.max_entry_size;
+ auto need_new_head = info.need_new_head();
+ l.unlock();
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
+ tid, c);
+ // Validate sizes
+ for (const auto& bl : data_bufs) {
+ if (bl.length() > max_entry_size) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entry bigger than max_entry_size tid=" << tid << dendl;
+ Pusher::complete(std::move(p), -E2BIG);
+ return;
+ }
+ }
+
+ if (data_bufs.empty() ) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " empty push, returning success tid=" << tid << dendl;
+ Pusher::complete(std::move(p), 0);
+ return;
+ }
+
+ if (need_new_head) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " need new head tid=" << tid << dendl;
+ p->new_head(dpp, std::move(p));
+ } else {
+ p->prep_then_push(std::move(p), 0);
+ }
+}
+
+int FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
+ std::optional<std::string_view> markstr,
+ std::vector<list_entry>* presult, bool* pmore,
+ optional_yield y)
+{
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ std::int64_t part_num = info.tail_part_num;
+ l.unlock();
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::uint64_t ofs = 0;
+ if (markstr) {
+ auto marker = to_marker(*markstr);
+ if (!marker) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " invalid marker string: " << markstr
+ << " tid= "<< tid << dendl;
+ return -EINVAL;
+ }
+ part_num = marker->num;
+ ofs = marker->ofs;
+ }
+
+ std::vector<list_entry> result;
+ result.reserve(max_entries);
+ bool more = false;
+
+ std::vector<fifo::part_list_entry> entries;
+ int r = 0;
+ while (max_entries > 0) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " max_entries=" << max_entries << " tid=" << tid << dendl;
+ bool part_more = false;
+ bool part_full = false;
+
+ std::unique_lock l(m);
+ auto part_oid = info.part_oid(part_num);
+ l.unlock();
+
+ r = list_part(dpp, ioctx, part_oid, {}, ofs, max_entries, &entries,
+ &part_more, &part_full, nullptr, tid, y);
+ if (r == -ENOENT) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " missing part, rereading metadata"
+ << " tid= "<< tid << dendl;
+ r = read_meta(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
+ if (part_num < info.tail_part_num) {
+ /* raced with trim? restart */
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced with trim, restarting: tid=" << tid << dendl;
+ max_entries += result.size();
+ result.clear();
+ std::unique_lock l(m);
+ part_num = info.tail_part_num;
+ l.unlock();
+ ofs = 0;
+ continue;
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " assuming part was not written yet, so end of data: "
+ << "tid=" << tid << dendl;
+ more = false;
+ r = 0;
+ break;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " list_entries failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
+ more = part_full || part_more;
+ for (auto& entry : entries) {
+ list_entry e;
+ e.data = std::move(entry.data);
+ e.marker = marker{part_num, entry.ofs}.to_string();
+ e.mtime = entry.mtime;
+ result.push_back(std::move(e));
+ --max_entries;
+ if (max_entries == 0)
+ break;
+ }
+ entries.clear();
+ if (max_entries > 0 &&
+ part_more) {
+ }
+
+ if (!part_full) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " head part is not full, so we can assume we're done: "
+ << "tid=" << tid << dendl;
+ break;
+ }
+ if (!part_more) {
+ ++part_num;
+ ofs = 0;
+ }
+ }
+ if (presult)
+ *presult = std::move(result);
+ if (pmore)
+ *pmore = more;
+ return 0;
+}
+
+int FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive, optional_yield y)
+{
+ bool overshoot = false;
+ auto marker = to_marker(markstr);
+ if (!marker) {
+ return -EINVAL;
+ }
+ auto part_num = marker->num;
+ auto ofs = marker->ofs;
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ auto hn = info.head_part_num;
+ const auto max_part_size = info.params.max_part_size;
+ if (part_num > hn) {
+ l.unlock();
+ auto r = read_meta(dpp, tid, y);
+ if (r < 0) {
+ return r;
+ }
+ l.lock();
+ auto hn = info.head_part_num;
+ if (part_num > hn) {
+ overshoot = true;
+ part_num = hn;
+ ofs = max_part_size;
+ }
+ }
+ if (part_num < info.tail_part_num) {
+ return -ENODATA;
+ }
+ auto pn = info.tail_part_num;
+ l.unlock();
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+
+ int r = 0;
+ while (pn < part_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
+ std::unique_lock l(m);
+ l.unlock();
+ r = trim_part(dpp, pn, max_part_size, std::nullopt, false, tid, y);
+ if (r < 0 && r == -ENOENT) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
+ ++pn;
+ }
+ r = trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, y);
+ if (r < 0 && r != -ENOENT) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " trim_part failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
+
+ l.lock();
+ auto tail_part_num = info.tail_part_num;
+ auto objv = info.version;
+ l.unlock();
+ bool canceled = tail_part_num < part_num;
+ int retries = 0;
+ while ((tail_part_num < part_num) &&
+ canceled &&
+ (retries <= MAX_RACE_RETRIES)) {
+ r = _update_meta(dpp, fifo::update{}.tail_part_num(part_num), objv, &canceled,
+ tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " _update_meta failed: r=" << r
+ << " tid= "<< tid << dendl;
+ return r;
+ }
+ if (canceled) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled: retries=" << retries
+ << " tid=" << tid << dendl;
+ l.lock();
+ tail_part_num = info.tail_part_num;
+ objv = info.version;
+ l.unlock();
+ ++retries;
+ }
+ }
+ if (canceled) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ return -EIO;
+ }
+ return overshoot ? -ENODATA : 0;
+}
+
+struct Trimmer : public Completion<Trimmer> {
+ FIFO* fifo;
+ std::int64_t part_num;
+ std::uint64_t ofs;
+ std::int64_t pn;
+ bool exclusive;
+ std::uint64_t tid;
+ bool update = false;
+ bool reread = false;
+ bool canceled = false;
+ bool overshoot = false;
+ int retries = 0;
+
+ Trimmer(const DoutPrefixProvider *dpp, FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
+ bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
+ : Completion(dpp, super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
+ exclusive(exclusive), tid(tid) {}
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+
+ if (reread) {
+ reread = false;
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r="
+ << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ std::unique_lock l(fifo->m);
+ auto hn = fifo->info.head_part_num;
+ const auto max_part_size = fifo->info.params.max_part_size;
+ const auto tail_part_num = fifo->info.tail_part_num;
+ l.unlock();
+ if (part_num > hn) {
+ part_num = hn;
+ ofs = max_part_size;
+ overshoot = true;
+ }
+ if (part_num < tail_part_num) {
+ complete(std::move(p), -ENODATA);
+ return;
+ }
+ pn = tail_part_num;
+ if (pn < part_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
+ fifo->trim_part(pn++, max_part_size, std::nullopt,
+ false, tid, call(std::move(p)));
+ } else {
+ update = true;
+ canceled = tail_part_num < part_num;
+ fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+ call(std::move(p)));
+ }
+ return;
+ }
+
+ if (r == -ENOENT) {
+ r = 0;
+ }
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << (update ? " update_meta " : " trim ") << "failed: r="
+ << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+
+ if (!update) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " handling preceding trim callback: tid=" << tid << dendl;
+ retries = 0;
+ if (pn < part_num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
+ std::unique_lock l(fifo->m);
+ const auto max_part_size = fifo->info.params.max_part_size;
+ l.unlock();
+ fifo->trim_part(pn++, max_part_size, std::nullopt,
+ false, tid, call(std::move(p)));
+ return;
+ }
+
+ std::unique_lock l(fifo->m);
+ const auto tail_part_num = fifo->info.tail_part_num;
+ l.unlock();
+ update = true;
+ canceled = tail_part_num < part_num;
+ fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+ call(std::move(p)));
+ return;
+ }
+
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " handling update-needed callback: tid=" << tid << dendl;
+ std::unique_lock l(fifo->m);
+ auto tail_part_num = fifo->info.tail_part_num;
+ auto objv = fifo->info.version;
+ l.unlock();
+ if ((tail_part_num < part_num) &&
+ canceled) {
+ if (retries > MAX_RACE_RETRIES) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid=" << tid << dendl;
+ complete(std::move(p), -EIO);
+ return;
+ }
+ ++retries;
+ fifo->_update_meta(dpp, fifo::update{}
+ .tail_part_num(part_num), objv, &canceled,
+ tid, call(std::move(p)));
+ } else {
+ complete(std::move(p), overshoot ? -ENODATA : 0);
+ }
+ }
+};
+
+void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool exclusive,
+ lr::AioCompletion* c) {
+ auto marker = to_marker(markstr);
+ auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
+ std::unique_lock l(m);
+ const auto hn = info.head_part_num;
+ const auto max_part_size = info.params.max_part_size;
+ const auto pn = info.tail_part_num;
+ const auto part_oid = info.part_oid(pn);
+ auto tid = ++next_tid;
+ l.unlock();
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ auto trimmer = std::make_unique<Trimmer>(dpp, this, realmark.num, realmark.ofs,
+ pn, exclusive, c, tid);
+ if (!marker) {
+ Trimmer::complete(std::move(trimmer), -EINVAL);
+ return;
+ }
+ ++trimmer->pn;
+ auto ofs = marker->ofs;
+ if (marker->num > hn) {
+ trimmer->reread = true;
+ read_meta(dpp, tid, Trimmer::call(std::move(trimmer)));
+ return;
+ }
+ if (pn < marker->num) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
+ ofs = max_part_size;
+ } else {
+ trimmer->update = true;
+ }
+ trim_part(pn, ofs, std::nullopt, exclusive,
+ tid, Trimmer::call(std::move(trimmer)));
+}
+
+int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
+ fifo::part_header* header,
+ optional_yield y)
+{
+ std::unique_lock l(m);
+ const auto part_oid = info.part_oid(part_num);
+ auto tid = ++next_tid;
+ l.unlock();
+ auto r = rgw::cls::fifo::get_part_info(dpp, ioctx, part_oid, header, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_part_info failed: r="
+ << r << " tid=" << tid << dendl;
+ }
+ return r;
+}
+
+void FIFO::get_part_info(int64_t part_num,
+ fifo::part_header* header,
+ lr::AioCompletion* c)
+{
+ std::unique_lock l(m);
+ const auto part_oid = info.part_oid(part_num);
+ auto tid = ++next_tid;
+ l.unlock();
+ auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
+ auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
+ ceph_assert(r >= 0);
+}
+
+struct InfoGetter : Completion<InfoGetter> {
+ FIFO* fifo;
+ fifo::part_header header;
+ fu2::function<void(int r, fifo::part_header&&)> f;
+ std::uint64_t tid;
+ bool headerread = false;
+
+ InfoGetter(const DoutPrefixProvider *dpp, FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
+ std::uint64_t tid, lr::AioCompletion* super)
+ : Completion(dpp, super), fifo(fifo), f(std::move(f)), tid(tid) {}
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ if (!headerread) {
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r="
+ << r << " tid=" << tid << dendl;
+ if (f)
+ f(r, {});
+ complete(std::move(p), r);
+ return;
+ }
+
+ auto info = fifo->meta();
+ auto hpn = info.head_part_num;
+ if (hpn < 0) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " no head, returning empty partinfo r="
+ << r << " tid=" << tid << dendl;
+ if (f)
+ f(0, {});
+ complete(std::move(p), r);
+ return;
+ }
+ headerread = true;
+ auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
+ std::unique_lock l(fifo->m);
+ auto oid = fifo->info.part_oid(hpn);
+ l.unlock();
+ r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
+ nullptr);
+ ceph_assert(r >= 0);
+ return;
+ }
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " get_part_info failed: r="
+ << r << " tid=" << tid << dendl;
+ }
+
+ if (f)
+ f(r, std::move(header));
+ complete(std::move(p), r);
+ return;
+ }
+};
+
+void FIFO::get_head_info(const DoutPrefixProvider *dpp, fu2::unique_function<void(int r,
+ fifo::part_header&&)> f,
+ lr::AioCompletion* c)
+{
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ l.unlock();
+ auto ig = std::make_unique<InfoGetter>(dpp, this, std::move(f), tid, c);
+ read_meta(dpp, tid, InfoGetter::call(std::move(ig)));
+}
+
+struct JournalProcessor : public Completion<JournalProcessor> {
+private:
+ FIFO* const fifo;
+
+ std::vector<fifo::journal_entry> processed;
+ std::multimap<std::int64_t, fifo::journal_entry> journal;
+ std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+ std::int64_t new_tail;
+ std::int64_t new_head;
+ std::int64_t new_max;
+ int race_retries = 0;
+ bool first_pp = true;
+ bool canceled = false;
+ std::uint64_t tid;
+
+ enum {
+ entry_callback,
+ pp_callback,
+ } state;
+
+ void create_part(Ptr&& p, int64_t part_num,
+ std::string_view tag) {
+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ state = entry_callback;
+ lr::ObjectWriteOperation op;
+ op.create(false); /* We don't need exclusivity, part_init ensures
+ we're creating from the same journal entry. */
+ std::unique_lock l(fifo->m);
+ part_init(&op, tag, fifo->info.params);
+ auto oid = fifo->info.part_oid(part_num);
+ l.unlock();
+ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
+ ceph_assert(r >= 0);
+ return;
+ }
+
+ void remove_part(Ptr&& p, int64_t part_num,
+ std::string_view tag) {
+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ state = entry_callback;
+ lr::ObjectWriteOperation op;
+ op.remove();
+ std::unique_lock l(fifo->m);
+ auto oid = fifo->info.part_oid(part_num);
+ l.unlock();
+ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
+ ceph_assert(r >= 0);
+ return;
+ }
+
+ void finish_je(const DoutPrefixProvider *dpp, Ptr&& p, int r,
+ const fifo::journal_entry& entry) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " finishing entry: entry=" << entry
+ << " tid=" << tid << dendl;
+
+ if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
+ r = 0;
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry failed: entry=" << entry
+ << " r=" << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ } else {
+ switch (entry.op) {
+ case fifo::journal_entry::Op::unknown:
+ case fifo::journal_entry::Op::set_head:
+ // Can't happen. Filtered out in process.
+ complete(std::move(p), -EIO);
+ return;
+
+ case fifo::journal_entry::Op::create:
+ if (entry.part_num > new_max) {
+ new_max = entry.part_num;
+ }
+ break;
+ case fifo::journal_entry::Op::remove:
+ if (entry.part_num >= new_tail) {
+ new_tail = entry.part_num + 1;
+ }
+ break;
+ }
+ processed.push_back(entry);
+ }
+ ++iter;
+ process(dpp, std::move(p));
+ }
+
+ void postprocess(const DoutPrefixProvider *dpp, Ptr&& p) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ if (processed.empty()) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " nothing to update any more: race_retries="
+ << race_retries << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
+ pp_run(dpp, std::move(p), 0, false);
+ }
+
+public:
+
+ JournalProcessor(const DoutPrefixProvider *dpp, FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
+ : Completion(dpp, super), fifo(fifo), tid(tid) {
+ std::unique_lock l(fifo->m);
+ journal = fifo->info.journal;
+ iter = journal.begin();
+ new_tail = fifo->info.tail_part_num;
+ new_head = fifo->info.head_part_num;
+ new_max = fifo->info.max_push_part_num;
+ }
+
+ void pp_run(const DoutPrefixProvider *dpp, Ptr&& p, int r, bool canceled) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ std::optional<int64_t> tail_part_num;
+ std::optional<int64_t> head_part_num;
+ std::optional<int64_t> max_part_num;
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " failed, r=: " << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ }
+
+
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " postprocessing: race_retries="
+ << race_retries << " tid=" << tid << dendl;
+
+ if (!first_pp && r == 0 && !canceled) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " nothing to update any more: race_retries="
+ << race_retries << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
+
+ first_pp = false;
+
+ if (canceled) {
+ if (race_retries >= MAX_RACE_RETRIES) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " canceled too many times, giving up: tid="
+ << tid << dendl;
+ complete(std::move(p), -ECANCELED);
+ return;
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " update canceled, retrying: race_retries="
+ << race_retries << " tid=" << tid << dendl;
+
+ ++race_retries;
+
+ std::vector<fifo::journal_entry> new_processed;
+ std::unique_lock l(fifo->m);
+ for (auto& e : processed) {
+ auto jiter = fifo->info.journal.find(e.part_num);
+ /* journal entry was already processed */
+ if (jiter == fifo->info.journal.end() ||
+ !(jiter->second == e)) {
+ continue;
+ }
+ new_processed.push_back(e);
+ }
+ processed = std::move(new_processed);
+ }
+
+ std::unique_lock l(fifo->m);
+ auto objv = fifo->info.version;
+ if (new_tail > fifo->info.tail_part_num) {
+ tail_part_num = new_tail;
+ }
+
+ if (new_head > fifo->info.head_part_num) {
+ head_part_num = new_head;
+ }
+
+ if (new_max > fifo->info.max_push_part_num) {
+ max_part_num = new_max;
+ }
+ l.unlock();
+
+ if (processed.empty() &&
+ !tail_part_num &&
+ !max_part_num) {
+ /* nothing to update anymore */
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " nothing to update any more: race_retries="
+ << race_retries << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
+ state = pp_callback;
+ fifo->_update_meta(dpp, fifo::update{}
+ .tail_part_num(tail_part_num)
+ .head_part_num(head_part_num)
+ .max_push_part_num(max_part_num)
+ .journal_entries_rm(processed),
+ objv, &this->canceled, tid, call(std::move(p)));
+ return;
+ }
+
+ JournalProcessor(const JournalProcessor&) = delete;
+ JournalProcessor& operator =(const JournalProcessor&) = delete;
+ JournalProcessor(JournalProcessor&&) = delete;
+ JournalProcessor& operator =(JournalProcessor&&) = delete;
+
+ void process(const DoutPrefixProvider *dpp, Ptr&& p) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ while (iter != journal.end()) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " processing entry: entry=" << *iter
+ << " tid=" << tid << dendl;
+ const auto entry = iter->second;
+ switch (entry.op) {
+ case fifo::journal_entry::Op::create:
+ create_part(std::move(p), entry.part_num, entry.part_tag);
+ return;
+ case fifo::journal_entry::Op::set_head:
+ if (entry.part_num > new_head) {
+ new_head = entry.part_num;
+ }
+ processed.push_back(entry);
+ ++iter;
+ continue;
+ case fifo::journal_entry::Op::remove:
+ remove_part(std::move(p), entry.part_num, entry.part_tag);
+ return;
+ default:
+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " unknown journaled op: entry=" << entry << " tid="
+ << tid << dendl;
+ complete(std::move(p), -EIO);
+ return;
+ }
+ }
+ postprocess(dpp, std::move(p));
+ return;
+ }
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
+ switch (state) {
+ case entry_callback:
+ finish_je(dpp, std::move(p), r, iter->second);
+ return;
+ case pp_callback:
+ auto c = canceled;
+ canceled = false;
+ pp_run(dpp, std::move(p), r, c);
+ return;
+ }
+
+ abort();
+ }
+
+};
+
+void FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c) {
+ auto p = std::make_unique<JournalProcessor>(dpp, this, tid, c);
+ p->process(dpp, std::move(p));
+}
+
+struct Lister : Completion<Lister> {
+ FIFO* f;
+ std::vector<list_entry> result;
+ bool more = false;
+ std::int64_t part_num;
+ std::uint64_t ofs;
+ int max_entries;
+ int r_out = 0;
+ std::vector<fifo::part_list_entry> entries;
+ bool part_more = false;
+ bool part_full = false;
+ std::vector<list_entry>* entries_out;
+ bool* more_out;
+ std::uint64_t tid;
+
+ bool read = false;
+
+ void complete(Ptr&& p, int r) {
+ if (r >= 0) {
+ if (more_out) *more_out = more;
+ if (entries_out) *entries_out = std::move(result);
+ }
+ Completion::complete(std::move(p), r);
+ }
+
+public:
+ Lister(const DoutPrefixProvider *dpp, FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
+ std::vector<list_entry>* entries_out, bool* more_out,
+ std::uint64_t tid, lr::AioCompletion* super)
+ : Completion(dpp, super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
+ entries_out(entries_out), more_out(more_out), tid(tid) {
+ result.reserve(max_entries);
+ }
+
+ Lister(const Lister&) = delete;
+ Lister& operator =(const Lister&) = delete;
+ Lister(Lister&&) = delete;
+ Lister& operator =(Lister&&) = delete;
+
+ void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ if (read)
+ handle_read(std::move(p), r);
+ else
+ handle_list(dpp, std::move(p), r);
+ }
+
+ void list(Ptr&& p) {
+ if (max_entries > 0) {
+ part_more = false;
+ part_full = false;
+ entries.clear();
+
+ std::unique_lock l(f->m);
+ auto part_oid = f->info.part_oid(part_num);
+ l.unlock();
+
+ read = false;
+ auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
+ &entries, &part_more, &part_full,
+ nullptr, tid);
+ f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
+ } else {
+ complete(std::move(p), 0);
+ }
+ }
+
+ void handle_read(Ptr&& p, int r) {
+ read = false;
+ if (r >= 0) r = r_out;
+ r_out = 0;
+
+ if (r < 0) {
+ complete(std::move(p), r);
+ return;
+ }
+
+ if (part_num < f->info.tail_part_num) {
+ /* raced with trim? restart */
+ max_entries += result.size();
+ result.clear();
+ part_num = f->info.tail_part_num;
+ ofs = 0;
+ list(std::move(p));
+ return;
+ }
+ /* assuming part was not written yet, so end of data */
+ more = false;
+ complete(std::move(p), 0);
+ return;
+ }
+
+ void handle_list(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ if (r >= 0) r = r_out;
+ r_out = 0;
+ std::unique_lock l(f->m);
+ auto part_oid = f->info.part_oid(part_num);
+ l.unlock();
+ if (r == -ENOENT) {
+ read = true;
+ f->read_meta(dpp, tid, call(std::move(p)));
+ return;
+ }
+ if (r < 0) {
+ complete(std::move(p), r);
+ return;
+ }
+
+ more = part_full || part_more;
+ for (auto& entry : entries) {
+ list_entry e;
+ e.data = std::move(entry.data);
+ e.marker = marker{part_num, entry.ofs}.to_string();
+ e.mtime = entry.mtime;
+ result.push_back(std::move(e));
+ }
+ max_entries -= entries.size();
+ entries.clear();
+ if (max_entries > 0 && part_more) {
+ list(std::move(p));
+ return;
+ }
+
+ if (!part_full) { /* head part is not full */
+ complete(std::move(p), 0);
+ return;
+ }
+ ++part_num;
+ ofs = 0;
+ list(std::move(p));
+ }
+};
+
+void FIFO::list(const DoutPrefixProvider *dpp, int max_entries,
+ std::optional<std::string_view> markstr,
+ std::vector<list_entry>* out,
+ bool* more,
+ lr::AioCompletion* c) {
+ std::unique_lock l(m);
+ auto tid = ++next_tid;
+ std::int64_t part_num = info.tail_part_num;
+ l.unlock();
+ std::uint64_t ofs = 0;
+ std::optional<::rgw::cls::fifo::marker> marker;
+
+ if (markstr) {
+ marker = to_marker(*markstr);
+ if (marker) {
+ part_num = marker->num;
+ ofs = marker->ofs;
+ }
+ }
+
+ auto ls = std::make_unique<Lister>(dpp, this, part_num, ofs, max_entries, out,
+ more, tid, c);
+ if (markstr && !marker) {
+ auto l = ls.get();
+ l->complete(std::move(ls), -EINVAL);
+ } else {
+ ls->list(std::move(ls));
+ }
+}
+}