From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rgw/driver/rados/rgw_datalog.cc | 1090 +++++++++++++++++++++++++++++++++++
 1 file changed, 1090 insertions(+)
 create mode 100644 src/rgw/driver/rados/rgw_datalog.cc

diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc
new file mode 100644
index 000000000..7ca37abf6
--- /dev/null
+++ b/src/rgw/driver/rados/rgw_datalog.cc
@@ -0,0 +1,1090 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <vector>
+
+#include "common/async/yield_context.h"
+#include "common/debug.h"
+#include "common/containers.h"
+#include "common/errno.h"
+#include "common/error_code.h"
+
+#include "common/async/blocked_completion.h"
+#include "common/async/librados_completion.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+#include "cls/log/cls_log_client.h"
+
+#include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
+#include "rgw_datalog.h"
+#include "rgw_log_backing.h"
+#include "rgw_tools.h"
+
+#define dout_context g_ceph_context
+static constexpr auto dout_subsys = ceph_subsys_rgw;
+
+namespace bs = boost::system;
+namespace lr = librados;
+
+using ceph::containers::tiny_vector;
+
+void rgw_data_change::dump(ceph::Formatter *f) const
+{
+  std::string type;
+  switch (entity_type) {
+  case ENTITY_TYPE_BUCKET:
+    type = "bucket";
+    break;
+  default:
+    type = "unknown";
+  }
+  encode_json("entity_type", type, f);
+  encode_json("key", key, f);
+  utime_t ut(timestamp);
+  encode_json("timestamp", ut, f);
+  encode_json("gen", gen, f);
+}
+
+void rgw_data_change::decode_json(JSONObj *obj) {
+  std::string s;
+  JSONDecoder::decode_json("entity_type", s, obj);
+  if (s == "bucket") {
+    entity_type = ENTITY_TYPE_BUCKET;
+  } else {
+    entity_type = ENTITY_TYPE_UNKNOWN;
+  }
+  JSONDecoder::decode_json("key", key, obj);
+  utime_t ut;
+  JSONDecoder::decode_json("timestamp", ut, obj);
+  timestamp = ut.to_real_time();
+  JSONDecoder::decode_json("gen", gen, obj);
+}
+
+void rgw_data_change_log_entry::dump(Formatter *f) const
+{
+  encode_json("log_id", log_id, f);
+  utime_t ut(log_timestamp);
+  encode_json("log_timestamp", ut, f);
+  encode_json("entry", entry, f);
+}
+
+void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("log_id", log_id, obj);
+  utime_t ut;
+  JSONDecoder::decode_json("log_timestamp", ut, obj);
+  log_timestamp = ut.to_real_time();
+  JSONDecoder::decode_json("entry", entry, obj);
+}
+
+void rgw_data_notify_entry::dump(Formatter *f) const
+{
+  encode_json("key", key, f);
+  encode_json("gen", gen, f);
+}
+
+void rgw_data_notify_entry::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("key", key, obj);
+  JSONDecoder::decode_json("gen", gen, obj);
+}
+
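+// Two interchangeable backends implement the RGWDataChangesBE interface
+// below: an omap-based log (cls_log, one object per shard) and a
+// FIFO-based log. logback_generations decides at startup which type
+// backs each log generation.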
+class RGWDataChangesOmap final : public RGWDataChangesBE {
+  using centries = std::list<cls_log_entry>;
+  std::vector<std::string> oids;
+
+public:
+  RGWDataChangesOmap(lr::IoCtx& ioctx,
+                     RGWDataChangesLog& datalog,
+                     uint64_t gen_id,
+                     int num_shards)
+    : RGWDataChangesBE(ioctx, datalog, gen_id) {
+    oids.reserve(num_shards);
+    for (auto i = 0; i < num_shards; ++i) {
+      oids.push_back(get_oid(i));
+    }
+  }
+  ~RGWDataChangesOmap() override = default;
+
+  void prepare(ceph::real_time ut, const std::string& key,
+               ceph::buffer::list&& entry, entries& out) override {
+    if (!std::holds_alternative<centries>(out)) {
+      ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
+      out = centries();
+    }
+
+    cls_log_entry e;
+    cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
+    std::get<centries>(out).push_back(std::move(e));
+  }
+  int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+           optional_yield y) override {
+    lr::ObjectWriteOperation op;
+    cls_log_add(op, std::get<centries>(items), true);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to push to " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
+           const std::string& key, ceph::buffer::list&& bl,
+           optional_yield y) override {
+    lr::ObjectWriteOperation op;
+    cls_log_add(op, utime_t(now), {}, key, bl);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to push to " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
+           std::vector<rgw_data_change_log_entry>& entries,
+           std::optional<std::string_view> marker,
+           std::string* out_marker, bool* truncated,
+           optional_yield y) override {
+    std::list<cls_log_entry> log_entries;
+    lr::ObjectReadOperation op;
+    cls_log_list(op, {}, {}, std::string(marker.value_or("")),
+                 max_entries, log_entries, out_marker, truncated);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
+    if (r == -ENOENT) {
+      *truncated = false;
+      return 0;
+    }
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to list " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+      rgw_data_change_log_entry log_entry;
+      log_entry.log_id = iter->id;
+      auto rt = iter->timestamp.to_real_time();
+      log_entry.log_timestamp = rt;
+      auto liter = iter->data.cbegin();
+      try {
+        decode(log_entry.entry, liter);
+      } catch (ceph::buffer::error& err) {
+        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                           << ": failed to decode data changes log entry: "
+                           << err.what() << dendl;
+        return -EIO;
+      }
+      entries.push_back(log_entry);
+    }
+    return 0;
+  }
+  int get_info(const DoutPrefixProvider *dpp, int index,
+               RGWDataChangesLogInfo *info, optional_yield y) override {
+    cls_log_header header;
+    lr::ObjectReadOperation op;
+    cls_log_info(op, &header);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
+    if (r == -ENOENT) r = 0;
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to get info from " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+    } else {
+      info->marker = header.max_marker;
+      info->last_update = header.max_time.to_real_time();
+    }
+    return r;
+  }
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+           optional_yield y) override {
+    lr::ObjectWriteOperation op;
+    cls_log_trim(op, {}, {}, {}, std::string(marker));
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
+    if (r == -ENOENT) r = -ENODATA;
+    if (r < 0 && r != -ENODATA) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to trim " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+           lr::AioCompletion* c) override {
+    lr::ObjectWriteOperation op;
+    cls_log_trim(op, {}, {}, {}, std::string(marker));
+    auto r = ioctx.aio_operate(oids[index], c, &op, 0);
+    if (r == -ENOENT) r = -ENODATA;
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": failed to trim " << oids[index]
+                         << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  std::string_view max_marker() const override {
+    return "99999999";
+  }
+  int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
+    for (auto shard = 0u; shard < oids.size(); ++shard) {
+      std::list<cls_log_entry> log_entries;
+      lr::ObjectReadOperation op;
+      std::string out_marker;
+      bool truncated;
+      cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
+      auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, y);
+      if (r == -ENOENT) {
+        continue;
+      }
+      if (r < 0) {
+        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                           << ": failed to list " << oids[shard]
+                           << cpp_strerror(-r) << dendl;
+        return r;
+      }
+      if (!log_entries.empty()) {
+        return 0;
+      }
+    }
+    return 1;
+  }
+};
+
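+// FIFO-based backend: entries are pre-encoded buffers pushed to one
+// rgw::cls::fifo queue per shard, and markers are FIFO (part, offset)
+// positions rather than omap keys.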
+class RGWDataChangesFIFO final : public RGWDataChangesBE {
+  using centries = std::vector<ceph::buffer::list>;
+  tiny_vector<LazyFIFO> fifos;
+
+public:
+  RGWDataChangesFIFO(lr::IoCtx& ioctx,
+                     RGWDataChangesLog& datalog,
+                     uint64_t gen_id, int shards)
+    : RGWDataChangesBE(ioctx, datalog, gen_id),
+      fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) {
+        emplacer.emplace(ioctx, get_oid(i));
+      }) {}
+  ~RGWDataChangesFIFO() override = default;
+  void prepare(ceph::real_time, const std::string&,
+               ceph::buffer::list&& entry, entries& out) override {
+    if (!std::holds_alternative<centries>(out)) {
+      ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
+      out = centries();
+    }
+    std::get<centries>(out).push_back(std::move(entry));
+  }
+  int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+           optional_yield y) override {
+    auto r = fifos[index].push(dpp, std::get<centries>(items), y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to push to FIFO: " << get_oid(index)
+                         << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
+           const std::string&, ceph::buffer::list&& bl,
+           optional_yield y) override {
+    auto r = fifos[index].push(dpp, std::move(bl), y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to push to FIFO: " << get_oid(index)
+                         << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
+           std::vector<rgw_data_change_log_entry>& entries,
+           std::optional<std::string_view> marker, std::string* out_marker,
+           bool* truncated, optional_yield y) override {
+    std::vector<rgw::cls::fifo::list_entry> log_entries;
+    bool more = false;
+    auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
+                               y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to list FIFO: " << get_oid(index)
+                         << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    for (const auto& entry : log_entries) {
+      rgw_data_change_log_entry log_entry;
+      log_entry.log_id = entry.marker;
+      log_entry.log_timestamp = entry.mtime;
+      auto liter = entry.data.cbegin();
+      try {
+        decode(log_entry.entry, liter);
+      } catch (const buffer::error& err) {
+        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                           << ": failed to decode data changes log entry: "
+                           << err.what() << dendl;
+        return -EIO;
+      }
+      entries.push_back(std::move(log_entry));
+    }
+    if (truncated)
+      *truncated = more;
+    if (out_marker && !log_entries.empty()) {
+      *out_marker = log_entries.back().marker;
+    }
+    return 0;
+  }
+  int get_info(const DoutPrefixProvider *dpp, int index,
+               RGWDataChangesLogInfo *info, optional_yield y) override {
+    auto& fifo = fifos[index];
+    auto r = fifo.read_meta(dpp, y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to get FIFO metadata: " << get_oid(index)
+                         << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    rados::cls::fifo::info m;
+    fifo.meta(dpp, m, y);
+    auto p = m.head_part_num;
+    if (p < 0) {
+      info->marker = "";
+      info->last_update = ceph::real_clock::zero();
+      return 0;
+    }
+    rgw::cls::fifo::part_info h;
+    r = fifo.get_part_info(dpp, p, &h, y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to get part info: " << get_oid(index)
+                         << "/" << p << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
+    info->last_update = h.max_time;
+    return 0;
+  }
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+           optional_yield y) override {
+    auto r = fifos[index].trim(dpp, marker, false, y);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                         << ": unable to trim FIFO: " << get_oid(index)
+                         << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+           librados::AioCompletion* c) override {
+    int r = 0;
+    if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
+      rgw_complete_aio_completion(c, -ENODATA);
+    } else {
+      // This null_yield is used for lazily opening FIFOs. It shouldn't
+      // exist, but it can't be eliminated since the caller is an
+      // RGWCoroutine in the data sync code.
+      //
+      // It can be eliminated after Reef when we can get rid of
+      // AioCompletion entirely.
+      fifos[index].trim(dpp, marker, false, c, null_yield);
+    }
+    return r;
+  }
+  std::string_view max_marker() const override {
+    static const std::string mm =
+      rgw::cls::fifo::marker::max().to_string();
+    return std::string_view(mm);
+  }
+  int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
+    std::vector<rgw::cls::fifo::list_entry> log_entries;
+    bool more = false;
+    for (auto shard = 0u; shard < fifos.size(); ++shard) {
+      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more, y);
+      if (r < 0) {
+        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                           << ": unable to list FIFO: " << get_oid(shard)
+                           << ": " << cpp_strerror(-r) << dendl;
+        return r;
+      }
+      if (!log_entries.empty()) {
+        return 0;
+      }
+    }
+    return 1;
+  }
+};
+
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
+  : cct(cct),
+    num_shards(cct->_conf->rgw_data_log_num_shards),
+    prefix(get_prefix()),
+    changes(cct->_conf->rgw_data_log_changes_size) {}
+
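+// DataLogBackends maps generation id -> backend instance. The handlers
+// below are invoked by logback_generations as generations are
+// discovered, added, or emptied.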
+bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
+  std::unique_lock l(m);
+
+  for (const auto& [gen_id, gen] : e) {
+    if (gen.pruned) {
+      lderr(datalog.cct)
+        << __PRETTY_FUNCTION__ << ":" << __LINE__
+        << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
+    }
+    if (count(gen_id) != 0) {
+      lderr(datalog.cct)
+        << __PRETTY_FUNCTION__ << ":" << __LINE__
+        << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
+    }
+    try {
+      switch (gen.type) {
+      case log_type::omap:
+        emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
+        break;
+      case log_type::fifo:
+        emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
+        break;
+      default:
+        lderr(datalog.cct)
+          << __PRETTY_FUNCTION__ << ":" << __LINE__
+          << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
+          << ", type=" << gen.type << dendl;
+        return bs::error_code(EFAULT, bs::system_category());
+      }
+    } catch (const bs::system_error& err) {
+      lderr(datalog.cct)
+        << __PRETTY_FUNCTION__ << ":" << __LINE__
+        << ": error setting up backend: gen_id=" << gen_id
+        << ", err=" << err.what() << dendl;
+      return err.code();
+    }
+  }
+  return {};
+}
+
+bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
+  return handle_init(std::move(e));
+}
+
+bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
+  std::unique_lock l(m);
+  auto i = cbegin();
+  if (i->first < new_tail) {
+    return {};
+  }
+  if (new_tail >= (cend() - 1)->first) {
+    lderr(datalog.cct)
+      << __PRETTY_FUNCTION__ << ":" << __LINE__
+      << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
+    return bs::error_code(EFAULT, bs::system_category());
+  }
+  erase(i, upper_bound(new_tail));
+  return {};
+}
+
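+// Startup: open the log pool, then let logback_generations discover or
+// create the generation map, instantiating backends via the handlers
+// above, and finally spawn the renewal thread.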
gen_id=" << gen_id + << ", err=" << err.what() << dendl; + return err.code(); + } + } + return {}; +} +bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept { + return handle_init(std::move(e)); +} +bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept { + std::unique_lock l(m); + auto i = cbegin(); + if (i->first < new_tail) { + return {}; + } + if (new_tail >= (cend() - 1)->first) { + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl; + return bs::error_code(EFAULT, bs::system_category()); + } + erase(i, upper_bound(new_tail)); + return {}; +} + + +int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone, + const RGWZoneParams& zoneparams, + librados::Rados* lr) +{ + zone = _zone; + ceph_assert(zone); + auto defbacking = to_log_type( + cct->_conf.get_val("rgw_default_data_log_backing")); + // Should be guaranteed by `set_enum_allowed` + ceph_assert(defbacking); + auto log_pool = zoneparams.log_pool; + auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false); + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": Failed to initialized ioctx, r=" << r + << ", pool=" << log_pool << dendl; + return -r; + } + + // This null_yield is in startup code, so it doesn't matter that much. + auto besr = logback_generations::init( + dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) { + return get_oid(gen_id, shard); + }, + num_shards, *defbacking, null_yield, *this); + + + if (!besr) { + lderr(cct) << __PRETTY_FUNCTION__ + << ": Error initializing backends: " + << besr.error().message() << dendl; + return ceph::from_error_code(besr.error()); + } + + bes = std::move(*besr); + renew_thread = make_named_thread("rgw_dt_lg_renew", + &RGWDataChangesLog::renew_run, this); + return 0; +} + +int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { + const auto& name = bs.bucket.name; + auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); + auto r = (ceph_str_hash_linux(name.data(), name.size()) + + shard_shift) % num_shards; + return static_cast(r); +} + +int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) +{ + if (!zone->log_data) + return 0; + + /* we can't keep the bucket name as part of the cls_log_entry, and we need + * it later, so we keep two lists under the map */ + bc::flat_map, + RGWDataChangesBE::entries>> m; + + std::unique_lock l(lock); + decltype(cur_cycle) entries; + entries.swap(cur_cycle); + l.unlock(); + + auto ut = real_clock::now(); + auto be = bes->head(); + for (const auto& [bs, gen] : entries) { + auto index = choose_oid(bs); + + rgw_data_change change; + bufferlist bl; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bs.get_key(); + change.timestamp = ut; + change.gen = gen; + encode(change, bl); + + m[index].first.push_back({bs, gen}); + be->prepare(ut, change.key, std::move(bl), m[index].second); + } + + for (auto& [index, p] : m) { + auto& [buckets, entries] = p; + + auto now = real_clock::now(); + + // This null_yield can stay (for now) as we're in our own thread. + auto ret = be->push(dpp, index, std::move(entries), null_yield); + if (ret < 0) { + /* we don't really need to have a special handling for failed cases here, + * as this is just an optimization. 
+int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
+{
+  if (!zone->log_data)
+    return 0;
+
+  /* we can't keep the bucket name as part of the cls_log_entry, and we need
+   * it later, so we keep two lists under the map */
+  bc::flat_map<int, std::pair<std::vector<std::pair<rgw_bucket_shard, uint64_t>>,
+                              RGWDataChangesBE::entries>> m;
+
+  std::unique_lock l(lock);
+  decltype(cur_cycle) entries;
+  entries.swap(cur_cycle);
+  l.unlock();
+
+  auto ut = real_clock::now();
+  auto be = bes->head();
+  for (const auto& [bs, gen] : entries) {
+    auto index = choose_oid(bs);
+
+    rgw_data_change change;
+    bufferlist bl;
+    change.entity_type = ENTITY_TYPE_BUCKET;
+    change.key = bs.get_key();
+    change.timestamp = ut;
+    change.gen = gen;
+    encode(change, bl);
+
+    m[index].first.push_back({bs, gen});
+    be->prepare(ut, change.key, std::move(bl), m[index].second);
+  }
+
+  for (auto& [index, p] : m) {
+    auto& [buckets, entries] = p;
+
+    auto now = real_clock::now();
+
+    // This null_yield can stay (for now) as we're in our own thread.
+    auto ret = be->push(dpp, index, std::move(entries), null_yield);
+    if (ret < 0) {
+      /* we don't really need special handling for failed cases here,
+       * as this is just an optimization. */
+      ldpp_dout(dpp, -1) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
+      return ret;
+    }
+
+    auto expiration = now;
+    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
+    for (auto& [bs, gen] : buckets) {
+      update_renewed(bs, gen, expiration);
+    }
+  }
+
+  return 0;
+}
+
+auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
+                                    uint64_t gen)
+  -> ChangeStatusPtr
+{
+  ceph_assert(ceph_mutex_is_locked(lock));
+  ChangeStatusPtr status;
+  if (!changes.find({bs, gen}, status)) {
+    status = std::make_shared<ChangeStatus>();
+    changes.add({bs, gen}, status);
+  }
+  return status;
+}
+
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+                                       const rgw::bucket_log_layout_generation& gen)
+{
+  std::scoped_lock l{lock};
+  cur_cycle.insert({bs, gen.gen});
+}
+
+void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
+                                       uint64_t gen,
+                                       real_time expiration)
+{
+  std::unique_lock l{lock};
+  auto status = _get_change(bs, gen);
+  l.unlock();
+
+  ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name="
+                 << bs.bucket.name << " shard_id=" << bs.shard_id
+                 << " expiration=" << expiration << dendl;
+
+  std::unique_lock sl(status->lock);
+  status->cur_expiration = expiration;
+}
+
+int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
+  rgw_bucket_shard bs(bucket, shard_id);
+  return choose_oid(bs);
+}
+
+bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
+                                      const rgw_bucket& bucket,
+                                      optional_yield y) const
+{
+  if (!bucket_filter) {
+    return true;
+  }
+
+  return bucket_filter(bucket, y, dpp);
+}
+
+std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
+  return (gen_id > 0 ?
+          fmt::format("{}@G{}.{}", prefix, gen_id, i) :
+          fmt::format("{}.{}", prefix, i));
+}
+
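+// add_entry() is effectively rate-limited per bucket shard: writes that
+// land inside the current expiration window are only queued for the
+// renewal cycle, and concurrent callers wait on a shared RefCountedCond
+// so that a single caller performs the actual push.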
cur_expiration=" << expiration << dendl; + + auto be = bes->head(); + ret = be->push(dpp, index, now, change.key, std::move(bl), y); + + now = real_clock::now(); + + sl.lock(); + + } while (!ret && real_clock::now() > expiration); + + cond = status->cond; + + status->pending = false; + /* time of when operation started, not completed */ + status->cur_expiration = status->cur_sent; + status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window); + status->cond = nullptr; + sl.unlock(); + + cond->done(ret); + cond->put(); + + return ret; +} + +int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries, + std::vector& entries, + std::string_view marker, std::string* out_marker, + bool* truncated, optional_yield y) +{ + const auto [start_id, start_cursor] = cursorgen(marker); + auto gen_id = start_id; + std::string out_cursor; + while (max_entries > 0) { + std::vector gentries; + std::unique_lock l(m); + auto i = lower_bound(gen_id); + if (i == end()) return 0; + auto be = i->second; + l.unlock(); + gen_id = be->gen_id; + auto r = be->list(dpp, shard, max_entries, gentries, + gen_id == start_id ? start_cursor : std::string{}, + &out_cursor, truncated, y); + if (r < 0) + return r; + + if (out_marker && !out_cursor.empty()) { + *out_marker = gencursor(gen_id, out_cursor); + } + for (auto& g : gentries) { + g.log_id = gencursor(gen_id, g.log_id); + } + if (int s = gentries.size(); s < 0 || s > max_entries) + max_entries = 0; + else + max_entries -= gentries.size(); + + std::move(gentries.begin(), gentries.end(), + std::back_inserter(entries)); + ++gen_id; + } + return 0; +} + +int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, + std::vector& entries, + std::string_view marker, + std::string* out_marker, bool* truncated, + optional_yield y) +{ + assert(shard < num_shards); + return bes->list(dpp, shard, max_entries, entries, marker, out_marker, + truncated, y); +} + +int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries, + std::vector& entries, + LogMarker& marker, bool *ptruncated, + optional_yield y) +{ + bool truncated; + entries.clear(); + for (; marker.shard < num_shards && int(entries.size()) < max_entries; + marker.shard++, marker.marker.clear()) { + int ret = list_entries(dpp, marker.shard, max_entries - entries.size(), + entries, marker.marker, NULL, &truncated, y); + if (ret == -ENOENT) { + continue; + } + if (ret < 0) { + return ret; + } + if (!truncated) { + *ptruncated = false; + return 0; + } + } + *ptruncated = (marker.shard < num_shards); + return 0; +} + +int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, + RGWDataChangesLogInfo *info, optional_yield y) +{ + assert(shard_id < num_shards); + auto be = bes->head(); + auto r = be->get_info(dpp, shard_id, info, y); + if (!info->marker.empty()) { + info->marker = gencursor(be->gen_id, info->marker); + } + return r; +} + +int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker, optional_yield y) +{ + auto [target_gen, cursor] = cursorgen(marker); + std::unique_lock l(m); + const auto head_gen = (end() - 1)->second->gen_id; + const auto tail_gen = begin()->first; + if (target_gen < tail_gen) return 0; + auto r = 0; + for (auto be = lower_bound(0)->second; + be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0; + be = upper_bound(be->gen_id)->second) { + l.unlock(); + auto c = be->gen_id == target_gen ? 
+int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                  std::string_view marker, optional_yield y)
+{
+  auto [target_gen, cursor] = cursorgen(marker);
+  std::unique_lock l(m);
+  const auto head_gen = (end() - 1)->second->gen_id;
+  const auto tail_gen = begin()->first;
+  if (target_gen < tail_gen) return 0;
+  auto r = 0;
+  for (auto be = lower_bound(0)->second;
+       be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
+       be = upper_bound(be->gen_id)->second) {
+    l.unlock();
+    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+    r = be->trim(dpp, shard_id, c, y);
+    if (r == -ENOENT)
+      r = -ENODATA;
+    if (r == -ENODATA && be->gen_id < target_gen)
+      r = 0;
+    if (be->gen_id == target_gen)
+      break;
+    l.lock();
+  }
+  return r;
+}
+
+int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                    std::string_view marker, optional_yield y)
+{
+  assert(shard_id < num_shards);
+  return bes->trim_entries(dpp, shard_id, marker, y);
+}
+
+class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
+public:
+  DataLogBackends* const bes;
+  const int shard_id;
+  const uint64_t target_gen;
+  const std::string cursor;
+  const uint64_t head_gen;
+  const uint64_t tail_gen;
+  boost::intrusive_ptr<RGWDataChangesBE> be;
+
+  GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id,
+          uint64_t target_gen, std::string cursor, uint64_t head_gen,
+          uint64_t tail_gen, boost::intrusive_ptr<RGWDataChangesBE> be,
+          lr::AioCompletion* super)
+    : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen),
+      cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
+      be(std::move(be)) {}
+
+  void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+    auto gen_id = be->gen_id;
+    be.reset();
+    if (r == -ENOENT)
+      r = -ENODATA;
+    if (r == -ENODATA && gen_id < target_gen)
+      r = 0;
+    if (r < 0) {
+      complete(std::move(p), r);
+      return;
+    }
+
+    {
+      std::unique_lock l(bes->m);
+      auto i = bes->upper_bound(gen_id);
+      if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
+        l.unlock();
+        complete(std::move(p), -ENODATA);
+        return;
+      }
+      be = i->second;
+    }
+    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+    be->trim(dpp, shard_id, c, call(std::move(p)));
+  }
+};
+
+void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                   std::string_view marker,
+                                   librados::AioCompletion* c)
+{
+  auto [target_gen, cursor] = cursorgen(marker);
+  std::unique_lock l(m);
+  const auto head_gen = (end() - 1)->second->gen_id;
+  const auto tail_gen = begin()->first;
+  if (target_gen < tail_gen) {
+    l.unlock();
+    rgw_complete_aio_completion(c, -ENODATA);
+    return;
+  }
+  auto be = begin()->second;
+  l.unlock();
+  auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen,
+                                      std::string(cursor), head_gen, tail_gen,
+                                      be, c);
+
+  auto cc = be->gen_id == target_gen ? cursor : be->max_marker();
+  be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
+}
+
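+// Generation pruning: any generation other than the head can be dropped
+// once every one of its shards is empty. trim_generations() finds the
+// highest fully-empty generation and erases everything up to it.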
+int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp,
+                                      std::optional<uint64_t>& through,
+                                      optional_yield y) {
+  if (size() != 1) {
+    std::vector<boost::intrusive_ptr<RGWDataChangesBE>> candidates;
+    {
+      std::scoped_lock l(m);
+      auto e = cend() - 1;
+      for (auto i = cbegin(); i < e; ++i) {
+        candidates.push_back(i->second);
+      }
+    }
+
+    std::optional<uint64_t> highest;
+    for (auto& be : candidates) {
+      auto r = be->is_empty(dpp, y);
+      if (r < 0) {
+        return r;
+      } else if (r == 1) {
+        highest = be->gen_id;
+      } else {
+        break;
+      }
+    }
+
+    through = highest;
+    if (!highest) {
+      return 0;
+    }
+    auto ec = empty_to(dpp, *highest, y);
+    if (ec) {
+      return ceph::from_error_code(ec);
+    }
+  }
+
+  return ceph::from_error_code(remove_empty(dpp, y));
+}
+
+int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                    std::string_view marker,
+                                    librados::AioCompletion* c)
+{
+  assert(shard_id < num_shards);
+  bes->trim_entries(dpp, shard_id, marker, c);
+  return 0;
+}
+
+bool RGWDataChangesLog::going_down() const
+{
+  return down_flag;
+}
+
+RGWDataChangesLog::~RGWDataChangesLog() {
+  down_flag = true;
+  if (renew_thread.joinable()) {
+    renew_stop();
+    renew_thread.join();
+  }
+}
+
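+// Renewal thread body: runs renew_entries() roughly every 3/4 of the
+// log window and prunes empty generations every runs_per_prune (150)
+// iterations.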
+void RGWDataChangesLog::renew_run() noexcept {
+  static constexpr auto runs_per_prune = 150;
+  auto run = 0;
+  for (;;) {
+    const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
+    ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
+    int r = renew_entries(&dp);
+    if (r < 0) {
+      ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
+    }
+
+    if (going_down())
+      break;
+
+    if (run == runs_per_prune) {
+      std::optional<uint64_t> through;
+      ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
+      // This null_yield can stay, for now, as it's in its own thread.
+      r = trim_generations(&dp, through, null_yield);
+      if (r < 0) {
+        derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
+             << r << dendl;
+      } else if (through) {
+        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
+                          << "through " << *through << "." << dendl;
+      } else {
+        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
+                          << dendl;
+      }
+      run = 0;
+    } else {
+      ++run;
+    }
+
+    int interval = cct->_conf->rgw_data_log_window * 3 / 4;
+    std::unique_lock locker{renew_lock};
+    renew_cond.wait_for(locker, std::chrono::seconds(interval));
+  }
+}
+
+void RGWDataChangesLog::renew_stop()
+{
+  std::lock_guard l{renew_lock};
+  renew_cond.notify_all();
+}
+
+void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen)
+{
+  if (!cct->_conf->rgw_data_notify_interval_msec) {
+    return;
+  }
+
+  auto key = bs.get_key();
+  {
+    std::shared_lock rl{modified_lock}; // read lock to check for existence
+    auto shard = modified_shards.find(shard_id);
+    if (shard != modified_shards.end() && shard->second.count({key, gen})) {
+      return;
+    }
+  }
+
+  std::unique_lock wl{modified_lock}; // write lock for insertion
+  modified_shards[shard_id].insert(rgw_data_notify_entry{key, gen});
+}
+
+std::string RGWDataChangesLog::max_marker() const {
+  return gencursor(std::numeric_limits<uint64_t>::max(),
+                   "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+}
+
+int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type,
+                                     optional_yield y) {
+  return ceph::from_error_code(bes->new_backing(dpp, type, y));
+}
+
+int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
+                                        std::optional<uint64_t>& through,
+                                        optional_yield y) {
+  return bes->trim_generations(dpp, through, y);
+}
+
+void RGWDataChangesLogInfo::dump(Formatter *f) const
+{
+  encode_json("marker", marker, f);
+  utime_t ut(last_update);
+  encode_json("last_update", ut, f);
+}
+
+void RGWDataChangesLogInfo::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("marker", marker, obj);
+  utime_t ut;
+  JSONDecoder::decode_json("last_update", ut, obj);
+  last_update = ut.to_real_time();
+}
+
-- cgit v1.2.3