diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/cls/fifo | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/fifo')
-rw-r--r-- | src/cls/fifo/cls_fifo.cc | 958 | ||||
-rw-r--r-- | src/cls/fifo/cls_fifo_ops.h | 311 | ||||
-rw-r--r-- | src/cls/fifo/cls_fifo_types.h | 559 |
3 files changed, 1828 insertions, 0 deletions
diff --git a/src/cls/fifo/cls_fifo.cc b/src/cls/fifo/cls_fifo.cc new file mode 100644 index 000000000..85022eeb0 --- /dev/null +++ b/src/cls/fifo/cls_fifo.cc @@ -0,0 +1,958 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/** \file + * + * This is an OSD class that implements methods for management + * and use of fifo + * + */ + +#include <cerrno> +#include <optional> +#include <string> + +#include <fmt/format.h> + +#include "include/buffer.h" +#include "include/types.h" + +#include "objclass/objclass.h" + +#include "cls/fifo/cls_fifo_ops.h" +#include "cls/fifo/cls_fifo_types.h" + +CLS_VER(1,0) +CLS_NAME(fifo) + +namespace rados::cls::fifo { + +static constexpr auto CLS_FIFO_MAX_PART_HEADER_SIZE = 512; + +static std::uint32_t part_entry_overhead; + +struct entry_header_pre { + ceph_le64 magic; + ceph_le64 pre_size; + ceph_le64 header_size; + ceph_le64 data_size; + ceph_le64 index; + ceph_le32 reserved; +} __attribute__ ((packed)); + +struct entry_header { + ceph::real_time mtime; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(entry_header) + +namespace { + +std::string new_oid_prefix(std::string id, std::optional<std::string>& val) +{ + static constexpr auto PREFIX_RND_SIZE = 12; + if (val) { + return *val; + } + + char buf[PREFIX_RND_SIZE + 1]; + buf[PREFIX_RND_SIZE] = 0; + + cls_gen_rand_base64(buf, sizeof(buf) - 1); + + return fmt::format("{}.{}", id, buf); +} + +int write_header(cls_method_context_t hctx, + info& header) +{ + static constexpr auto HEADER_INSTANCE_SIZE = 16; + if (header.version.instance.empty()) { + char buf[HEADER_INSTANCE_SIZE + 1]; + buf[HEADER_INSTANCE_SIZE] = 0; + cls_gen_rand_base64(buf, sizeof(buf) - 1); + header.version.instance = buf; + } + ceph::buffer::list bl; + encode(header, bl); + return cls_cxx_write_full(hctx, &bl); +} + +int read_part_header(cls_method_context_t hctx, + part_header* part_header) +{ + ceph::buffer::list bl; + int r = cls_cxx_read2(hctx, 0, CLS_FIFO_MAX_PART_HEADER_SIZE, &bl, + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r); + return r; + } + + auto iter = bl.cbegin(); + try { + decode(*part_header, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed decoding part header", __PRETTY_FUNCTION__); + return -EIO; + } + + using ceph::operator <<; + std::ostringstream ss; + ss << part_header->max_time; + CLS_LOG(5, "%s:%d read part_header:\n" + "\tmagic=0x%" PRIx64 "\n" + "\tmin_ofs=%" PRId64 "\n" + "\tlast_ofs=%" PRId64 "\n" + "\tnext_ofs=%" PRId64 "\n" + "\tmin_index=%" PRId64 "\n" + "\tmax_index=%" PRId64 "\n" + "\tmax_time=%s\n", + __PRETTY_FUNCTION__, __LINE__, + part_header->magic, + part_header->min_ofs, + part_header->last_ofs, + part_header->next_ofs, + part_header->min_index, + part_header->max_index, + ss.str().c_str()); + + return 0; +} + +int write_part_header(cls_method_context_t hctx, + part_header& part_header) +{ + ceph::buffer::list bl; + encode(part_header, bl); + + if (bl.length() > CLS_FIFO_MAX_PART_HEADER_SIZE) { + CLS_ERR("%s: cannot write part header, buffer exceeds max size", __PRETTY_FUNCTION__); + return -EIO; + } + + int r = cls_cxx_write2(hctx, 0, bl.length(), + &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("%s: failed to write part header: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + + return 0; +} + +int read_header(cls_method_context_t hctx, + std::optional<objv> objv, + info* info, bool get_info = false) +{ + std::uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r); + return r; + } + + ceph::buffer::list bl; + r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r); + return r; + } + + if (r == 0) { + if (get_info) { + CLS_LOG(5, "%s: Zero length object, likely probe, returning ENODATA", __PRETTY_FUNCTION__); + } else { + CLS_ERR("ERROR: %s: Zero length object, returning ENODATA", __PRETTY_FUNCTION__); + } + return -ENODATA; + } + + try { + auto iter = bl.cbegin(); + decode(*info, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed decoding header", __PRETTY_FUNCTION__); + return -EIO; + } + + if (objv && !(info->version== *objv)) { + auto s1 = info->version.to_str(); + auto s2 = objv->to_str(); + CLS_ERR("%s: version mismatch (header=%s, req=%s), canceled operation", + __PRETTY_FUNCTION__, s1.c_str(), s2.c_str()); + return -ECANCELED; + } + + return 0; +} + +int create_meta(cls_method_context_t hctx, + ceph::buffer::list* in, ceph::buffer::list* out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::create_meta op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed to decode request: %s", __PRETTY_FUNCTION__, + err.what()); + return -EINVAL; + } + + if (op.id.empty()) { + CLS_ERR("%s: ID cannot be empty", __PRETTY_FUNCTION__); + return -EINVAL; + } + + if (op.max_part_size == 0 || + op.max_entry_size == 0 || + op.max_entry_size > op.max_part_size) { + CLS_ERR("ERROR: %s: invalid dimensions.", __PRETTY_FUNCTION__); + return -EINVAL; + } + + std::uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0 && r != -ENOENT) { + CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", + __PRETTY_FUNCTION__, r); + return r; + } + if (op.exclusive && r == 0) { + CLS_ERR("%s: exclusive create but queue already exists", + __PRETTY_FUNCTION__); + return -EEXIST; + } + + if (r == 0) { + CLS_LOG(5, "%s: FIFO already exists, reading from disk and comparing.", + __PRETTY_FUNCTION__); + ceph::buffer::list bl; + r = cls_cxx_read2(hctx, 0, size, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", + __PRETTY_FUNCTION__, r); + return r; + } + + info header; + try { + auto iter = bl.cbegin(); + decode(header, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed decoding header: %s", + __PRETTY_FUNCTION__, err.what()); + return -EIO; + } + + if (!(header.id == op.id && + (!op.oid_prefix || + header.oid_prefix == *op.oid_prefix) && + (!op.version || + header.version == *op.version))) { + CLS_ERR("%s: failed to re-create existing queue " + "with different params", __PRETTY_FUNCTION__); + return -EEXIST; + } + + return 0; /* already exists */ + } + info header; + + header.id = op.id; + if (op.version) { + header.version = *op.version; + } else { + static constexpr auto DEFAULT_INSTANCE_SIZE = 16; + char buf[DEFAULT_INSTANCE_SIZE + 1]; + cls_gen_rand_base64(buf, sizeof(buf)); + buf[DEFAULT_INSTANCE_SIZE] = '\0'; + header.version.instance = buf; + header.version.ver = 1; + } + header.oid_prefix = new_oid_prefix(op.id, op.oid_prefix); + + header.params.max_part_size = op.max_part_size; + header.params.max_entry_size = op.max_entry_size; + header.params.full_size_threshold = op.max_part_size - op.max_entry_size - part_entry_overhead; + + r = write_header(hctx, header); + if (r < 0) { + CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + return 0; +} + +int update_meta(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::update_meta op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + if (op.version.empty()) { + CLS_ERR("%s: no version supplied", __PRETTY_FUNCTION__); + return -EINVAL; + } + + info header; + + int r = read_header(hctx, op.version, &header); + if (r < 0) { + return r; + } + + auto u = fifo::update().tail_part_num(op.tail_part_num) + .head_part_num(op.head_part_num) + .min_push_part_num(op.min_push_part_num) + .max_push_part_num(op.max_push_part_num) + .journal_entries_add( + std::move(op.journal_entries_add)) + .journal_entries_rm( + std::move(op.journal_entries_rm)); + + auto changed = header.apply_update(u); + if (changed) { + r = write_header(hctx, header); + if (r < 0) { + CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + } else { + CLS_LOG(10, "%s: No change, nothing to write.", + __PRETTY_FUNCTION__); + } + + return 0; +} + +int get_meta(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::get_meta op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + op::get_meta_reply reply; + int r = read_header(hctx, op.version, &reply.info, true); + if (r < 0) { + return r; + } + + reply.part_header_size = CLS_FIFO_MAX_PART_HEADER_SIZE; + reply.part_entry_overhead = part_entry_overhead; + + encode(reply, *out); + + return 0; +} + +int init_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list *out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::init_part op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + std::uint64_t size; + + int r = cls_cxx_stat2(hctx, &size, nullptr); + if (r < 0 && r != -ENOENT) { + CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r); + return r; + } + if (r == 0 && size > 0) { + part_header part_header; + r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__); + return r; + } + + if (!(part_header.params == op.params)) { + CLS_ERR("%s: failed to re-create existing part with different " + "params", __PRETTY_FUNCTION__); + return -EEXIST; + } + + return 0; /* already exists */ + } + + part_header part_header; + + part_header.params = op.params; + + part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE; + part_header.last_ofs = 0; + part_header.next_ofs = part_header.min_ofs; + part_header.max_time = ceph::real_clock::now(); + + cls_gen_random_bytes(reinterpret_cast<char *>(&part_header.magic), + sizeof(part_header.magic)); + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + return 0; +} + +bool full_part(const part_header& part_header) +{ + return (part_header.next_ofs > part_header.params.full_size_threshold); +} + +int push_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::push_part op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error& err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + part_header part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__); + return r; + } + + std::uint64_t effective_len = op.total_len + op.data_bufs.size() * + part_entry_overhead; + + if (effective_len > part_header.params.max_part_size) { + return -EINVAL; + } + + if (full_part(part_header)) { + return -ERANGE; + } + + auto now = ceph::real_clock::now(); + struct entry_header entry_header = { now }; + ceph::buffer::list entry_header_bl; + encode(entry_header, entry_header_bl); + + auto max_index = part_header.max_index; + const auto write_ofs = part_header.next_ofs; + auto ofs = part_header.next_ofs; + + entry_header_pre pre_header; + pre_header.magic = part_header.magic; + pre_header.pre_size = sizeof(pre_header); + pre_header.reserved = 0; + + std::uint64_t total_data = 0; + for (auto& data : op.data_bufs) { + total_data += data.length(); + } + if (total_data != op.total_len) { + CLS_ERR("%s: length mismatch: op.total_len=%" PRId64 + " total data received=%" PRId64, + __PRETTY_FUNCTION__, op.total_len, total_data); + return -EINVAL; + } + + + int entries_pushed = 0; + ceph::buffer::list all_data; + for (auto& data : op.data_bufs) { + if (full_part(part_header)) + break; + + pre_header.header_size = entry_header_bl.length(); + pre_header.data_size = data.length(); + pre_header.index = max_index; + + bufferptr pre(reinterpret_cast<char*>(&pre_header), sizeof(pre_header)); + auto entry_write_len = pre.length() + entry_header_bl.length() + data.length(); + all_data.append(pre); + all_data.append(entry_header_bl); + all_data.claim_append(data); + + part_header.last_ofs = ofs; + ofs += entry_write_len; + ++max_index; + ++entries_pushed; + part_header.max_index = max_index; + part_header.next_ofs = ofs; + } + part_header.max_time = now; + + auto write_len = all_data.length(); + + r = cls_cxx_write2(hctx, write_ofs, write_len, + &all_data, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + + if (r < 0) { + CLS_ERR("%s: failed to write entries (ofs=%" PRIu64 + " len=%u): r=%d", __PRETTY_FUNCTION__, write_ofs, + write_len, r); + return r; + } + + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + if (entries_pushed == 0) { + CLS_ERR("%s: pushed no entries? Can't happen!", __PRETTY_FUNCTION__); + return -EFAULT; + } + + return entries_pushed; +} + +class EntryReader { + static constexpr std::uint64_t prefetch_len = (128 * 1024); + + cls_method_context_t hctx; + + const fifo::part_header& part_header; + + std::uint64_t ofs; + ceph::buffer::list data; + + int fetch(std::uint64_t num_bytes); + int read(std::uint64_t num_bytes, ceph::buffer::list* pbl); + int peek(std::uint64_t num_bytes, char *dest); + int seek(std::uint64_t num_bytes); + +public: + EntryReader(cls_method_context_t hctx, + const fifo::part_header& part_header, + uint64_t ofs) : hctx(hctx), + part_header(part_header), + ofs(ofs < part_header.min_ofs ? + part_header.min_ofs : + ofs) {} + + std::uint64_t get_ofs() const { + return ofs; + } + + bool end() const { + return (ofs >= part_header.next_ofs); + } + + int peek_pre_header(entry_header_pre* pre_header); + int get_next_entry(ceph::buffer::list* pbl, + std::uint64_t* pofs, + ceph::real_time* pmtime); +}; + + +int EntryReader::fetch(std::uint64_t num_bytes) +{ + CLS_LOG(5, "%s: fetch %d bytes, ofs=%d data.length()=%d", __PRETTY_FUNCTION__, (int)num_bytes, (int)ofs, (int)data.length()); + if (data.length() < num_bytes) { + ceph::buffer::list bl; + CLS_LOG(5, "%s: reading % " PRId64 " bytes at ofs=%" PRId64, __PRETTY_FUNCTION__, + prefetch_len, ofs + data.length()); + int r = cls_cxx_read2(hctx, ofs + data.length(), prefetch_len, &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (r < 0) { + CLS_ERR("ERROR: %s: cls_cxx_read2() on obj returned %d", __PRETTY_FUNCTION__, r); + return r; + } + data.claim_append(bl); + } + + if (static_cast<unsigned>(num_bytes) > data.length()) { + CLS_ERR("%s: requested %" PRId64 " bytes, but only " + "%u were available", __PRETTY_FUNCTION__, num_bytes, data.length()); + return -ERANGE; + } + + return 0; +} + +int EntryReader::read(std::uint64_t num_bytes, ceph::buffer::list* pbl) +{ + int r = fetch(num_bytes); + if (r < 0) { + return r; + } + data.splice(0, num_bytes, pbl); + + ofs += num_bytes; + + return 0; +} + +int EntryReader::peek(std::uint64_t num_bytes, char* dest) +{ + int r = fetch(num_bytes); + if (r < 0) { + return r; + } + + data.begin().copy(num_bytes, dest); + + return 0; +} + +int EntryReader::seek(std::uint64_t num_bytes) +{ + ceph::buffer::list bl; + + CLS_LOG(5, "%s:%d: num_bytes=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, num_bytes); + return read(num_bytes, &bl); +} + +int EntryReader::peek_pre_header(entry_header_pre* pre_header) +{ + if (end()) { + return -ENOENT; + } + + int r = peek(sizeof(*pre_header), + reinterpret_cast<char*>(pre_header)); + if (r < 0) { + CLS_ERR("ERROR: %s: peek() size=%zu failed: r=%d", __PRETTY_FUNCTION__, + sizeof(pre_header), r); + return r; + } + + if (pre_header->magic != part_header.magic) { + CLS_ERR("ERROR: %s: unexpected pre_header magic", __PRETTY_FUNCTION__); + return -ERANGE; + } + + return 0; +} + + +int EntryReader::get_next_entry(ceph::buffer::list* pbl, + std::uint64_t* pofs, + ceph::real_time* pmtime) +{ + entry_header_pre pre_header; + int r = peek_pre_header(&pre_header); + if (r < 0) { + CLS_ERR("ERROR: %s: peek_pre_header() failed: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + if (pofs) { + *pofs = ofs; + } + + CLS_LOG(5, "%s:%d: pre_header.pre_size=%" PRIu64, __PRETTY_FUNCTION__, __LINE__, + uint64_t(pre_header.pre_size)); + r = seek(pre_header.pre_size); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + ceph::buffer::list header; + CLS_LOG(5, "%s:%d: pre_header.header_size=%d", __PRETTY_FUNCTION__, __LINE__, (int)pre_header.header_size); + r = read(pre_header.header_size, &header); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to read entry header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + entry_header entry_header; + auto iter = header.cbegin(); + try { + decode(entry_header, iter); + } catch (ceph::buffer::error& err) { + CLS_ERR("%s: failed decoding entry header", __PRETTY_FUNCTION__); + return -EIO; + } + + if (pmtime) { + *pmtime = entry_header.mtime; + } + + if (pbl) { + r = read(pre_header.data_size, pbl); + if (r < 0) { + CLS_ERR("%s: failed reading data: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + } else { + r = seek(pre_header.data_size); + if (r < 0) { + CLS_ERR("ERROR: %s: failed to seek: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + } + + return 0; +} + +int trim_part(cls_method_context_t hctx, + ceph::buffer::list *in, ceph::buffer::list *out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::trim_part op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + part_header part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__); + return r; + } + + if (op.ofs < part_header.min_ofs) { + return 0; + } + if (op.exclusive && op.ofs == part_header.min_ofs) { + return 0; + } + + if (op.ofs >= part_header.next_ofs) { + if (full_part(part_header)) { + /* + * trim full part completely: remove object + */ + + r = cls_cxx_remove(hctx); + if (r < 0) { + CLS_ERR("%s: ERROR: cls_cxx_remove() returned r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + return 0; + } + + part_header.min_ofs = part_header.next_ofs; + part_header.min_index = part_header.max_index; + } else { + EntryReader reader(hctx, part_header, op.ofs); + + entry_header_pre pre_header; + int r = reader.peek_pre_header(&pre_header); + if (r < 0) { + return r; + } + + if (op.exclusive) { + part_header.min_index = pre_header.index; + } else { + r = reader.get_next_entry(nullptr, nullptr, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + part_header.min_index = pre_header.index + 1; + } + + part_header.min_ofs = reader.get_ofs(); + } + + r = write_part_header(hctx, part_header); + if (r < 0) { + CLS_ERR("%s: failed to write header: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + + return 0; +} + +int list_part(cls_method_context_t hctx, ceph::buffer::list* in, + ceph::buffer::list* out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::list_part op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const buffer::error &err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + part_header part_header; + int r = read_part_header(hctx, &part_header); + if (r < 0) { + CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__); + return r; + } + + EntryReader reader(hctx, part_header, op.ofs); + + if (op.ofs >= part_header.min_ofs && + !reader.end()) { + r = reader.get_next_entry(nullptr, nullptr, nullptr); + if (r < 0) { + CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", __PRETTY_FUNCTION__, r); + return r; + } + } + + op::list_part_reply reply; + + auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES); + + for (int i = 0; i < max_entries && !reader.end(); ++i) { + ceph::buffer::list data; + ceph::real_time mtime; + std::uint64_t ofs; + + r = reader.get_next_entry(&data, &ofs, &mtime); + if (r < 0) { + CLS_ERR("ERROR: %s: unexpected failure at get_next_entry: r=%d", + __PRETTY_FUNCTION__, r); + return r; + } + + reply.entries.emplace_back(std::move(data), ofs, mtime); + } + + reply.more = !reader.end(); + reply.full_part = full_part(part_header); + + encode(reply, *out); + + return 0; +} + +int get_part_info(cls_method_context_t hctx, ceph::buffer::list *in, + ceph::buffer::list *out) +{ + CLS_LOG(5, "%s", __PRETTY_FUNCTION__); + + op::get_part_info op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("ERROR: %s: failed to decode request", __PRETTY_FUNCTION__); + return -EINVAL; + } + + op::get_part_info_reply reply; + + int r = read_part_header(hctx, &reply.header); + if (r < 0) { + CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__); + return r; + } + + encode(reply, *out); + + return 0; +} +} +} // namespace rados::cls::fifo + +CLS_INIT(fifo) +{ + using namespace rados::cls::fifo; + CLS_LOG(10, "Loaded fifo class!"); + + cls_handle_t h_class; + cls_method_handle_t h_create_meta; + cls_method_handle_t h_get_meta; + cls_method_handle_t h_update_meta; + cls_method_handle_t h_init_part; + cls_method_handle_t h_push_part; + cls_method_handle_t h_trim_part; + cls_method_handle_t h_list_part; + cls_method_handle_t h_get_part_info; + + cls_register(op::CLASS, &h_class); + cls_register_cxx_method(h_class, op::CREATE_META, + CLS_METHOD_RD | CLS_METHOD_WR, + create_meta, &h_create_meta); + + cls_register_cxx_method(h_class, op::GET_META, + CLS_METHOD_RD, + get_meta, &h_get_meta); + + cls_register_cxx_method(h_class, op::UPDATE_META, + CLS_METHOD_RD | CLS_METHOD_WR, + update_meta, &h_update_meta); + + cls_register_cxx_method(h_class, op::INIT_PART, + CLS_METHOD_RD | CLS_METHOD_WR, + init_part, &h_init_part); + + cls_register_cxx_method(h_class, op::PUSH_PART, + CLS_METHOD_RD | CLS_METHOD_WR, + push_part, &h_push_part); + + cls_register_cxx_method(h_class, op::TRIM_PART, + CLS_METHOD_RD | CLS_METHOD_WR, + trim_part, &h_trim_part); + + cls_register_cxx_method(h_class, op::LIST_PART, + CLS_METHOD_RD, + list_part, &h_list_part); + + cls_register_cxx_method(h_class, op::GET_PART_INFO, + CLS_METHOD_RD, + get_part_info, &h_get_part_info); + + /* calculate entry overhead */ + struct entry_header entry_header; + ceph::buffer::list entry_header_bl; + encode(entry_header, entry_header_bl); + + part_entry_overhead = sizeof(entry_header_pre) + entry_header_bl.length(); + + return; +} diff --git a/src/cls/fifo/cls_fifo_ops.h b/src/cls/fifo/cls_fifo_ops.h new file mode 100644 index 000000000..e850c635c --- /dev/null +++ b/src/cls/fifo/cls_fifo_ops.h @@ -0,0 +1,311 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * Copyright (C) 2019 SUSE LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <cstdint> +#include <optional> +#include <string> +#include <vector> + +#include "include/buffer.h" +#include "include/encoding.h" +#include "include/types.h" + +#include "cls/fifo/cls_fifo_types.h" + +namespace rados::cls::fifo::op { +struct create_meta +{ + std::string id; + std::optional<objv> version; + struct { + std::string name; + std::string ns; + } pool; + std::optional<std::string> oid_prefix; + + std::uint64_t max_part_size{0}; + std::uint64_t max_entry_size{0}; + + bool exclusive{false}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(version, bl); + encode(pool.name, bl); + encode(pool.ns, bl); + encode(oid_prefix, bl); + encode(max_part_size, bl); + encode(max_entry_size, bl); + encode(exclusive, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(version, bl); + decode(pool.name, bl); + decode(pool.ns, bl); + decode(oid_prefix, bl); + decode(max_part_size, bl); + decode(max_entry_size, bl); + decode(exclusive, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(create_meta) + +struct get_meta +{ + std::optional<objv> version; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(version, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(version, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(get_meta) + +struct get_meta_reply +{ + fifo::info info; + std::uint32_t part_header_size{0}; + /* per entry extra data that is stored */ + std::uint32_t part_entry_overhead{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(info, bl); + encode(part_header_size, bl); + encode(part_entry_overhead, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(info, bl); + decode(part_header_size, bl); + decode(part_entry_overhead, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(get_meta_reply) + +struct update_meta +{ + objv version; + + std::optional<std::uint64_t> tail_part_num; + std::optional<std::uint64_t> head_part_num; + std::optional<std::uint64_t> min_push_part_num; + std::optional<std::uint64_t> max_push_part_num; + std::vector<journal_entry> journal_entries_add; + std::vector<journal_entry> journal_entries_rm; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(version, bl); + encode(tail_part_num, bl); + encode(head_part_num, bl); + encode(min_push_part_num, bl); + encode(max_push_part_num, bl); + encode(journal_entries_add, bl); + encode(journal_entries_rm, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(version, bl); + decode(tail_part_num, bl); + decode(head_part_num, bl); + decode(min_push_part_num, bl); + decode(max_push_part_num, bl); + decode(journal_entries_add, bl); + decode(journal_entries_rm, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(update_meta) + +struct init_part +{ + data_params params; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::string tag; + encode(tag, bl); + encode(params, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::string tag; + decode(tag, bl); + decode(params, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(init_part) + +struct push_part +{ + std::deque<ceph::buffer::list> data_bufs; + std::uint64_t total_len{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::string tag; + encode(tag, bl); + encode(data_bufs, bl); + encode(total_len, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::string tag; + decode(tag, bl); + decode(data_bufs, bl); + decode(total_len, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(push_part) + +struct trim_part +{ + std::uint64_t ofs{0}; + bool exclusive = false; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::optional<std::string> tag; + encode(tag, bl); + encode(ofs, bl); + encode(exclusive, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::optional<std::string> tag; + decode(tag, bl); + decode(ofs, bl); + decode(exclusive, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(trim_part) + +struct list_part +{ + std::uint64_t ofs{0}; + int max_entries{100}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::optional<std::string> tag; + encode(tag, bl); + encode(ofs, bl); + encode(max_entries, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::optional<std::string> tag; + decode(tag, bl); + decode(ofs, bl); + decode(max_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(list_part) +inline constexpr int MAX_LIST_ENTRIES = 512; + +struct list_part_reply +{ + std::vector<part_list_entry> entries; + bool more{false}; + bool full_part{false}; /* whether part is full or still can be written to. + A non full part is by definition head part */ + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::string tag; + encode(tag, bl); + encode(entries, bl); + encode(more, bl); + encode(full_part, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::string tag; + decode(tag, bl); + decode(entries, bl); + decode(more, bl); + decode(full_part, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(list_part_reply) + +struct get_part_info +{ + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &bl) { + DECODE_START(1, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(get_part_info) + +struct get_part_info_reply +{ + part_header header; + + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(header, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &bl) { + DECODE_START(1, bl); + decode(header, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(get_part_info_reply) + +inline constexpr auto CLASS = "fifo"; +inline constexpr auto CREATE_META = "create_meta"; +inline constexpr auto GET_META = "get_meta"; +inline constexpr auto UPDATE_META = "update_meta"; +inline constexpr auto INIT_PART = "init_part"; +inline constexpr auto PUSH_PART = "push_part"; +inline constexpr auto TRIM_PART = "trim_part"; +inline constexpr auto LIST_PART = "part_list"; +inline constexpr auto GET_PART_INFO = "get_part_info"; +} // namespace rados::cls::fifo::op diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h new file mode 100644 index 000000000..1c69c1f08 --- /dev/null +++ b/src/cls/fifo/cls_fifo_types.h @@ -0,0 +1,559 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <algorithm> +#include <cstdint> +#include <map> +#include <optional> +#include <ostream> +#include <string> +#include <vector> + +#include <boost/container/flat_set.hpp> + +#include <fmt/format.h> +#if FMT_VERSION >= 90000 +#include <fmt/ostream.h> +#endif +#include "include/buffer.h" +#include "include/encoding.h" +#include "include/types.h" + +#include "common/ceph_time.h" + +class JSONObj; + +namespace rados::cls::fifo { +struct objv { + std::string instance; + std::uint64_t ver{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(instance, bl); + encode(ver, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(instance, bl); + decode(ver, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + bool operator ==(const objv& rhs) const { + return (instance == rhs.instance && + ver == rhs.ver); + } + bool operator !=(const objv& rhs) const { + return (instance != rhs.instance || + ver != rhs.ver); + } + bool same_or_later(const objv& rhs) const { + return (instance == rhs.instance && + ver >= rhs.ver); + } + + bool empty() const { + return instance.empty(); + } + + std::string to_str() const { + return fmt::format("{}{{{}}}", instance, ver); + } +}; +WRITE_CLASS_ENCODER(objv) +inline std::ostream& operator <<(std::ostream& os, const objv& objv) +{ + return os << objv.to_str(); +} + +struct data_params { + std::uint64_t max_part_size{0}; + std::uint64_t max_entry_size{0}; + std::uint64_t full_size_threshold{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(max_part_size, bl); + encode(max_entry_size, bl); + encode(full_size_threshold, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(max_part_size, bl); + decode(max_entry_size, bl); + decode(full_size_threshold, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + auto operator <=>(const data_params&) const = default; +}; +WRITE_CLASS_ENCODER(data_params) +inline std::ostream& operator <<(std::ostream& m, const data_params& d) { + return m << "max_part_size: " << d.max_part_size << ", " + << "max_entry_size: " << d.max_entry_size << ", " + << "full_size_threshold: " << d.full_size_threshold; +} + +struct journal_entry { + enum class Op { + unknown = -1, + create = 1, + set_head = 2, + remove = 3, + } op{Op::unknown}; + + std::int64_t part_num{-1}; + + bool valid() const { + using enum Op; + switch (op) { + case create: [[fallthrough]]; + case set_head: [[fallthrough]]; + case remove: + return part_num >= 0; + + default: + return false; + } + } + + journal_entry() = default; + journal_entry(Op op, std::int64_t part_num) + : op(op), part_num(part_num) {} + + void encode(ceph::buffer::list& bl) const { + ceph_assert(valid()); + ENCODE_START(1, 1, bl); + encode((int)op, bl); + encode(part_num, bl); + std::string part_tag; + encode(part_tag, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + int i; + decode(i, bl); + op = static_cast<Op>(i); + decode(part_num, bl); + std::string part_tag; + decode(part_tag, bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + + auto operator <=>(const journal_entry&) const = default; +}; +WRITE_CLASS_ENCODER(journal_entry) +inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) { + switch (o) { + case journal_entry::Op::unknown: + return m << "Op::unknown"; + case journal_entry::Op::create: + return m << "Op::create"; + case journal_entry::Op::set_head: + return m << "Op::set_head"; + case journal_entry::Op::remove: + return m << "Op::remove"; + } + return m << "Bad value: " << static_cast<int>(o); +} +inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) { + return m << "op: " << j.op << ", " + << "part_num: " << j.part_num; +} + +// This is actually a useful builder, since otherwise we end up with +// four uint64_ts in a row and only care about a subset at a time. +class update { + std::optional<std::int64_t> tail_part_num_; + std::optional<std::int64_t> head_part_num_; + std::optional<std::int64_t> min_push_part_num_; + std::optional<std::int64_t> max_push_part_num_; + std::vector<fifo::journal_entry> journal_entries_add_; + std::vector<fifo::journal_entry> journal_entries_rm_; + +public: + + update&& tail_part_num(std::optional<std::int64_t> num) noexcept { + tail_part_num_ = num; + return std::move(*this); + } + auto tail_part_num() const noexcept { + return tail_part_num_; + } + + update&& head_part_num(std::optional<std::int64_t> num) noexcept { + head_part_num_ = num; + return std::move(*this); + } + auto head_part_num() const noexcept { + return head_part_num_; + } + + update&& min_push_part_num(std::optional<std::int64_t> num) + noexcept { + min_push_part_num_ = num; + return std::move(*this); + } + auto min_push_part_num() const noexcept { + return min_push_part_num_; + } + + update&& max_push_part_num(std::optional<std::int64_t> num) noexcept { + max_push_part_num_ = num; + return std::move(*this); + } + auto max_push_part_num() const noexcept { + return max_push_part_num_; + } + + update&& journal_entry_add(fifo::journal_entry entry) { + journal_entries_add_.push_back(std::move(entry)); + return std::move(*this); + } + update&& journal_entries_add( + std::optional<std::vector<fifo::journal_entry>>&& entries) { + if (entries) { + journal_entries_add_ = std::move(*entries); + } else { + journal_entries_add_.clear(); + } + return std::move(*this); + } + const auto& journal_entries_add() const & noexcept { + return journal_entries_add_; + } + auto&& journal_entries_add() && noexcept { + return std::move(journal_entries_add_); + } + + update&& journal_entry_rm(fifo::journal_entry entry) { + journal_entries_rm_.push_back(std::move(entry)); + return std::move(*this); + } + update&& journal_entries_rm( + std::optional<std::vector<fifo::journal_entry>>&& entries) { + if (entries) { + journal_entries_rm_ = std::move(*entries); + } else { + journal_entries_rm_.clear(); + } + return std::move(*this); + } + const auto& journal_entries_rm() const & noexcept { + return journal_entries_rm_; + } + auto&& journal_entries_rm() && noexcept { + return std::move(journal_entries_rm_); + } + friend std::ostream& operator <<(std::ostream& m, const update& u); +}; +inline std::ostream& operator <<(std::ostream& m, const update& u) { + bool prev = false; + if (u.tail_part_num_) { + m << "tail_part_num: " << *u.tail_part_num_; + prev = true; + } + if (u.head_part_num_) { + if (prev) + m << ", "; + m << "head_part_num: " << *u.head_part_num_; + prev = true; + } + if (u.min_push_part_num_) { + if (prev) + m << ", "; + m << "min_push_part_num: " << *u.min_push_part_num_; + prev = true; + } + if (u.max_push_part_num_) { + if (prev) + m << ", "; + m << "max_push_part_num: " << *u.max_push_part_num_; + prev = true; + } + if (!u.journal_entries_add_.empty()) { + if (prev) + m << ", "; + m << "journal_entries_add: {" << u.journal_entries_add_ << "}"; + prev = true; + } + if (!u.journal_entries_rm_.empty()) { + if (prev) + m << ", "; + m << "journal_entries_rm: {" << u.journal_entries_rm_ << "}"; + prev = true; + } + if (!prev) + m << "(none)"; + return m; +} + +struct info { + std::string id; + objv version; + std::string oid_prefix; + data_params params; + + std::int64_t tail_part_num{0}; + std::int64_t head_part_num{-1}; + std::int64_t min_push_part_num{0}; + std::int64_t max_push_part_num{-1}; + + boost::container::flat_set<journal_entry> journal; + static_assert(journal_entry::Op::create < journal_entry::Op::set_head); + + // So we can get rid of the multimap without breaking compatibility + void encode_journal(bufferlist& bl) const { + using ceph::encode; + assert(journal.size() <= std::numeric_limits<uint32_t>::max()); + uint32_t n = static_cast<uint32_t>(journal.size()); + encode(n, bl); + for (const auto& entry : journal) { + encode(entry.part_num, bl); + encode(entry, bl); + } + } + + void decode_journal( bufferlist::const_iterator& p) { + using enum journal_entry::Op; + using ceph::decode; + uint32_t n; + decode(n, p); + journal.clear(); + while (n--) { + decltype(journal_entry::part_num) dummy; + decode(dummy, p); + journal_entry e; + decode(e, p); + if (!e.valid()) { + throw ceph::buffer::malformed_input(); + } else { + journal.insert(std::move(e)); + } + } + } + bool need_new_head() const { + return (head_part_num < min_push_part_num); + } + + bool need_new_part() const { + return (max_push_part_num < min_push_part_num); + } + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(version, bl); + encode(oid_prefix, bl); + encode(params, bl); + encode(tail_part_num, bl); + encode(head_part_num, bl); + encode(min_push_part_num, bl); + encode(max_push_part_num, bl); + std::string head_tag; + std::map<int64_t, std::string> tags; + encode(tags, bl); + encode(head_tag, bl); + encode_journal(bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(version, bl); + decode(oid_prefix, bl); + decode(params, bl); + decode(tail_part_num, bl); + decode(head_part_num, bl); + decode(min_push_part_num, bl); + decode(max_push_part_num, bl); + std::string head_tag; + std::map<int64_t, std::string> tags; + decode(tags, bl); + decode(head_tag, bl); + decode_journal(bl); + DECODE_FINISH(bl); + } + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); + + std::string part_oid(std::int64_t part_num) const { + return fmt::format("{}.{}", oid_prefix, part_num); + } + + bool apply_update(const update& update) { + bool changed = false; + if (update.tail_part_num() && (tail_part_num != *update.tail_part_num())) { + tail_part_num = *update.tail_part_num(); + changed = true; + } + + if (update.min_push_part_num() && + (min_push_part_num != *update.min_push_part_num())) { + min_push_part_num = *update.min_push_part_num(); + changed = true; + } + + if (update.max_push_part_num() && + (max_push_part_num != *update.max_push_part_num())) { + max_push_part_num = *update.max_push_part_num(); + changed = true; + } + + for (const auto& entry : update.journal_entries_add()) { + auto [iter, inserted] = journal.insert(entry); + if (inserted) { + changed = true; + } + } + + for (const auto& entry : update.journal_entries_rm()) { + auto count = journal.erase(entry); + if (count > 0) { + changed = true; + } + } + + if (update.head_part_num() && (head_part_num != *update.head_part_num())) { + head_part_num = *update.head_part_num(); + changed = true; + } + if (changed) { + ++version.ver; + } + return changed; + } +}; +WRITE_CLASS_ENCODER(info) +inline std::ostream& operator <<(std::ostream& m, const info& i) { + return m << "id: " << i.id << ", " + << "version: " << i.version << ", " + << "oid_prefix: " << i.oid_prefix << ", " + << "params: {" << i.params << "}, " + << "tail_part_num: " << i.tail_part_num << ", " + << "head_part_num: " << i.head_part_num << ", " + << "min_push_part_num: " << i.min_push_part_num << ", " + << "max_push_part_num: " << i.max_push_part_num << ", " + << "journal: {" << i.journal; +} + +struct part_list_entry { + ceph::buffer::list data; + std::uint64_t ofs = 0; + ceph::real_time mtime; + + part_list_entry() {} + part_list_entry(ceph::buffer::list&& data, + uint64_t ofs, + ceph::real_time mtime) + : data(std::move(data)), ofs(ofs), mtime(mtime) {} + + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(ofs, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(data, bl); + decode(ofs, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(part_list_entry) +inline std::ostream& operator <<(std::ostream& m, + const part_list_entry& p) { + using ceph::operator <<; + return m << "data: " << p.data << ", " + << "ofs: " << p.ofs << ", " + << "mtime: " << p.mtime; +} + +struct part_header { + data_params params; + + std::uint64_t magic{0}; + + std::uint64_t min_ofs{0}; + std::uint64_t last_ofs{0}; + std::uint64_t next_ofs{0}; + std::uint64_t min_index{0}; + std::uint64_t max_index{0}; + ceph::real_time max_time; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + std::string tag; + encode(tag, bl); + encode(params, bl); + encode(magic, bl); + encode(min_ofs, bl); + encode(last_ofs, bl); + encode(next_ofs, bl); + encode(min_index, bl); + encode(max_index, bl); + encode(max_time, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + std::string tag; + decode(tag, bl); + decode(params, bl); + decode(magic, bl); + decode(min_ofs, bl); + decode(last_ofs, bl); + decode(next_ofs, bl); + decode(min_index, bl); + decode(max_index, bl); + decode(max_time, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(part_header) +inline std::ostream& operator <<(std::ostream& m, const part_header& p) { + using ceph::operator <<; + return m << "params: {" << p.params << "}, " + << "magic: " << p.magic << ", " + << "min_ofs: " << p.min_ofs << ", " + << "last_ofs: " << p.last_ofs << ", " + << "next_ofs: " << p.next_ofs << ", " + << "min_index: " << p.min_index << ", " + << "max_index: " << p.max_index << ", " + << "max_time: " << p.max_time; +} +} // namespace rados::cls::fifo + +#if FMT_VERSION >= 90000 +template<> +struct fmt::formatter<rados::cls::fifo::info> : fmt::ostream_formatter {}; +template<> +struct fmt::formatter<rados::cls::fifo::part_header> : fmt::ostream_formatter {}; +#endif |