// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2019 Red Hat, Inc. * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #pragma once #include #include #include #include #include #include #include #include #include #if FMT_VERSION >= 90000 #include #endif #include "include/buffer.h" #include "include/encoding.h" #include "include/types.h" #include "common/ceph_time.h" class JSONObj; namespace rados::cls::fifo { struct objv { std::string instance; std::uint64_t ver{0}; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(instance, bl); encode(ver, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(instance, bl); decode(ver, bl); DECODE_FINISH(bl); } void dump(ceph::Formatter* f) const; void decode_json(JSONObj* obj); bool operator ==(const objv& rhs) const { return (instance == rhs.instance && ver == rhs.ver); } bool operator !=(const objv& rhs) const { return (instance != rhs.instance || ver != rhs.ver); } bool same_or_later(const objv& rhs) const { return (instance == rhs.instance && ver >= rhs.ver); } bool empty() const { return instance.empty(); } std::string to_str() const { return fmt::format("{}{{{}}}", instance, ver); } }; WRITE_CLASS_ENCODER(objv) inline std::ostream& operator <<(std::ostream& os, const objv& objv) { return os << objv.to_str(); } struct data_params { std::uint64_t max_part_size{0}; std::uint64_t max_entry_size{0}; std::uint64_t full_size_threshold{0}; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(max_part_size, bl); encode(max_entry_size, bl); encode(full_size_threshold, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(max_part_size, bl); decode(max_entry_size, bl); decode(full_size_threshold, bl); DECODE_FINISH(bl); } void dump(ceph::Formatter* f) const; void decode_json(JSONObj* obj); auto operator <=>(const data_params&) const = default; }; WRITE_CLASS_ENCODER(data_params) inline std::ostream& operator <<(std::ostream& m, const data_params& d) { return m << "max_part_size: " << d.max_part_size << ", " << "max_entry_size: " << d.max_entry_size << ", " << "full_size_threshold: " << d.full_size_threshold; } struct journal_entry { enum class Op { unknown = -1, create = 1, set_head = 2, remove = 3, } op{Op::unknown}; std::int64_t part_num{-1}; bool valid() const { using enum Op; switch (op) { case create: [[fallthrough]]; case set_head: [[fallthrough]]; case remove: return part_num >= 0; default: return false; } } journal_entry() = default; journal_entry(Op op, std::int64_t part_num) : op(op), part_num(part_num) {} void encode(ceph::buffer::list& bl) const { ceph_assert(valid()); ENCODE_START(1, 1, bl); encode((int)op, bl); encode(part_num, bl); std::string part_tag; encode(part_tag, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); int i; decode(i, bl); op = static_cast(i); decode(part_num, bl); std::string part_tag; decode(part_tag, bl); DECODE_FINISH(bl); } void dump(ceph::Formatter* f) const; auto operator <=>(const journal_entry&) const = default; }; WRITE_CLASS_ENCODER(journal_entry) inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) { switch (o) { case journal_entry::Op::unknown: return m << "Op::unknown"; case journal_entry::Op::create: return m << "Op::create"; case journal_entry::Op::set_head: return m << "Op::set_head"; case journal_entry::Op::remove: return m << "Op::remove"; } return m << "Bad value: " << static_cast(o); } inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) { return m << "op: " << j.op << ", " << "part_num: " << j.part_num; } // This is actually a useful builder, since otherwise we end up with // four uint64_ts in a row and only care about a subset at a time. class update { std::optional tail_part_num_; std::optional head_part_num_; std::optional min_push_part_num_; std::optional max_push_part_num_; std::vector journal_entries_add_; std::vector journal_entries_rm_; public: update&& tail_part_num(std::optional num) noexcept { tail_part_num_ = num; return std::move(*this); } auto tail_part_num() const noexcept { return tail_part_num_; } update&& head_part_num(std::optional num) noexcept { head_part_num_ = num; return std::move(*this); } auto head_part_num() const noexcept { return head_part_num_; } update&& min_push_part_num(std::optional num) noexcept { min_push_part_num_ = num; return std::move(*this); } auto min_push_part_num() const noexcept { return min_push_part_num_; } update&& max_push_part_num(std::optional num) noexcept { max_push_part_num_ = num; return std::move(*this); } auto max_push_part_num() const noexcept { return max_push_part_num_; } update&& journal_entry_add(fifo::journal_entry entry) { journal_entries_add_.push_back(std::move(entry)); return std::move(*this); } update&& journal_entries_add( std::optional>&& entries) { if (entries) { journal_entries_add_ = std::move(*entries); } else { journal_entries_add_.clear(); } return std::move(*this); } const auto& journal_entries_add() const & noexcept { return journal_entries_add_; } auto&& journal_entries_add() && noexcept { return std::move(journal_entries_add_); } update&& journal_entry_rm(fifo::journal_entry entry) { journal_entries_rm_.push_back(std::move(entry)); return std::move(*this); } update&& journal_entries_rm( std::optional>&& entries) { if (entries) { journal_entries_rm_ = std::move(*entries); } else { journal_entries_rm_.clear(); } return std::move(*this); } const auto& journal_entries_rm() const & noexcept { return journal_entries_rm_; } auto&& journal_entries_rm() && noexcept { return std::move(journal_entries_rm_); } friend std::ostream& operator <<(std::ostream& m, const update& u); }; inline std::ostream& operator <<(std::ostream& m, const update& u) { bool prev = false; if (u.tail_part_num_) { m << "tail_part_num: " << *u.tail_part_num_; prev = true; } if (u.head_part_num_) { if (prev) m << ", "; m << "head_part_num: " << *u.head_part_num_; prev = true; } if (u.min_push_part_num_) { if (prev) m << ", "; m << "min_push_part_num: " << *u.min_push_part_num_; prev = true; } if (u.max_push_part_num_) { if (prev) m << ", "; m << "max_push_part_num: " << *u.max_push_part_num_; prev = true; } if (!u.journal_entries_add_.empty()) { if (prev) m << ", "; m << "journal_entries_add: {" << u.journal_entries_add_ << "}"; prev = true; } if (!u.journal_entries_rm_.empty()) { if (prev) m << ", "; m << "journal_entries_rm: {" << u.journal_entries_rm_ << "}"; prev = true; } if (!prev) m << "(none)"; return m; } struct info { std::string id; objv version; std::string oid_prefix; data_params params; std::int64_t tail_part_num{0}; std::int64_t head_part_num{-1}; std::int64_t min_push_part_num{0}; std::int64_t max_push_part_num{-1}; boost::container::flat_set journal; static_assert(journal_entry::Op::create < journal_entry::Op::set_head); // So we can get rid of the multimap without breaking compatibility void encode_journal(bufferlist& bl) const { using ceph::encode; assert(journal.size() <= std::numeric_limits::max()); uint32_t n = static_cast(journal.size()); encode(n, bl); for (const auto& entry : journal) { encode(entry.part_num, bl); encode(entry, bl); } } void decode_journal( bufferlist::const_iterator& p) { using enum journal_entry::Op; using ceph::decode; uint32_t n; decode(n, p); journal.clear(); while (n--) { decltype(journal_entry::part_num) dummy; decode(dummy, p); journal_entry e; decode(e, p); if (!e.valid()) { throw ceph::buffer::malformed_input(); } else { journal.insert(std::move(e)); } } } bool need_new_head() const { return (head_part_num < min_push_part_num); } bool need_new_part() const { return (max_push_part_num < min_push_part_num); } void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(id, bl); encode(version, bl); encode(oid_prefix, bl); encode(params, bl); encode(tail_part_num, bl); encode(head_part_num, bl); encode(min_push_part_num, bl); encode(max_push_part_num, bl); std::string head_tag; std::map tags; encode(tags, bl); encode(head_tag, bl); encode_journal(bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(id, bl); decode(version, bl); decode(oid_prefix, bl); decode(params, bl); decode(tail_part_num, bl); decode(head_part_num, bl); decode(min_push_part_num, bl); decode(max_push_part_num, bl); std::string head_tag; std::map tags; decode(tags, bl); decode(head_tag, bl); decode_journal(bl); DECODE_FINISH(bl); } void dump(ceph::Formatter* f) const; void decode_json(JSONObj* obj); std::string part_oid(std::int64_t part_num) const { return fmt::format("{}.{}", oid_prefix, part_num); } bool apply_update(const update& update) { bool changed = false; if (update.tail_part_num() && (tail_part_num != *update.tail_part_num())) { tail_part_num = *update.tail_part_num(); changed = true; } if (update.min_push_part_num() && (min_push_part_num != *update.min_push_part_num())) { min_push_part_num = *update.min_push_part_num(); changed = true; } if (update.max_push_part_num() && (max_push_part_num != *update.max_push_part_num())) { max_push_part_num = *update.max_push_part_num(); changed = true; } for (const auto& entry : update.journal_entries_add()) { auto [iter, inserted] = journal.insert(entry); if (inserted) { changed = true; } } for (const auto& entry : update.journal_entries_rm()) { auto count = journal.erase(entry); if (count > 0) { changed = true; } } if (update.head_part_num() && (head_part_num != *update.head_part_num())) { head_part_num = *update.head_part_num(); changed = true; } if (changed) { ++version.ver; } return changed; } }; WRITE_CLASS_ENCODER(info) inline std::ostream& operator <<(std::ostream& m, const info& i) { return m << "id: " << i.id << ", " << "version: " << i.version << ", " << "oid_prefix: " << i.oid_prefix << ", " << "params: {" << i.params << "}, " << "tail_part_num: " << i.tail_part_num << ", " << "head_part_num: " << i.head_part_num << ", " << "min_push_part_num: " << i.min_push_part_num << ", " << "max_push_part_num: " << i.max_push_part_num << ", " << "journal: {" << i.journal; } struct part_list_entry { ceph::buffer::list data; std::uint64_t ofs = 0; ceph::real_time mtime; part_list_entry() {} part_list_entry(ceph::buffer::list&& data, uint64_t ofs, ceph::real_time mtime) : data(std::move(data)), ofs(ofs), mtime(mtime) {} void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); encode(data, bl); encode(ofs, bl); encode(mtime, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); decode(data, bl); decode(ofs, bl); decode(mtime, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(part_list_entry) inline std::ostream& operator <<(std::ostream& m, const part_list_entry& p) { using ceph::operator <<; return m << "data: " << p.data << ", " << "ofs: " << p.ofs << ", " << "mtime: " << p.mtime; } struct part_header { data_params params; std::uint64_t magic{0}; std::uint64_t min_ofs{0}; std::uint64_t last_ofs{0}; std::uint64_t next_ofs{0}; std::uint64_t min_index{0}; std::uint64_t max_index{0}; ceph::real_time max_time; void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); std::string tag; encode(tag, bl); encode(params, bl); encode(magic, bl); encode(min_ofs, bl); encode(last_ofs, bl); encode(next_ofs, bl); encode(min_index, bl); encode(max_index, bl); encode(max_time, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { DECODE_START(1, bl); std::string tag; decode(tag, bl); decode(params, bl); decode(magic, bl); decode(min_ofs, bl); decode(last_ofs, bl); decode(next_ofs, bl); decode(min_index, bl); decode(max_index, bl); decode(max_time, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(part_header) inline std::ostream& operator <<(std::ostream& m, const part_header& p) { using ceph::operator <<; return m << "params: {" << p.params << "}, " << "magic: " << p.magic << ", " << "min_ofs: " << p.min_ofs << ", " << "last_ofs: " << p.last_ofs << ", " << "next_ofs: " << p.next_ofs << ", " << "min_index: " << p.min_index << ", " << "max_index: " << p.max_index << ", " << "max_time: " << p.max_time; } } // namespace rados::cls::fifo #if FMT_VERSION >= 90000 template<> struct fmt::formatter : fmt::ostream_formatter {}; template<> struct fmt::formatter : fmt::ostream_formatter {}; #endif