From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 src/rgw/rgw_trim_datalog.cc | 252 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 252 insertions(+)
 create mode 100644 src/rgw/rgw_trim_datalog.cc

(limited to 'src/rgw/rgw_trim_datalog.cc')

diff --git a/src/rgw/rgw_trim_datalog.cc b/src/rgw/rgw_trim_datalog.cc
new file mode 100644
index 000000000..1cd9bb194
--- /dev/null
+++ b/src/rgw/rgw_trim_datalog.cc
@@ -0,0 +1,252 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <vector>
+#include <string>
+
+#include "common/errno.h"
+
+#include "rgw_trim_datalog.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
+#include "rgw_data_sync.h"
+#include "rgw_zone.h"
+#include "rgw_bucket.h"
+
+#include "services/svc_zone.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "data trim: ")
+
+namespace {
+
+class DatalogTrimImplCR : public RGWSimpleCoroutine {
+  const DoutPrefixProvider *dpp;
+  rgw::sal::RGWRadosStore *store;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+  int shard;
+  std::string marker;
+  std::string* last_trim_marker;
+
+ public:
+  DatalogTrimImplCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore* store, int shard,
+                    const std::string& marker, std::string* last_trim_marker)
+    : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), shard(shard),
+      marker(marker), last_trim_marker(last_trim_marker) {
+    set_description() << "Datalog trim shard=" << shard
+                      << " marker=" << marker;
+  }
+
+  int send_request(const DoutPrefixProvider *dpp) override {
+    set_status() << "sending request";
+    cn = stack->create_completion_notifier();
+    return store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
+                                                     cn->completion());
+  }
+  int request_complete() override {
+    int r = cn->completion()->get_return_value();
+    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "(): trim of shard=" << shard
+                       << " marker=" << marker << " returned r=" << r << dendl;
+
+    set_status() << "request complete; ret=" << r;
+    if (r != -ENODATA) {
+      return r;
+    }
+    // nothing left to trim, update last_trim_marker
+    if (*last_trim_marker < marker &&
+        marker != store->svc()->datalog_rados->max_marker()) {
+      *last_trim_marker = marker;
+    }
+    return 0;
+  }
+};
+
+/// return the marker that it's safe to trim up to
+const std::string& get_stable_marker(const rgw_data_sync_marker& m)
+{
+  return m.state == m.FullSync ? m.next_step_marker : m.marker;
+}
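+// note: during full sync a peer's 'marker' tracks its progress through the
+// bucket listing rather than a datalog position, while next_step_marker
+// records the datalog position where incremental sync will resume; only
+// the latter is a safe trim bound until the peer reaches incremental sync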
+
+/// populate the container starting with 'dest' with the minimum stable marker
+/// of each shard for all of the peers in [first, last)
+template <typename IterIn, typename IterOut>
+void take_min_markers(IterIn first, IterIn last, IterOut dest)
+{
+  if (first == last) {
+    return;
+  }
+  for (auto p = first; p != last; ++p) {
+    auto m = dest;
+    for (auto &shard : p->sync_markers) {
+      const auto& stable = get_stable_marker(shard.second);
+      if (*m > stable) {
+        *m = stable;
+      }
+      ++m;
+    }
+  }
+}
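+// note: a shard can only be trimmed up to the oldest position that every
+// peer has already consumed; DataLogTrimCR seeds 'dest' with max_marker(),
+// so the loop above reduces each entry to the slowest peer's position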
+
+} // anonymous namespace
+
+class DataLogTrimCR : public RGWCoroutine {
+  using TrimCR = DatalogTrimImplCR;
+  const DoutPrefixProvider *dpp;
+  rgw::sal::RGWRadosStore *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const std::string& zone_id; //< my zone id
+  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
+  std::vector<std::string> min_shard_markers; //< min marker per shard
+  std::vector<std::string>& last_trim; //< last trimmed marker per shard
+  int ret{0};
+
+ public:
+  DataLogTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
+                int num_shards, std::vector<std::string>& last_trim)
+    : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http),
+      num_shards(num_shards),
+      zone_id(store->svc()->zone->get_zone().id),
+      peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()),
+      min_shard_markers(num_shards,
+                        std::string(store->svc()->datalog_rados->max_marker())),
+      last_trim(last_trim)
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override;
+};
+
+int DataLogTrimCR::operate(const DoutPrefixProvider *dpp)
+{
+  reenter(this) {
+    ldpp_dout(dpp, 10) << "fetching sync status for zone " << zone_id << dendl;
+    set_status("fetching sync status");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "data" },
+        { "status", nullptr },
+        { "source-zone", zone_id.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) {
+        ldpp_dout(dpp, 20) << "query sync status from " << c.first << dendl;
+        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+    }
+
+    // must get a successful reply from all peers to consider trimming
+    ret = 0;
+    while (ret == 0 && num_spawned() > 0) {
+      yield wait_for_child();
+      collect_next(&ret);
+    }
+    drain_all();
+
+    if (ret < 0) {
+      ldpp_dout(dpp, 4) << "failed to fetch sync status from all peers" << dendl;
+      return set_cr_error(ret);
+    }
+
+    ldpp_dout(dpp, 10) << "trimming log shards" << dendl;
+    set_status("trimming log shards");
+    yield {
+      // determine the minimum marker for each shard
+      take_min_markers(peer_status.begin(), peer_status.end(),
+                       min_shard_markers.begin());
+
+      for (int i = 0; i < num_shards; i++) {
+        const auto& m = min_shard_markers[i];
+        if (m <= last_trim[i]) {
+          continue;
+        }
+        ldpp_dout(dpp, 10) << "trimming log shard " << i
+                           << " at marker=" << m
+                           << " last_trim=" << last_trim[i] << dendl;
+        spawn(new TrimCR(dpp, store, i, m, &last_trim[i]),
+              true);
+      }
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
+
+RGWCoroutine* create_admin_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards,
+                                            std::vector<std::string>& markers)
+{
+  return new DataLogTrimCR(dpp, store, http, num_shards, markers);
+}
+
+class DataLogTrimPollCR : public RGWCoroutine {
+  const DoutPrefixProvider *dpp;
+  rgw::sal::RGWRadosStore *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string lock_oid; //< use first data log shard for lock
+  const std::string lock_cookie;
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+
+ public:
+  DataLogTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
+                    int num_shards, utime_t interval)
+    : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http),
+      num_shards(num_shards), interval(interval),
+      lock_oid(store->svc()->datalog_rados->get_oid(0, 0)),
+      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+      last_trim(num_shards)
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override;
+};
+
+int DataLogTrimPollCR::operate(const DoutPrefixProvider *dpp)
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(interval);
+
+      // request a 'data_trim' lock that covers the entire wait interval to
+      // prevent other gateways from attempting to trim for the duration
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
+                                          rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, lock_oid),
+                                          "data_trim", lock_cookie,
+                                          interval.sec()));
+      if (retcode < 0) {
+        // if the lock is already held, go back to sleep and try again later
+        ldpp_dout(dpp, 4) << "failed to lock " << lock_oid << ", trying again in "
+                          << interval.sec() << "s" << dendl;
+        continue;
+      }
+
+      set_status("trimming");
+      yield call(new DataLogTrimCR(dpp, store, http, num_shards, last_trim));
+
+      // note that the lock is not released. this is intentional, as it avoids
+      // duplicating this work in other gateways
+    }
+  }
+  return 0;
+}
+
+RGWCoroutine* create_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
+                                      RGWHTTPManager *http,
+                                      int num_shards, utime_t interval)
+{
+  return new DataLogTrimPollCR(dpp, store, http, num_shards, interval);
+}
--
cgit v1.2.3