summaryrefslogtreecommitdiffstats
path: root/src/cls/fifo/cls_fifo.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cls/fifo/cls_fifo.cc')
-rw-r--r--src/cls/fifo/cls_fifo.cc1000
1 files changed, 1000 insertions, 0 deletions
diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc
new file mode 100644
index 000000000..14313a735
--- /dev/null
+++ b/src/cls/fifo/cls_fifo.cc
@@ -0,0 +1,1000 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/** \file
+ *
+ * This is an OSD class that implements methods for management
+ * and use of fifo
+ *
+ */
+
+#include <cerrno>
+#include <optional>
+#include <string>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/types.h"
+
+#include "objclass/objclass.h"
+
+#include "cls/fifo/cls_fifo_ops.h"
+#include "cls/fifo/cls_fifo_types.h"
+
+CLS_VER(1,0)
+CLS_NAME(fifo)
+
+namespace rados::cls::fifo {
+
+static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE = 512;
+
+static std::uint32_t part_entry_overhead;
+
+struct entry_header_pre {
+ ceph_le64 magic;
+ ceph_le64 pre_size;
+ ceph_le64 header_size;
+ ceph_le64 data_size;
+ ceph_le64 index;
+ ceph_le32 reserved;
+} __attribute__ ((packed));
+
+struct entry_header {
+ ceph::real_time mtime;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(entry_header)
+
+namespace {
+
+std::string new_oid_prefix(std::string id, std::optional<std::string>& val)
+{
+ static constexpr auto PREFIX_RND_SIZE = 12;
+ if (val) {
+ return *val;
+ }
+
+ char buf[PREFIX_RND_SIZE + 1];
+ buf[PREFIX_RND_SIZE] = 0;
+
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+
+ return fmt::format("{}.{}", id, buf);
+}
+
+int write_header(cls_method_context_t hctx,
+ info& header,
+ bool inc_ver = true)
+{
+ static constexpr auto HEADER_INSTANCE_SIZE = 16;
+ if (header.version.instance.empty()) {
+ char buf[HEADER_INSTANCE_SIZE + 1];
+ buf[HEADER_INSTANCE_SIZE] = 0;
+ cls_gen_rand_base64(buf, sizeof(buf) - 1);
+ header.version.instance = buf;
+ }
+ if (inc_ver) {
+ ++header.version.ver;
+ }
+ ceph::buffer::list bl;
+ encode(header, bl);
+ return cls_cxx_write_full(hctx, &bl);
+}
+
+int read_part_header(cls_method_context_t hctx,
+ part_header* part_header)
+{
+ ceph::buffer::list bl;
+ int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ auto iter = bl.cbegin();
+ try {
+ decode(*part_header, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ using ceph::operator <<;
+ std::ostringstream ss;
+ ss << part_header->max_time;
+ CLS_LOG(5, "%s:%d read part_header:\n"
+ "\ttag=%s\n"
+ "\tmagic=0x%" PRIx64 "\n"
+ "\tmin_ofs=%" PRId64 "\n"
+ "\tlast_ofs=%" PRId64 "\n"
+ "\tnext_ofs=%" PRId64 "\n"
+ "\tmin_index=%" PRId64 "\n"
+ "\tmax_index=%" PRId64 "\n"
+ "\tmax_time=%s\n",
+ __PRETTY_FUNCTION__, __LINE__,
+ part_header->tag.c_str(),
+ part_header->magic,
+ part_header->min_ofs,
+ part_header->last_ofs,
+ part_header->next_ofs,
+ part_header->min_index,
+ part_header->max_index,
+ ss.str().c_str());
+
+ return 0;
+}
+
+int write_part_header(cls_method_context_t hctx,
+ part_header& part_header)
+{
+ ceph::buffer::list bl;
+ encode(part_header, bl);
+
+ if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) {
+ CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ int r = cls_cxx_write2(hctx, 0, bl.length(),
+ &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write part header: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int read_header(cls_method_context_t hctx,
+ std::optional<objv> objv,
+ info* info, bool get_info = false)
+{
+ std::uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ ceph::buffer::list bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (r == 0) {
+ if (get_info) {
+ CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__);
+ } else {
+ CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__);
+ }
+ return -ENODATA;
+ }
+
+ try {
+ auto iter = bl.cbegin();
+ decode(*info, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ if (objv && !(info->version== *objv)) {
+ auto s1 = info->version.to_str();
+ auto s2 = objv->to_str();
+ CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation",
+ __PRETTY_FUNCTION__, s1.c_str(), s2.c_str());
+ return -ECANCELED;
+ }
+
+ return 0;
+}
+
+int create_meta(cls_method_context_t hctx,
+ ceph::buffer::list* in, ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::create_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__,
+ err.what());
+ return -EINVAL;
+ }
+
+ if (op.id.empty()) {
+ CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.max_part_size == 0 ||
+ op.max_entry_size == 0 ||
+ op.max_entry_size > op.max_part_size) {
+ CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t size;
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ if (op.exclusive && r == 0) {
+ CLS_ERR("%s: exclusive create but queue already exists",
+ __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ if (r == 0) {
+ CLS_LOG(5, "%s: FIFO already exists, reading from disk and comparing.",
+ __PRETTY_FUNCTION__);
+ ceph::buffer::list bl;
+ r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ info header;
+ try {
+ auto iter = bl.cbegin();
+ decode(header, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed decoding header: %s",
+ __PRETTY_FUNCTION__, err.what());
+ return -EIO;
+ }
+
+ if (!(header.id == op.id &&
+ (!op.oid_prefix ||
+ header.oid_prefix == *op.oid_prefix) &&
+ (!op.version ||
+ header.version == *op.version))) {
+ CLS_ERR("%s: failed to re-create existing queue "
+ "with different params", __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+ info header;
+
+ header.id = op.id;
+ if (op.version) {
+ header.version = *op.version;
+ } else {
+ static constexpr auto DEFAULT_INSTANCE_SIZE = 16;
+ char buf[DEFAULT_INSTANCE_SIZE + 1];
+ cls_gen_rand_base64(buf, sizeof(buf));
+ buf[DEFAULT_INSTANCE_SIZE] = '\0';
+ header.version.instance = buf;
+ header.version.ver = 1;
+ }
+ header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix);
+
+ header.params.max_part_size = op.max_part_size;
+ header.params.max_entry_size = op.max_entry_size;
+ header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead;
+
+ r = write_header(hctx, header, false);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int update_meta(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::update_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.version.empty()) {
+ CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ info header;
+
+ int r = read_header(hctx, op.version, &header);
+ if (r < 0) {
+ return r;
+ }
+
+ auto u = fifo::update().tail_part_num(op.tail_part_num)
+ .head_part_num(op.head_part_num)
+ .min_push_part_num(op.min_push_part_num)
+ .max_push_part_num(op.max_push_part_num)
+ .journal_entries_add(
+ std::move(op.journal_entries_add))
+ .journal_entries_rm(
+ std::move(op.journal_entries_rm));
+
+ auto err = header.apply_update(u);
+ if (err) {
+ std::ostringstream ss;
+ ss << u;
+ CLS_ERR("%s: %s: %s", __PRETTY_FUNCTION__, err->c_str(),
+ ss.str().c_str());
+ return -EINVAL;
+ }
+
+ r = write_header(hctx, header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int get_meta(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::get_meta op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ op::get_meta_reply reply;
+ int r = read_header(hctx, op.version, &reply.info, true);
+ if (r < 0) {
+ return r;
+ }
+
+ reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ reply.part_entry_overhead = part_entry_overhead;
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+int init_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::init_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t size;
+
+ if (op.tag.empty()) {
+ CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ int r = cls_cxx_stat2(hctx, &size, nullptr);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ if (r == 0 && size > 0) {
+ part_header part_header;
+ r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag &&
+ part_header.params == op.params)) {
+ CLS_ERR("%s: failed to re-create existing part with different "
+ "params", __PRETTY_FUNCTION__);
+ return -EEXIST;
+ }
+
+ return 0; /* already exists */
+ }
+
+ part_header part_header;
+
+ part_header.tag = op.tag;
+ part_header.params = op.params;
+
+ part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
+ part_header.last_ofs = 0;
+ part_header.next_ofs = part_header.min_ofs;
+ part_header.max_time = ceph::real_clock::now();
+
+ cls_gen_random_bytes(reinterpret_cast<char *>(&part_header.magic),
+ sizeof(part_header.magic));
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+bool full_part(const part_header& part_header)
+{
+ return (part_header.next_ofs > part_header.params.full_size_threshold);
+}
+
+int push_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::push_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error& err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.tag.empty()) {
+ CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (!(part_header.tag == op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
+ part_entry_overhead;
+
+ if (effective_len > part_header.params.max_part_size) {
+ return -EINVAL;
+ }
+
+ if (full_part(part_header)) {
+ return -ERANGE;
+ }
+
+ auto now = ceph::real_clock::now();
+ struct entry_header entry_header = { now };
+ ceph::buffer::list entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ auto max_index = part_header.max_index;
+ const auto write_ofs = part_header.next_ofs;
+ auto ofs = part_header.next_ofs;
+
+ entry_header_pre pre_header;
+ pre_header.magic = part_header.magic;
+ pre_header.pre_size = sizeof(pre_header);
+ pre_header.reserved = 0;
+
+ std::uint64_t total_data = 0;
+ for (auto& data : op.data_bufs) {
+ total_data += data.length();
+ }
+ if (total_data != op.total_len) {
+ CLS_ERR("%s: length mismatch: op.total_len=%" PRId64
+ " total data received=%" PRId64,
+ __PRETTY_FUNCTION__, op.total_len, total_data);
+ return -EINVAL;
+ }
+
+
+ int entries_pushed = 0;
+ ceph::buffer::list all_data;
+ for (auto& data : op.data_bufs) {
+ if (full_part(part_header))
+ break;
+
+ pre_header.header_size = entry_header_bl.length();
+ pre_header.data_size = data.length();
+ pre_header.index = max_index;
+
+ bufferptr pre(reinterpret_cast<char*>(&pre_header), sizeof(pre_header));
+ auto entry_write_len = pre.length() + entry_header_bl.length() + data.length();
+ all_data.append(pre);
+ all_data.append(entry_header_bl);
+ all_data.claim_append(data);
+
+ part_header.last_ofs = ofs;
+ ofs += entry_write_len;
+ ++max_index;
+ ++entries_pushed;
+ part_header.max_index = max_index;
+ part_header.next_ofs = ofs;
+ }
+ part_header.max_time = now;
+
+ auto write_len = all_data.length();
+
+ r = cls_cxx_write2(hctx, write_ofs, write_len,
+ &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+
+ if (r < 0) {
+ CLS_ERR("%s: failed to write entries (ofs=%" PRIu64
+ " len=%u): r=%d", __PRETTY_FUNCTION__, write_ofs,
+ write_len, r);
+ return r;
+ }
+
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (entries_pushed == 0) {
+ CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__);
+ return -EFAULT;
+ }
+
+ return entries_pushed;
+}
+
+class EntryReader {
+ static constexpr std::uint64_t prefetch_len = (128 * 1024);
+
+ cls_method_context_t hctx;
+
+ const fifo::part_header& part_header;
+
+ std::uint64_t ofs;
+ ceph::buffer::list data;
+
+ int fetch(std::uint64_t num_bytes);
+ int read(std::uint64_t num_bytes, ceph::buffer::list* pbl);
+ int peek(std::uint64_t num_bytes, char *dest);
+ int seek(std::uint64_t num_bytes);
+
+public:
+ EntryReader(cls_method_context_t hctx,
+ const fifo::part_header& part_header,
+ uint64_t ofs) : hctx(hctx),
+ part_header(part_header),
+ ofs(ofs < part_header.min_ofs ?
+ part_header.min_ofs :
+ ofs) {}
+
+ std::uint64_t get_ofs() const {
+ return ofs;
+ }
+
+ bool end() const {
+ return (ofs >= part_header.next_ofs);
+ }
+
+ int peek_pre_header(entry_header_pre* pre_header);
+ int get_next_entry(ceph::buffer::list* pbl,
+ std::uint64_t* pofs,
+ ceph::real_time* pmtime);
+};
+
+
+int EntryReader::fetch(std::uint64_t num_bytes)
+{
+ CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__, (int)num_bytes, (int)ofs, (int)data.length());
+ if (data.length() < num_bytes) {
+ ceph::buffer::list bl;
+ CLS_LOG(5, "%s: reading % " PRId64 " bytes at ofs=%" PRId64, __PRETTY_FUNCTION__,
+ prefetch_len, ofs + data.length());
+ int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ data.claim_append(bl);
+ }
+
+ if (static_cast<unsigned>(num_bytes) > data.length()) {
+ CLS_ERR("%s: requested %" PRId64 " bytes, but only "
+ "%u were available", __PRETTY_FUNCTION__, num_bytes, data.length());
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+ data.splice(0, num_bytes, pbl);
+
+ ofs += num_bytes;
+
+ return 0;
+}
+
+int EntryReader::peek(std::uint64_t num_bytes, char* dest)
+{
+ int r = fetch(num_bytes);
+ if (r < 0) {
+ return r;
+ }
+
+ data.begin().copy(num_bytes, dest);
+
+ return 0;
+}
+
+int EntryReader::seek(std::uint64_t num_bytes)
+{
+ ceph::buffer::list bl;
+
+ CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, num_bytes);
+ return read(num_bytes, &bl);
+}
+
+int EntryReader::peek_pre_header(entry_header_pre* pre_header)
+{
+ if (end()) {
+ return -ENOENT;
+ }
+
+ int r = peek(sizeof(*pre_header),
+ reinterpret_cast<char*>(pre_header));
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__,
+ sizeof(pre_header), r);
+ return r;
+ }
+
+ if (pre_header->magic != part_header.magic) {
+ CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__);
+ return -ERANGE;
+ }
+
+ return 0;
+}
+
+
+int EntryReader::get_next_entry(ceph::buffer::list* pbl,
+ std::uint64_t* pofs,
+ ceph::real_time* pmtime)
+{
+ entry_header_pre pre_header;
+ int r = peek_pre_header(&pre_header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ if (pofs) {
+ *pofs = ofs;
+ }
+
+ CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64, __PRETTY_FUNCTION__, __LINE__,
+ uint64_t(pre_header.pre_size));
+ r = seek(pre_header.pre_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ ceph::buffer::list header;
+ CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__, __LINE__, (int)pre_header.header_size);
+ r = read(pre_header.header_size, &header);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ entry_header entry_header;
+ auto iter = header.cbegin();
+ try {
+ decode(entry_header, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__);
+ return -EIO;
+ }
+
+ if (pmtime) {
+ *pmtime = entry_header.mtime;
+ }
+
+ if (pbl) {
+ r = read(pre_header.data_size, pbl);
+ if (r < 0) {
+ CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ } else {
+ r = seek(pre_header.data_size);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ }
+
+ return 0;
+}
+
+int trim_part(cls_method_context_t hctx,
+ ceph::buffer::list *in, ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::trim_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ if (op.ofs < part_header.min_ofs) {
+ return 0;
+ }
+ if (op.exclusive && op.ofs == part_header.min_ofs) {
+ return 0;
+ }
+
+ if (op.ofs >= part_header.next_ofs) {
+ if (full_part(part_header)) {
+ /*
+ * trim full part completely: remove object
+ */
+
+ r = cls_cxx_remove(hctx);
+ if (r < 0) {
+ CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+ }
+
+ part_header.min_ofs = part_header.next_ofs;
+ part_header.min_index = part_header.max_index;
+ } else {
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ entry_header_pre pre_header;
+ int r = reader.peek_pre_header(&pre_header);
+ if (r < 0) {
+ return r;
+ }
+
+ if (op.exclusive) {
+ part_header.min_index = pre_header.index;
+ } else {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ part_header.min_index = pre_header.index + 1;
+ }
+
+ part_header.min_ofs = reader.get_ofs();
+ }
+
+ r = write_part_header(hctx, part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ return 0;
+}
+
+int list_part(cls_method_context_t hctx, ceph::buffer::list* in,
+ ceph::buffer::list* out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::list_part op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ part_header part_header;
+ int r = read_part_header(hctx, &part_header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ if (op.tag &&
+ !(part_header.tag == *op.tag)) {
+ CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ EntryReader reader(hctx, part_header, op.ofs);
+
+ if (op.ofs >= part_header.min_ofs &&
+ !reader.end()) {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__, r);
+ return r;
+ }
+ }
+
+ op::list_part_reply reply;
+
+ reply.tag = part_header.tag;
+
+ auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);
+
+ for (int i = 0; i < max_entries && !reader.end(); ++i) {
+ ceph::buffer::list data;
+ ceph::real_time mtime;
+ std::uint64_t ofs;
+
+ r = reader.get_next_entry(&data, &ofs, &mtime);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d",
+ __PRETTY_FUNCTION__, r);
+ return r;
+ }
+
+ reply.entries.emplace_back(std::move(data), ofs, mtime);
+ }
+
+ reply.more = !reader.end();
+ reply.full_part = full_part(part_header);
+
+ encode(reply, *out);
+
+ return 0;
+}
+
+int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in,
+ ceph::buffer::list *out)
+{
+ CLS_LOG(5, "%s", __PRETTY_FUNCTION__);
+
+ op::get_part_info op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__);
+ return -EINVAL;
+ }
+
+ op::get_part_info_reply reply;
+
+ int r = read_part_header(hctx, &reply.header);
+ if (r < 0) {
+ CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
+ return r;
+ }
+
+ encode(reply, *out);
+
+ return 0;
+}
+}
+} // namespace rados::cls::fifo
+
+CLS_INIT(fifo)
+{
+ using namespace rados::cls::fifo;
+ CLS_LOG(10, "Loaded fifo class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_create_meta;
+ cls_method_handle_t h_get_meta;
+ cls_method_handle_t h_update_meta;
+ cls_method_handle_t h_init_part;
+ cls_method_handle_t h_push_part;
+ cls_method_handle_t h_trim_part;
+ cls_method_handle_t h_list_part;
+ cls_method_handle_t h_get_part_info;
+
+ cls_register(op::CLASS, &h_class);
+ cls_register_cxx_method(h_class, op::CREATE_META,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ create_meta, &h_create_meta);
+
+ cls_register_cxx_method(h_class, op::GET_META,
+ CLS_METHOD_RD,
+ get_meta, &h_get_meta);
+
+ cls_register_cxx_method(h_class, op::UPDATE_META,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ update_meta, &h_update_meta);
+
+ cls_register_cxx_method(h_class, op::INIT_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ init_part, &h_init_part);
+
+ cls_register_cxx_method(h_class, op::PUSH_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ push_part, &h_push_part);
+
+ cls_register_cxx_method(h_class, op::TRIM_PART,
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ trim_part, &h_trim_part);
+
+ cls_register_cxx_method(h_class, op::LIST_PART,
+ CLS_METHOD_RD,
+ list_part, &h_list_part);
+
+ cls_register_cxx_method(h_class, op::GET_PART_INFO,
+ CLS_METHOD_RD,
+ get_part_info, &h_get_part_info);
+
+ /* calculate entry overhead */
+ struct entry_header entry_header;
+ ceph::buffer::list entry_header_bl;
+ encode(entry_header, entry_header_bl);
+
+ part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length();
+
+ return;
+}