summaryrefslogtreecommitdiffstats
path: root/src/cls/fifo
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/cls/fifo
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/fifo')
-rw-r--r--src/cls/fifo/cls_fifo.cc1000
-rw-r--r--src/cls/fifo/cls_fifo_ops.h306
-rw-r--r--src/cls/fifo/cls_fifo_types.h524
3 files changed, 1830 insertions, 0 deletions
diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc
new file mode 100644
index 000000000..14313a735
--- /dev/null
+++ b/src/cls/fifo/cls_fifo.cc
@@ -0,0 +1,1000 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/** \file
+ *
+ * This is an OSD class that implements methods for management
+ * and use of fifo
+ *
+ */
+
+#include <cerrno>
+#include <optional>
+#include <string>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/types.h"
+
+#include "objclass/objclass.h"
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+CLS_VER(1,0)
+CLS_NAME(fifo)
+
+namespace rados::cls::fifo {
+
+static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE = 512;
+
+static std::uint32_t part_entry_overhead;
+
+struct entry_header_pre {
+ ceph_le64 magic;
+ ceph_le64 pre_size;
+ ceph_le64 header_size;
+ ceph_le64 data_size;
+ ceph_le64 index;
+ ceph_le32 reserved;
+} __attribute__ ((packed));
+
+struct entry_header {
+ ceph::real_time mtime;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(entry_header)
+
+namespace {
+
+std::string new_oid_prefix(std::string id, std::optional<std::string>& val)
+{
+ static constexpr auto PREFIX_RND_SIZE = 12;
+ if (val) {
+ return *val;
+ }
+
+ char buf[PREFIX_RND_SIZE + 1];
+ buf[PREFIX_RND_SIZE] = 0;
+
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+ return fmt::format("{}.{}", id, buf);
+}
+
+int write_header(cls_method_context_t hctx,
+ info& header,
+ bool inc_ver = true)
+{
+ static constexpr auto HEADER_INSTANCE_SIZE = 16;
+ if (header.version.instance.empty()) {
+ char buf[HEADER_INSTANCE_SIZE + 1];
+ buf[HEADER_INSTANCE_SIZE] = 0;
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+ header.version.instance = buf;
+ }
+ if (inc_ver) {
+ ++header.version.ver;
+ }
+ ceph::buffer::list bl;
+ encode(header, bl);
+ return cls_cxx_write_full(hctx, &bl);
+}
+
+int read_part_header(cls_method_context_t hctx,
+ part_header* part_header)
+{
+ ceph::buffer::list bl;
+ int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ auto iter = bl.cbegin();
+ try {
+ decode(*part_header, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ using ceph::operator <<;
+ std::ostringstream ss;
+ ss << part_header->max_time;
+ CLS_LOG(5, "%s:%d read part_header:\n"
+ "\ttag=%s\n"
+ "\tmagic=0x%" PRIx64 "\n"
+ "\tmin_ofs=%" PRId64 "\n"
+ "\tlast_ofs=%" PRId64 "\n"
+ "\tnext_ofs=%" PRId64 "\n"
+ "\tmin_index=%" PRId64 "\n"
+ "\tmax_index=%" PRId64 "\n"
+ "\tmax_time=%s\n",
+ __PRETTY_FUNCTION__, __LINE__,
+ part_header->tag.c_str(),
+ part_header->magic,
+ part_header->min_ofs,
+ part_header->last_ofs,
+ part_header->next_ofs,
+ part_header->min_index,
+ part_header->max_index,
+ ss.str().c_str());
+
+ return 0;
+}
+
+int write_part_header(cls_method_context_t hctx,
+ part_header& part_header)
+{
+ ceph::buffer::list bl;
+ encode(part_header, bl);
+
+ if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
+ CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ int r = cls_cxx_write2(hctx, 0, bl.length(),
+ &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write part header: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int read_header(cls_method_context_t hctx,
+ std::optional<objv> objv,
+ info* info, bool get_info = false)
+{
+ std::uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ ceph::buffer::list bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (r == 0) {
+ if (get_info) {
+ CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__);
+ } else {
+ CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__);
+ }
+ return -ENODATA;
+ }
+
+ try {
+ auto iter = bl.cbegin();
+ decode(*info, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ if (objv && !(info->version== *objv)) {
+ auto s1 = info->version.to_str();
+ auto s2 = objv->to_str();
+ CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
+ __PRETTY_FUNCTION__, s1.c_str(), s2.c_str());
+ return -ECANCELED;
+ }
+
+ return 0;
+}
+
+int create_meta(cls_method_context_t hctx,
+ ceph::buffer::list* in, ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::create_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ err.what());
+ return -EINVAL;
+ }
+
+ if (op.id.empty()) {
+ CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.max_part_size == 0 ||
+ op.max_entry_size == 0 ||
+ op.max_entry_size > op.max_part_size) {
+ CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ if (op.exclusive && r == 0) {
+ CLS_ERR("%s: exclusive create but queue already exists",
+ __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ if (r == 0) {
+ CLS_LOG(5, "%s: FIFO already exists, reading from disk and comparing.",
+ __PRETTY_FUNCTION__);
+ ceph::buffer::list bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ info header;
+ try {
+ auto iter = bl.cbegin();
+ decode(header, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding header: %s",
+ __PRETTY_FUNCTION__, err.what());
+ return -EIO;
+ }
+
+ if (!(header.id == op.id &&
+ (!op.oid_prefix ||
+ header.oid_prefix == *op.oid_prefix) &&
+ (!op.version ||
+ header.version == *op.version))) {
+ CLS_ERR("%s: failed to re-create existing queue "
+ "with different params", __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+ info header;
+
+ header.id = op.id;
+ if (op.version) {
+ header.version = *op.version;
+ } else {
+ static constexpr auto DEFAULT_INSTANCE_SIZE = 16;
+ char buf[DEFAULT_INSTANCE_SIZE + 1];
+ cls_gen_rand_base64(buf, sizeof(buf));
+ buf[DEFAULT_INSTANCE_SIZE] = '\0';
+ header.version.instance = buf;
+ header.version.ver = 1;
+ }
+ header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
+
+ header.params.max_part_size = op.max_part_size;
+ header.params.max_entry_size = op.max_entry_size;
+ header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
+
+ r = write_header(hctx, header, false);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int update_meta(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::update_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.version.empty()) {
+ CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ info header;
+
+ int r = read_header(hctx, op.version, &header);
+ if (r < 0) {
+ return r;
+ }
+
+ auto u = fifo::update().tail_part_num(op.tail_part_num)
+ .head_part_num(op.head_part_num)
+ .min_push_part_num(op.min_push_part_num)
+ .max_push_part_num(op.max_push_part_num)
+ .journal_entries_add(
+ std::move(op.journal_entries_add))
+ .journal_entries_rm(
+ std::move(op.journal_entries_rm));
+
+ auto err = header.apply_update(u);
+ if (err) {
+ std::ostringstream ss;
+ ss << u;
+ CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__, err->c_str(),
+ ss.str().c_str());
+ return -EINVAL;
+ }
+
+ r = write_header(hctx, header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int get_meta(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::get_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ op::get_meta_reply reply;
+ int r = read_header(hctx, op.version, &reply.info, true);
+ if (r < 0) {
+ return r;
+ }
+
+ reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ reply.part_entry_overhead = part_entry_overhead;
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+int init_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::init_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t size;
+
+ if (op.tag.empty()) {
+ CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ if (r == 0 && size > 0) {
+ part_header part_header;
+ r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag &&
+ part_header.params == op.params)) {
+ CLS_ERR("%s: failed to re-create existing part with different "
+ "params", __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+
+ part_header part_header;
+
+ part_header.tag = op.tag;
+ part_header.params = op.params;
+
+ part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ part_header.last_ofs = 0;
+ part_header.next_ofs = part_header.min_ofs;
+ part_header.max_time = ceph::real_clock::now();
+
+ cls_gen_random_bytes(reinterpret_cast<char *>(&part_header.magic),
+ sizeof(part_header.magic));
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+bool full_part(const part_header& part_header)
+{
+ return (part_header.next_ofs > part_header.params.full_size_threshold);
+}
+
+int push_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::push_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.tag.empty()) {
+ CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
+ part_entry_overhead;
+
+ if (effective_len > part_header.params.max_part_size) {
+ return -EINVAL;
+ }
+
+ if (full_part(part_header)) {
+ return -ERANGE;
+ }
+
+ auto now = ceph::real_clock::now();
+ struct entry_header entry_header = { now };
+ ceph::buffer::list entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ auto max_index = part_header.max_index;
+ const auto write_ofs = part_header.next_ofs;
+ auto ofs = part_header.next_ofs;
+
+ entry_header_pre pre_header;
+ pre_header.magic = part_header.magic;
+ pre_header.pre_size = sizeof(pre_header);
+ pre_header.reserved = 0;
+
+ std::uint64_t total_data = 0;
+ for (auto& data : op.data_bufs) {
+ total_data += data.length();
+ }
+ if (total_data != op.total_len) {
+ CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
+ " total data received=%" PRId64,
+ __PRETTY_FUNCTION__, op.total_len, total_data);
+ return -EINVAL;
+ }
+
+
+ int entries_pushed = 0;
+ ceph::buffer::list all_data;
+ for (auto& data : op.data_bufs) {
+ if (full_part(part_header))
+ break;
+
+ pre_header.header_size = entry_header_bl.length();
+ pre_header.data_size = data.length();
+ pre_header.index = max_index;
+
+ bufferptr pre(reinterpret_cast<char*>(&pre_header), sizeof(pre_header));
+ auto entry_write_len = pre.length() + entry_header_bl.length() + data.length();
+ all_data.append(pre);
+ all_data.append(entry_header_bl);
+ all_data.claim_append(data);
+
+ part_header.last_ofs = ofs;
+ ofs += entry_write_len;
+ ++max_index;
+ ++entries_pushed;
+ part_header.max_index = max_index;
+ part_header.next_ofs = ofs;
+ }
+ part_header.max_time = now;
+
+ auto write_len = all_data.length();
+
+ r = cls_cxx_write2(hctx, write_ofs, write_len,
+ &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+
+ if (r < 0) {
+ CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
+ " len=%u): r=%d", __PRETTY_FUNCTION__, write_ofs,
+ write_len, r);
+ return r;
+ }
+
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (entries_pushed == 0) {
+ CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__);
+ return -EFAULT;
+ }
+
+ return entries_pushed;
+}
+
+class EntryReader {
+ static constexpr std::uint64_t prefetch_len = (128 * 1024);
+
+ cls_method_context_t hctx;
+
+ const fifo::part_header& part_header;
+
+ std::uint64_t ofs;
+ ceph::buffer::list data;
+
+ int fetch(std::uint64_t num_bytes);
+ int read(std::uint64_t num_bytes, ceph::buffer::list* pbl);
+ int peek(std::uint64_t num_bytes, char *dest);
+ int seek(std::uint64_t num_bytes);
+
+public:
+ EntryReader(cls_method_context_t hctx,
+ const fifo::part_header& part_header,
+ uint64_t ofs) : hctx(hctx),
+ part_header(part_header),
+ ofs(ofs < part_header.min_ofs ?
+ part_header.min_ofs :
+ ofs) {}
+
+ std::uint64_t get_ofs() const {
+ return ofs;
+ }
+
+ bool end() const {
+ return (ofs >= part_header.next_ofs);
+ }
+
+ int peek_pre_header(entry_header_pre* pre_header);
+ int get_next_entry(ceph::buffer::list* pbl,
+ std::uint64_t* pofs,
+ ceph::real_time* pmtime);
+};
+
+
+int EntryReader::fetch(std::uint64_t num_bytes)
+{
+ CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__, (int)num_bytes, (int)ofs, (int)data.length());
+ if (data.length() < num_bytes) {
+ ceph::buffer::list bl;
+ CLS_LOG(5, "%s: reading % " PRId64 " bytes at ofs=%" PRId64, __PRETTY_FUNCTION__,
+ prefetch_len, ofs + data.length());
+ int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ data.claim_append(bl);
+ }
+
+ if (static_cast<unsigned>(num_bytes) > data.length()) {
+ CLS_ERR("%s: requested %" PRId64 " bytes, but only "
+ "%u were available", __PRETTY_FUNCTION__, num_bytes, data.length());
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+ data.splice(0, num_bytes, pbl);
+
+ ofs += num_bytes;
+
+ return 0;
+}
+
+int EntryReader::peek(std::uint64_t num_bytes, char* dest)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+
+ data.begin().copy(num_bytes, dest);
+
+ return 0;
+}
+
+int EntryReader::seek(std::uint64_t num_bytes)
+{
+ ceph::buffer::list bl;
+
+ CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, num_bytes);
+ return read(num_bytes, &bl);
+}
+
+int EntryReader::peek_pre_header(entry_header_pre* pre_header)
+{
+ if (end()) {
+ return -ENOENT;
+ }
+
+ int r = peek(sizeof(*pre_header),
+ reinterpret_cast<char*>(pre_header));
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__,
+ sizeof(pre_header), r);
+ return r;
+ }
+
+ if (pre_header->magic != part_header.magic) {
+ CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__);
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+
+int EntryReader::get_next_entry(ceph::buffer::list* pbl,
+ std::uint64_t* pofs,
+ ceph::real_time* pmtime)
+{
+ entry_header_pre pre_header;
+ int r = peek_pre_header(&pre_header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (pofs) {
+ *pofs = ofs;
+ }
+
+ CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64, __PRETTY_FUNCTION__, __LINE__,
+ uint64_t(pre_header.pre_size));
+ r = seek(pre_header.pre_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ ceph::buffer::list header;
+ CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__, __LINE__, (int)pre_header.header_size);
+ r = read(pre_header.header_size, &header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ entry_header entry_header;
+ auto iter = header.cbegin();
+ try {
+ decode(entry_header, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ if (pmtime) {
+ *pmtime = entry_header.mtime;
+ }
+
+ if (pbl) {
+ r = read(pre_header.data_size, pbl);
+ if (r < 0) {
+ CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ } else {
+ r = seek(pre_header.data_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ }
+
+ return 0;
+}
+
+int trim_part(cls_method_context_t hctx,
+ ceph::buffer::list *in, ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::trim_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.ofs < part_header.min_ofs) {
+ return 0;
+ }
+ if (op.exclusive && op.ofs == part_header.min_ofs) {
+ return 0;
+ }
+
+ if (op.ofs >= part_header.next_ofs) {
+ if (full_part(part_header)) {
+ /*
+ * trim full part completely: remove object
+ */
+
+ r = cls_cxx_remove(hctx);
+ if (r < 0) {
+ CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+ }
+
+ part_header.min_ofs = part_header.next_ofs;
+ part_header.min_index = part_header.max_index;
+ } else {
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ entry_header_pre pre_header;
+ int r = reader.peek_pre_header(&pre_header);
+ if (r < 0) {
+ return r;
+ }
+
+ if (op.exclusive) {
+ part_header.min_index = pre_header.index;
+ } else {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ part_header.min_index = pre_header.index + 1;
+ }
+
+ part_header.min_ofs = reader.get_ofs();
+ }
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int list_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::list_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ if (op.ofs >= part_header.min_ofs &&
+ !reader.end()) {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ }
+
+ op::list_part_reply reply;
+
+ reply.tag = part_header.tag;
+
+ auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);
+
+ for (int i = 0; i < max_entries && !reader.end(); ++i) {
+ ceph::buffer::list data;
+ ceph::real_time mtime;
+ std::uint64_t ofs;
+
+ r = reader.get_next_entry(&data, &ofs, &mtime);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ reply.entries.emplace_back(std::move(data), ofs, mtime);
+ }
+
+ reply.more = !reader.end();
+ reply.full_part = full_part(part_header);
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in,
+ ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::get_part_info op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ op::get_part_info_reply reply;
+
+ int r = read_part_header(hctx, &reply.header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ encode(reply, *out);
+
+ return 0;
+}
+}
+} // namespace rados::cls::fifo
+
+CLS_INIT(fifo)
+{
+ using namespace rados::cls::fifo;
+ CLS_LOG(10, "Loaded fifo class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_create_meta;
+ cls_method_handle_t h_get_meta;
+ cls_method_handle_t h_update_meta;
+ cls_method_handle_t h_init_part;
+ cls_method_handle_t h_push_part;
+ cls_method_handle_t h_trim_part;
+ cls_method_handle_t h_list_part;
+ cls_method_handle_t h_get_part_info;
+
+ cls_register(op::CLASS, &h_class);
+ cls_register_cxx_method(h_class, op::CREATE_META,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ create_meta, &h_create_meta);
+
+ cls_register_cxx_method(h_class, op::GET_META,
+ CLS_METHOD_RD,
+ get_meta, &h_get_meta);
+
+ cls_register_cxx_method(h_class, op::UPDATE_META,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ update_meta, &h_update_meta);
+
+ cls_register_cxx_method(h_class, op::INIT_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ init_part, &h_init_part);
+
+ cls_register_cxx_method(h_class, op::PUSH_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ push_part, &h_push_part);
+
+ cls_register_cxx_method(h_class, op::TRIM_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ trim_part, &h_trim_part);
+
+ cls_register_cxx_method(h_class, op::LIST_PART,
+ CLS_METHOD_RD,
+ list_part, &h_list_part);
+
+ cls_register_cxx_method(h_class, op::GET_PART_INFO,
+ CLS_METHOD_RD,
+ get_part_info, &h_get_part_info);
+
+ /* calculate entry overhead */
+ struct entry_header entry_header;
+ ceph::buffer::list entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length();
+
+ return;
+}
diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h
new file mode 100644
index 000000000..a3f4ae237
--- /dev/null
+++ b/src/cls/fifo/cls_fifo_ops.h
@@ -0,0 +1,306 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ * Copyright (C) 2019 SUSE LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "include/types.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+
+namespace rados::cls::fifo::op {
+struct create_meta
+{
+ std::string id;
+ std::optional<objv> version;
+ struct {
+ std::string name;
+ std::string ns;
+ } pool;
+ std::optional<std::string> oid_prefix;
+
+ std::uint64_t max_part_size{0};
+ std::uint64_t max_entry_size{0};
+
+ bool exclusive{false};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(version, bl);
+ encode(pool.name, bl);
+ encode(pool.ns, bl);
+ encode(oid_prefix, bl);
+ encode(max_part_size, bl);
+ encode(max_entry_size, bl);
+ encode(exclusive, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(version, bl);
+ decode(pool.name, bl);
+ decode(pool.ns, bl);
+ decode(oid_prefix, bl);
+ decode(max_part_size, bl);
+ decode(max_entry_size, bl);
+ decode(exclusive, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(create_meta)
+
+struct get_meta
+{
+ std::optional<objv> version;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(version, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(version, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(get_meta)
+
+struct get_meta_reply
+{
+ fifo::info info;
+ std::uint32_t part_header_size{0};
+ /* per entry extra data that is stored */
+ std::uint32_t part_entry_overhead{0};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(info, bl);
+ encode(part_header_size, bl);
+ encode(part_entry_overhead, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(info, bl);
+ decode(part_header_size, bl);
+ decode(part_entry_overhead, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(get_meta_reply)
+
+struct update_meta
+{
+ objv version;
+
+ std::optional<std::uint64_t> tail_part_num;
+ std::optional<std::uint64_t> head_part_num;
+ std::optional<std::uint64_t> min_push_part_num;
+ std::optional<std::uint64_t> max_push_part_num;
+ std::vector<journal_entry> journal_entries_add;
+ std::vector<journal_entry> journal_entries_rm;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(version, bl);
+ encode(tail_part_num, bl);
+ encode(head_part_num, bl);
+ encode(min_push_part_num, bl);
+ encode(max_push_part_num, bl);
+ encode(journal_entries_add, bl);
+ encode(journal_entries_rm, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(version, bl);
+ decode(tail_part_num, bl);
+ decode(head_part_num, bl);
+ decode(min_push_part_num, bl);
+ decode(max_push_part_num, bl);
+ decode(journal_entries_add, bl);
+ decode(journal_entries_rm, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(update_meta)
+
+struct init_part
+{
+ std::string tag;
+ data_params params;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(params, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(params, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(init_part)
+
+struct push_part
+{
+ std::string tag;
+ std::deque<ceph::buffer::list> data_bufs;
+ std::uint64_t total_len{0};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(data_bufs, bl);
+ encode(total_len, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(data_bufs, bl);
+ decode(total_len, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(push_part)
+
+struct trim_part
+{
+ std::optional<std::string> tag;
+ std::uint64_t ofs{0};
+ bool exclusive = false;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(ofs, bl);
+ encode(exclusive, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(ofs, bl);
+ decode(exclusive, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(trim_part)
+
+struct list_part
+{
+ std::optional<string> tag;
+ std::uint64_t ofs{0};
+ int max_entries{100};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(ofs, bl);
+ encode(max_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(ofs, bl);
+ decode(max_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(list_part)
+inline constexpr int MAX_LIST_ENTRIES = 512;
+
+struct list_part_reply
+{
+ std::string tag;
+ std::vector<part_list_entry> entries;
+ bool more{false};
+ bool full_part{false}; /* whether part is full or still can be written to.
+ A non full part is by definition head part */
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(entries, bl);
+ encode(more, bl);
+ encode(full_part, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(entries, bl);
+ decode(more, bl);
+ decode(full_part, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(list_part_reply)
+
+struct get_part_info
+{
+ void encode(ceph::buffer::list &bl) const {
+ ENCODE_START(1, 1, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator &bl) {
+ DECODE_START(1, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(get_part_info)
+
+struct get_part_info_reply
+{
+ part_header header;
+
+ void encode(ceph::buffer::list &bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(header, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator &bl) {
+ DECODE_START(1, bl);
+ decode(header, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(get_part_info_reply)
+
+inline constexpr auto CLASS = "fifo";
+inline constexpr auto CREATE_META = "create_meta";
+inline constexpr auto GET_META = "get_meta";
+inline constexpr auto UPDATE_META = "update_meta";
+inline constexpr auto INIT_PART = "init_part";
+inline constexpr auto PUSH_PART = "push_part";
+inline constexpr auto TRIM_PART = "trim_part";
+inline constexpr auto LIST_PART = "part_list";
+inline constexpr auto GET_PART_INFO = "get_part_info";
+} // namespace rados::cls::fifo::op
diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h
new file mode 100644
index 000000000..749f66e7b
--- /dev/null
+++ b/src/cls/fifo/cls_fifo_types.h
@@ -0,0 +1,524 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <optional>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "include/types.h"
+
+#include "common/ceph_time.h"
+
+class JSONObj;
+
+namespace rados::cls::fifo {
+struct objv {
+ std::string instance;
+ std::uint64_t ver{0};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(instance, bl);
+ encode(ver, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(instance, bl);
+ decode(ver, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+
+ bool operator ==(const objv& rhs) const {
+ return (instance == rhs.instance &&
+ ver == rhs.ver);
+ }
+ bool operator !=(const objv& rhs) const {
+ return (instance != rhs.instance ||
+ ver != rhs.ver);
+ }
+ bool same_or_later(const objv& rhs) const {
+ return (instance == rhs.instance ||
+ ver >= rhs.ver);
+ }
+
+ bool empty() const {
+ return instance.empty();
+ }
+
+ std::string to_str() const {
+ return fmt::format("{}{{{}}}", instance, ver);
+ }
+};
+WRITE_CLASS_ENCODER(objv)
+inline ostream& operator <<(std::ostream& os, const objv& objv)
+{
+ return os << objv.to_str();
+}
+
+struct data_params {
+ std::uint64_t max_part_size{0};
+ std::uint64_t max_entry_size{0};
+ std::uint64_t full_size_threshold{0};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(max_part_size, bl);
+ encode(max_entry_size, bl);
+ encode(full_size_threshold, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(max_part_size, bl);
+ decode(max_entry_size, bl);
+ decode(full_size_threshold, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+
+ bool operator ==(const data_params& rhs) const {
+ return (max_part_size == rhs.max_part_size &&
+ max_entry_size == rhs.max_entry_size &&
+ full_size_threshold == rhs.full_size_threshold);
+ }
+};
+WRITE_CLASS_ENCODER(data_params)
+inline std::ostream& operator <<(std::ostream& m, const data_params& d) {
+ return m << "max_part_size: " << d.max_part_size << ", "
+ << "max_entry_size: " << d.max_entry_size << ", "
+ << "full_size_threshold: " << d.full_size_threshold;
+}
+
+struct journal_entry {
+ enum class Op {
+ unknown = 0,
+ create = 1,
+ set_head = 2,
+ remove = 3,
+ } op{Op::unknown};
+
+ std::int64_t part_num{0};
+ std::string part_tag;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode((int)op, bl);
+ encode(part_num, bl);
+ encode(part_tag, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ int i;
+ decode(i, bl);
+ op = static_cast<Op>(i);
+ decode(part_num, bl);
+ decode(part_tag, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter* f) const;
+
+ bool operator ==(const journal_entry& e) {
+ return (op == e.op &&
+ part_num == e.part_num &&
+ part_tag == e.part_tag);
+ }
+};
+WRITE_CLASS_ENCODER(journal_entry)
+inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) {
+ switch (o) {
+ case journal_entry::Op::unknown:
+ return m << "Op::unknown";
+ case journal_entry::Op::create:
+ return m << "Op::create";
+ case journal_entry::Op::set_head:
+ return m << "Op::set_head";
+ case journal_entry::Op::remove:
+ return m << "Op::remove";
+ }
+ return m << "Bad value: " << static_cast<int>(o);
+}
+inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) {
+ return m << "op: " << j.op << ", "
+ << "part_num: " << j.part_num << ", "
+ << "part_tag: " << j.part_tag;
+}
+
+// This is actually a useful builder, since otherwise we end up with
+// four uint64_ts in a row and only care about a subset at a time.
+class update {
+ std::optional<std::uint64_t> tail_part_num_;
+ std::optional<std::uint64_t> head_part_num_;
+ std::optional<std::uint64_t> min_push_part_num_;
+ std::optional<std::uint64_t> max_push_part_num_;
+ std::vector<fifo::journal_entry> journal_entries_add_;
+ std::vector<fifo::journal_entry> journal_entries_rm_;
+
+public:
+
+ update&& tail_part_num(std::optional<std::uint64_t> num) noexcept {
+ tail_part_num_ = num;
+ return std::move(*this);
+ }
+ auto tail_part_num() const noexcept {
+ return tail_part_num_;
+ }
+
+ update&& head_part_num(std::optional<std::uint64_t> num) noexcept {
+ head_part_num_ = num;
+ return std::move(*this);
+ }
+ auto head_part_num() const noexcept {
+ return head_part_num_;
+ }
+
+ update&& min_push_part_num(std::optional<std::uint64_t> num)
+ noexcept {
+ min_push_part_num_ = num;
+ return std::move(*this);
+ }
+ auto min_push_part_num() const noexcept {
+ return min_push_part_num_;
+ }
+
+ update&& max_push_part_num(std::optional<std::uint64_t> num) noexcept {
+ max_push_part_num_ = num;
+ return std::move(*this);
+ }
+ auto max_push_part_num() const noexcept {
+ return max_push_part_num_;
+ }
+
+ update&& journal_entry_add(fifo::journal_entry entry) {
+ journal_entries_add_.push_back(std::move(entry));
+ return std::move(*this);
+ }
+ update&& journal_entries_add(
+ std::optional<std::vector<fifo::journal_entry>>&& entries) {
+ if (entries) {
+ journal_entries_add_ = std::move(*entries);
+ } else {
+ journal_entries_add_.clear();
+ }
+ return std::move(*this);
+ }
+ const auto& journal_entries_add() const & noexcept {
+ return journal_entries_add_;
+ }
+ auto&& journal_entries_add() && noexcept {
+ return std::move(journal_entries_add_);
+ }
+
+ update&& journal_entry_rm(fifo::journal_entry entry) {
+ journal_entries_rm_.push_back(std::move(entry));
+ return std::move(*this);
+ }
+ update&& journal_entries_rm(
+ std::optional<std::vector<fifo::journal_entry>>&& entries) {
+ if (entries) {
+ journal_entries_rm_ = std::move(*entries);
+ } else {
+ journal_entries_rm_.clear();
+ }
+ return std::move(*this);
+ }
+ const auto& journal_entries_rm() const & noexcept {
+ return journal_entries_rm_;
+ }
+ auto&& journal_entries_rm() && noexcept {
+ return std::move(journal_entries_rm_);
+ }
+ friend std::ostream& operator <<(std::ostream& m, const update& u);
+};
+inline std::ostream& operator <<(std::ostream& m, const update& u) {
+ bool prev = false;
+ if (u.tail_part_num_) {
+ m << "tail_part_num: " << *u.tail_part_num_;
+ prev = true;
+ }
+ if (u.head_part_num_) {
+ if (prev)
+ m << ", ";
+ m << "head_part_num: " << *u.head_part_num_;
+ prev = true;
+ }
+ if (u.min_push_part_num_) {
+ if (prev)
+ m << ", ";
+ m << "min_push_part_num: " << *u.min_push_part_num_;
+ prev = true;
+ }
+ if (u.max_push_part_num_) {
+ if (prev)
+ m << ", ";
+ m << "max_push_part_num: " << *u.max_push_part_num_;
+ prev = true;
+ }
+ if (!u.journal_entries_add_.empty()) {
+ if (prev)
+ m << ", ";
+ m << "journal_entries_add: {" << u.journal_entries_add_ << "}";
+ prev = true;
+ }
+ if (!u.journal_entries_rm_.empty()) {
+ if (prev)
+ m << ", ";
+ m << "journal_entries_rm: {" << u.journal_entries_rm_ << "}";
+ prev = true;
+ }
+ if (!prev)
+ m << "(none)";
+ return m;
+}
+
+struct info {
+ std::string id;
+ objv version;
+ std::string oid_prefix;
+ data_params params;
+
+ std::int64_t tail_part_num{0};
+ std::int64_t head_part_num{-1};
+ std::int64_t min_push_part_num{0};
+ std::int64_t max_push_part_num{-1};
+
+ std::string head_tag;
+ std::map<int64_t, string> tags;
+
+ std::multimap<int64_t, journal_entry> journal;
+
+ bool need_new_head() const {
+ return (head_part_num < min_push_part_num);
+ }
+
+ bool need_new_part() const {
+ return (max_push_part_num < min_push_part_num);
+ }
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(version, bl);
+ encode(oid_prefix, bl);
+ encode(params, bl);
+ encode(tail_part_num, bl);
+ encode(head_part_num, bl);
+ encode(min_push_part_num, bl);
+ encode(max_push_part_num, bl);
+ encode(tags, bl);
+ encode(head_tag, bl);
+ encode(journal, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(version, bl);
+ decode(oid_prefix, bl);
+ decode(params, bl);
+ decode(tail_part_num, bl);
+ decode(head_part_num, bl);
+ decode(min_push_part_num, bl);
+ decode(max_push_part_num, bl);
+ decode(tags, bl);
+ decode(head_tag, bl);
+ decode(journal, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+
+ std::string part_oid(std::int64_t part_num) const {
+ return fmt::format("{}.{}", oid_prefix, part_num);
+ }
+
+ journal_entry next_journal_entry(std::string tag) const {
+ journal_entry entry;
+ entry.op = journal_entry::Op::create;
+ entry.part_num = max_push_part_num + 1;
+ entry.part_tag = std::move(tag);
+ return entry;
+ }
+
+ std::optional<std::string>
+ apply_update(const update& update) {
+ if (update.tail_part_num()) {
+ tail_part_num = *update.tail_part_num();
+ }
+
+ if (update.min_push_part_num()) {
+ min_push_part_num = *update.min_push_part_num();
+ }
+
+ if (update.max_push_part_num()) {
+ max_push_part_num = *update.max_push_part_num();
+ }
+
+ for (const auto& entry : update.journal_entries_add()) {
+ auto iter = journal.find(entry.part_num);
+ if (iter != journal.end() &&
+ iter->second.op == entry.op) {
+ /* don't allow multiple concurrent (same) operations on the same part,
+ racing clients should use objv to avoid races anyway */
+ return fmt::format("multiple concurrent operations on same part are not "
+ "allowed, part num={}", entry.part_num);
+ }
+
+ if (entry.op == journal_entry::Op::create) {
+ tags[entry.part_num] = entry.part_tag;
+ }
+
+ journal.emplace(entry.part_num, entry);
+ }
+
+ for (const auto& entry : update.journal_entries_rm()) {
+ journal.erase(entry.part_num);
+ }
+
+ if (update.head_part_num()) {
+ tags.erase(head_part_num);
+ head_part_num = *update.head_part_num();
+ auto iter = tags.find(head_part_num);
+ if (iter != tags.end()) {
+ head_tag = iter->second;
+ } else {
+ head_tag.erase();
+ }
+ }
+
+ return std::nullopt;
+ }
+};
+WRITE_CLASS_ENCODER(info)
+inline std::ostream& operator <<(std::ostream& m, const info& i) {
+ return m << "id: " << i.id << ", "
+ << "version: " << i.version << ", "
+ << "oid_prefix: " << i.oid_prefix << ", "
+ << "params: {" << i.params << "}, "
+ << "tail_part_num: " << i.tail_part_num << ", "
+ << "head_part_num: " << i.head_part_num << ", "
+ << "min_push_part_num: " << i.min_push_part_num << ", "
+ << "max_push_part_num: " << i.max_push_part_num << ", "
+ << "head_tag: " << i.head_tag << ", "
+ << "tags: {" << i.tags << "}, "
+ << "journal: {" << i.journal;
+}
+
+struct part_list_entry {
+ ceph::buffer::list data;
+ std::uint64_t ofs = 0;
+ ceph::real_time mtime;
+
+ part_list_entry() {}
+ part_list_entry(ceph::buffer::list&& data,
+ uint64_t ofs,
+ ceph::real_time mtime)
+ : data(std::move(data)), ofs(ofs), mtime(mtime) {}
+
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(data, bl);
+ encode(ofs, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(data, bl);
+ decode(ofs, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(part_list_entry)
+inline std::ostream& operator <<(std::ostream& m,
+ const part_list_entry& p) {
+ using ceph::operator <<;
+ return m << "data: " << p.data << ", "
+ << "ofs: " << p.ofs << ", "
+ << "mtime: " << p.mtime;
+}
+
+struct part_header {
+ std::string tag;
+
+ data_params params;
+
+ std::uint64_t magic{0};
+
+ std::uint64_t min_ofs{0};
+ std::uint64_t last_ofs{0};
+ std::uint64_t next_ofs{0};
+ std::uint64_t min_index{0};
+ std::uint64_t max_index{0};
+ ceph::real_time max_time;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tag, bl);
+ encode(params, bl);
+ encode(magic, bl);
+ encode(min_ofs, bl);
+ encode(last_ofs, bl);
+ encode(next_ofs, bl);
+ encode(min_index, bl);
+ encode(max_index, bl);
+ encode(max_time, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(tag, bl);
+ decode(params, bl);
+ decode(magic, bl);
+ decode(min_ofs, bl);
+ decode(last_ofs, bl);
+ decode(next_ofs, bl);
+ decode(min_index, bl);
+ decode(max_index, bl);
+ decode(max_time, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(part_header)
+inline std::ostream& operator <<(std::ostream& m, const part_header& p) {
+ using ceph::operator <<;
+ return m << "tag: " << p.tag << ", "
+ << "params: {" << p.params << "}, "
+ << "magic: " << p.magic << ", "
+ << "min_ofs: " << p.min_ofs << ", "
+ << "last_ofs: " << p.last_ofs << ", "
+ << "next_ofs: " << p.next_ofs << ", "
+ << "min_index: " << p.min_index << ", "
+ << "max_index: " << p.max_index << ", "
+ << "max_time: " << p.max_time;
+}
+} // namespace rados::cls::fifo