From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/neorados/cls/fifo.cc | 385 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 src/neorados/cls/fifo.cc (limited to 'src/neorados/cls/fifo.cc') diff --git a/src/neorados/cls/fifo.cc b/src/neorados/cls/fifo.cc new file mode 100644 index 000000000..fa99275b2 --- /dev/null +++ b/src/neorados/cls/fifo.cc @@ -0,0 +1,385 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include + +#include "include/neorados/RADOS.hpp" + +#include "include/buffer.h" + +#include "common/random_string.h" + +#include "cls/fifo/cls_fifo_types.h" +#include "cls/fifo/cls_fifo_ops.h" + +#include "fifo.h" + +namespace neorados::cls::fifo { +namespace bs = boost::system; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; + +void create_meta(WriteOp& op, std::string_view id, + std::optional objv, + std::optional oid_prefix, + bool exclusive, + std::uint64_t max_part_size, + std::uint64_t max_entry_size) +{ + fifo::op::create_meta cm; + + cm.id = id; + cm.version = objv; + cm.oid_prefix = oid_prefix; + cm.max_part_size = max_part_size; + cm.max_entry_size = max_entry_size; + cm.exclusive = exclusive; + + cb::list in; + encode(cm, in); + op.exec(fifo::op::CLASS, fifo::op::CREATE_META, in); +} + +void get_meta(ReadOp& op, std::optional objv, + bs::error_code* ec_out, fifo::info* info, + std::uint32_t* part_header_size, + std::uint32_t* part_entry_overhead) +{ + fifo::op::get_meta gm; + gm.version = objv; + cb::list in; + encode(gm, in); + op.exec(fifo::op::CLASS, fifo::op::GET_META, in, + [ec_out, info, part_header_size, + part_entry_overhead](bs::error_code ec, const cb::list& bl) { + fifo::op::get_meta_reply reply; + if (!ec) try { + auto iter = bl.cbegin(); + decode(reply, iter); + } catch (const cb::error& err) { + ec = err.code(); + } + if (ec_out) *ec_out = ec; + if (info) *info = std::move(reply.info); + if (part_header_size) *part_header_size = reply.part_header_size; + if (part_entry_overhead) + *part_entry_overhead = reply.part_entry_overhead; + }); +}; + +void update_meta(WriteOp& op, const fifo::objv& objv, + const fifo::update& update) +{ + fifo::op::update_meta um; + + um.version = objv; + um.tail_part_num = update.tail_part_num(); + um.head_part_num = update.head_part_num(); + um.min_push_part_num = update.min_push_part_num(); + um.max_push_part_num = update.max_push_part_num(); + um.journal_entries_add = std::move(update).journal_entries_add(); + um.journal_entries_rm = std::move(update).journal_entries_rm(); + + cb::list in; + encode(um, in); + op.exec(fifo::op::CLASS, fifo::op::UPDATE_META, in); +} + +void part_init(WriteOp& op, std::string_view tag, + fifo::data_params params) +{ + fifo::op::init_part ip; + + ip.tag = tag; + ip.params = params; + + cb::list in; + encode(ip, in); + op.exec(fifo::op::CLASS, fifo::op::INIT_PART, in); +} + +void push_part(WriteOp& op, std::string_view tag, + std::deque data_bufs, + fu2::unique_function f) +{ + fifo::op::push_part pp; + + pp.tag = tag; + pp.data_bufs = data_bufs; + pp.total_len = 0; + + for (const auto& bl : data_bufs) + pp.total_len += bl.length(); + + cb::list in; + encode(pp, in); + op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in, + [f = std::move(f)](bs::error_code ec, int r, const cb::list&) mutable { + std::move(f)(ec, r); + }); + op.returnvec(); +} + +void trim_part(WriteOp& op, + std::optional tag, + std::uint64_t ofs, bool exclusive) +{ + fifo::op::trim_part tp; + + tp.tag = tag; + tp.ofs = ofs; + tp.exclusive = exclusive; + + bufferlist in; + encode(tp, in); + op.exec(fifo::op::CLASS, fifo::op::TRIM_PART, in); +} + +void list_part(ReadOp& op, + std::optional tag, + std::uint64_t ofs, + std::uint64_t max_entries, + bs::error_code* ec_out, + std::vector* entries, + bool* more, + bool* full_part, + std::string* ptag) +{ + fifo::op::list_part lp; + + lp.tag = tag; + lp.ofs = ofs; + lp.max_entries = max_entries; + + bufferlist in; + encode(lp, in); + op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, + [entries, more, full_part, ptag, ec_out](bs::error_code ec, + const cb::list& bl) { + if (ec) { + if (ec_out) *ec_out = ec; + return; + } + + fifo::op::list_part_reply reply; + auto iter = bl.cbegin(); + try { + decode(reply, iter); + } catch (const cb::error& err) { + if (ec_out) *ec_out = ec; + return; + } + + if (entries) *entries = std::move(reply.entries); + if (more) *more = reply.more; + if (full_part) *full_part = reply.full_part; + if (ptag) *ptag = reply.tag; + }); +} + +void get_part_info(ReadOp& op, + bs::error_code* out_ec, + fifo::part_header* header) +{ + fifo::op::get_part_info gpi; + + bufferlist in; + encode(gpi, in); + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, + [out_ec, header](bs::error_code ec, const cb::list& bl) { + if (ec) { + if (out_ec) *out_ec = ec; + } + fifo::op::get_part_info_reply reply; + auto iter = bl.cbegin(); + try { + decode(reply, iter); + } catch (const cb::error& err) { + if (out_ec) *out_ec = ec; + return; + } + + if (header) *header = std::move(reply.header); + }); +} + +std::optional FIFO::to_marker(std::string_view s) { + marker m; + if (s.empty()) { + m.num = info.tail_part_num; + m.ofs = 0; + return m; + } + + auto pos = s.find(':'); + if (pos == string::npos) { + return std::nullopt; + } + + auto num = s.substr(0, pos); + auto ofs = s.substr(pos + 1); + + auto n = ceph::parse(num); + if (!n) { + return std::nullopt; + } + m.num = *n; + auto o = ceph::parse(ofs); + if (!o) { + return std::nullopt; + } + m.ofs = *o; + return m; +} + +bs::error_code FIFO::apply_update(fifo::info* info, + const fifo::objv& objv, + const fifo::update& update) { + std::unique_lock l(m); + auto err = info->apply_update(update); + if (objv != info->version) { + ldout(r->cct(), 0) << __func__ << "(): Raced locally!" << dendl; + return errc::raced; + } + if (err) { + ldout(r->cct(), 0) << __func__ << "(): ERROR: " << err << dendl; + return errc::update_failed; + } + + ++info->version.ver; + + return {}; +} + +std::string FIFO::generate_tag() const +{ + static constexpr auto HEADER_TAG_SIZE = 16; + return gen_rand_alphanumeric_plain(r->cct(), HEADER_TAG_SIZE); +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wnon-virtual-dtor" +class error_category : public ceph::converting_category { +public: + error_category(){} + const char* name() const noexcept override; + const char* message(int ev, char*, std::size_t) const noexcept override; + std::string message(int ev) const override; + bs::error_condition default_error_condition(int ev) const noexcept + override; + bool equivalent(int ev, const bs::error_condition& c) const + noexcept override; + using ceph::converting_category::equivalent; + int from_code(int ev) const noexcept override; +}; +#pragma GCC diagnostic pop +#pragma clang diagnostic pop + +const char* error_category::name() const noexcept { + return "FIFO"; +} + +const char* error_category::message(int ev, char*, std::size_t) const noexcept { + if (ev == 0) + return "No error"; + + switch (static_cast(ev)) { + case errc::raced: + return "Retry-race count exceeded"; + + case errc::inconsistency: + return "Inconsistent result! New head before old head"; + + case errc::entry_too_large: + return "Pushed entry too large"; + + case errc::invalid_marker: + return "Invalid marker string"; + + case errc::update_failed: + return "Update failed"; + } + + return "Unknown error"; +} + +std::string error_category::message(int ev) const { + return message(ev, nullptr, 0); +} + +bs::error_condition +error_category::default_error_condition(int ev) const noexcept { + switch (static_cast(ev)) { + case errc::raced: + return bs::errc::operation_canceled; + + case errc::inconsistency: + return bs::errc::io_error; + + case errc::entry_too_large: + return bs::errc::value_too_large; + + case errc::invalid_marker: + return bs::errc::invalid_argument; + + case errc::update_failed: + return bs::errc::invalid_argument; + } + + return { ev, *this }; +} + +bool error_category::equivalent(int ev, const bs::error_condition& c) const noexcept { + return default_error_condition(ev) == c; +} + +int error_category::from_code(int ev) const noexcept { + switch (static_cast(ev)) { + case errc::raced: + return -ECANCELED; + + case errc::inconsistency: + return -EIO; + + case errc::entry_too_large: + return -E2BIG; + + case errc::invalid_marker: + return -EINVAL; + + case errc::update_failed: + return -EINVAL; + + } + return -EDOM; +} + +const bs::error_category& error_category() noexcept { + static const class error_category c; + return c; +} + +} -- cgit v1.2.3