From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rgw/rgw_trim_bilog.cc | 1172 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1172 insertions(+) create mode 100644 src/rgw/rgw_trim_bilog.cc (limited to 'src/rgw/rgw_trim_bilog.cc') diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc new file mode 100644 index 000000000..1aa245175 --- /dev/null +++ b/src/rgw/rgw_trim_bilog.cc @@ -0,0 +1,1172 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * Author: Casey Bodley + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include +#include + +#include "include/scope_guard.h" +#include "common/bounded_key_counter.h" +#include "common/errno.h" +#include "rgw_trim_bilog.h" +#include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" +#include "rgw_cr_tools.h" +#include "rgw_data_sync.h" +#include "rgw_metadata.h" +#include "rgw_sal.h" +#include "rgw_zone.h" +#include "rgw_sync.h" +#include "rgw_bucket.h" + +#include "services/svc_zone.h" +#include "services/svc_meta.h" + +#include +#include "include/ceph_assert.h" + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "trim: ") + +using rgw::BucketTrimConfig; +using BucketChangeCounter = BoundedKeyCounter; + +const std::string rgw::BucketTrimStatus::oid = "bilog.trim"; +using rgw::BucketTrimStatus; + + +// watch/notify api for gateways to coordinate about which buckets to trim +enum TrimNotifyType { + NotifyTrimCounters = 0, + NotifyTrimComplete, +}; +WRITE_RAW_ENCODER(TrimNotifyType); + +struct TrimNotifyHandler { + virtual ~TrimNotifyHandler() = default; + + virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0; +}; + +/// api to share the bucket trim counters between gateways in the same zone. +/// each gateway will process different datalog shards, so the gateway that runs +/// the trim process needs to accumulate their counters +struct TrimCounters { + /// counter for a single bucket + struct BucketCounter { + std::string bucket; //< bucket instance metadata key + int count{0}; + + BucketCounter() = default; + BucketCounter(const std::string& bucket, int count) + : bucket(bucket), count(count) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + using Vector = std::vector; + + /// request bucket trim counters from peer gateways + struct Request { + uint16_t max_buckets; //< maximum number of bucket counters to return + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// return the current bucket trim counters + struct Response { + Vector bucket_counters; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// server interface to query the hottest buckets + struct Server { + virtual ~Server() = default; + + virtual void get_bucket_counters(int count, Vector& counters) = 0; + virtual void reset_bucket_counters() = 0; + }; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + explicit Handler(Server *server) : server(server) {} + + void handle(bufferlist::const_iterator& input, bufferlist& output) override; + }; +}; +std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) +{ + return out << rhs.bucket << ":" << rhs.count; +} + +void TrimCounters::BucketCounter::encode(bufferlist& bl) const +{ + using ceph::encode; + // no versioning to save space + encode(bucket, bl); + encode(count, bl); +} +void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p) +{ + using ceph::decode; + decode(bucket, p); + decode(count, p); +} +WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); + +void TrimCounters::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(max_buckets, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Request::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(max_buckets, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Request); + +void TrimCounters::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(bucket_counters, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Response::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(bucket_counters, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Response); + +void TrimCounters::Handler::handle(bufferlist::const_iterator& input, + bufferlist& output) +{ + Request request; + decode(request, input); + auto count = std::min(request.max_buckets, 128); + + Response response; + server->get_bucket_counters(count, response.bucket_counters); + encode(response, output); +} + +/// api to notify peer gateways that trim has completed and their bucket change +/// counters can be reset +struct TrimComplete { + struct Request { + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + struct Response { + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// server interface to reset bucket counters + using Server = TrimCounters::Server; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + explicit Handler(Server *server) : server(server) {} + + void handle(bufferlist::const_iterator& input, bufferlist& output) override; + }; +}; + +void TrimComplete::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Request::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Request); + +void TrimComplete::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Response::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Response); + +void TrimComplete::Handler::handle(bufferlist::const_iterator& input, + bufferlist& output) +{ + Request request; + decode(request, input); + + server->reset_bucket_counters(); + + Response response; + encode(response, output); +} + + +/// rados watcher for bucket trim notifications +class BucketTrimWatcher : public librados::WatchCtx2 { + rgw::sal::RGWRadosStore *const store; + const rgw_raw_obj& obj; + rgw_rados_ref ref; + uint64_t handle{0}; + + using HandlerPtr = std::unique_ptr; + boost::container::flat_map handlers; + + public: + BucketTrimWatcher(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj, + TrimCounters::Server *counters) + : store(store), obj(obj) { + handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); + handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters)); + } + + ~BucketTrimWatcher() { + stop(); + } + + int start(const DoutPrefixProvider *dpp) { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + return r; + } + + // register a watch on the realm's control object + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + if (r == -ENOENT) { + constexpr bool exclusive = true; + r = ref.pool.ioctx().create(ref.obj.oid, exclusive); + if (r == -EEXIST || r == 0) { + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + } + } + if (r < 0) { + ldpp_dout(dpp, -1) << "Failed to watch " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.pool.ioctx().close(); + return r; + } + + ldpp_dout(dpp, 10) << "Watching " << ref.obj.oid << dendl; + return 0; + } + + int restart() { + int r = ref.pool.ioctx().unwatch2(handle); + if (r < 0) { + lderr(store->ctx()) << "Failed to unwatch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + } + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + if (r < 0) { + lderr(store->ctx()) << "Failed to restart watch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.pool.ioctx().close(); + } + return r; + } + + void stop() { + if (handle) { + ref.pool.ioctx().unwatch2(handle); + ref.pool.ioctx().close(); + } + } + + /// respond to bucket trim notifications + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) override { + if (cookie != handle) { + return; + } + bufferlist reply; + try { + auto p = bl.cbegin(); + TrimNotifyType type; + decode(type, p); + + auto handler = handlers.find(type); + if (handler != handlers.end()) { + handler->second->handle(p, reply); + } else { + lderr(store->ctx()) << "no handler for notify type " << type << dendl; + } + } catch (const buffer::error& e) { + lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; + } + ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply); + } + + /// reestablish the watch if it gets disconnected + void handle_error(uint64_t cookie, int err) override { + if (cookie != handle) { + return; + } + if (err == -ENOTCONN) { + ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl; + restart(); + } + } +}; + + +/// Interface to communicate with the trim manager about completed operations +struct BucketTrimObserver { + virtual ~BucketTrimObserver() = default; + + virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0; + virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0; +}; + +/// populate the status with the minimum stable marker of each shard +template +int take_min_status(CephContext *cct, Iter first, Iter last, + std::vector *status) +{ + for (auto peer = first; peer != last; ++peer) { + if (peer->size() != status->size()) { + // all peers must agree on the number of shards + return -EINVAL; + } + auto m = status->begin(); + for (auto& shard : *peer) { + auto& marker = *m++; + // if no sync has started, we can safely trim everything + if (shard.state == rgw_bucket_shard_sync_info::StateInit) { + continue; + } + // always take the first marker, or any later marker that's smaller + if (peer == first || marker > shard.inc_marker.position) { + marker = std::move(shard.inc_marker.position); + } + } + } + return 0; +} + +/// trim each bilog shard to the given marker, while limiting the number of +/// concurrent requests +class BucketTrimShardCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + const DoutPrefixProvider *dpp; + rgw::sal::RGWRadosStore *const store; + const RGWBucketInfo& bucket_info; + const std::vector& markers; //< shard markers to trim + size_t i{0}; //< index of current shard marker + public: + BucketTrimShardCollectCR(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, + const std::vector& markers) + : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS), + dpp(dpp), store(store), bucket_info(bucket_info), markers(markers) + {} + bool spawn_next() override; +}; + +bool BucketTrimShardCollectCR::spawn_next() +{ + while (i < markers.size()) { + const auto& marker = markers[i]; + const auto shard_id = i++; + + // skip empty markers + if (!marker.empty()) { + ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id + << " of " << bucket_info.bucket << " at marker " << marker << dendl; + spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id, + std::string{}, marker), + false); + return true; + } + } + return false; +} + +/// trim the bilog of all of the given bucket instance's shards +class BucketTrimInstanceCR : public RGWCoroutine { + rgw::sal::RGWRadosStore *const store; + RGWHTTPManager *const http; + BucketTrimObserver *const observer; + std::string bucket_instance; + rgw_bucket_get_sync_policy_params get_policy_params; + std::shared_ptr source_policy; + rgw_bucket bucket; + const std::string& zone_id; //< my zone id + RGWBucketInfo _bucket_info; + const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices + int child_ret = 0; + const DoutPrefixProvider *dpp; + + using StatusShards = std::vector; + std::vector peer_status; //< sync status for each peer + std::vector min_markers; //< min marker per shard + + public: + BucketTrimInstanceCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, + BucketTrimObserver *observer, + const std::string& bucket_instance, + const DoutPrefixProvider *dpp) + : RGWCoroutine(store->ctx()), store(store), + http(http), observer(observer), + bucket_instance(bucket_instance), + zone_id(store->svc()->zone->get_zone().id), + dpp(dpp) { + rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr); + source_policy = make_shared(); + } + + int operate(const DoutPrefixProvider *dpp) override; +}; + +int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp) +{ + reenter(this) { + ldpp_dout(dpp, 4) << "starting trim on bucket=" << bucket_instance << dendl; + + get_policy_params.zone = zone_id; + get_policy_params.bucket = bucket; + yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(), + store, + get_policy_params, + source_policy, + dpp)); + if (retcode < 0) { + if (retcode != -ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl; + } + + return set_cr_error(retcode); + } + + if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info(); + opt_bucket_info) { + pbucket_info = &(*opt_bucket_info); + } else { + /* this shouldn't really happen */ + return set_cr_error(-ENOENT); + } + + // query peers for sync status + set_status("fetching sync status from relevant peers"); + yield { + const auto& all_dests = source_policy->policy_handler->get_all_dests(); + + vector zids; + rgw_zone_id last_zid; + for (auto& diter : all_dests) { + const auto& zid = diter.first; + if (zid == last_zid) { + continue; + } + last_zid = zid; + zids.push_back(zid); + } + + peer_status.resize(zids.size()); + + auto& zone_conn_map = store->svc()->zone->get_zone_conn_map(); + + auto p = peer_status.begin(); + for (auto& zid : zids) { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "bucket-index" }, + { "status", nullptr }, + { "options", "merge" }, + { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket + param is not provided */ + { "source-zone", zone_id.c_str() }, + { nullptr, nullptr } + }; + + auto ziter = zone_conn_map.find(zid); + if (ziter == zone_conn_map.end()) { + ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl; + return set_cr_error(-ECANCELED); + } + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p), + false); + ++p; + } + } + // wait for a response from each peer. all must respond to attempt trim + while (num_spawned()) { + yield wait_for_child(); + collect(&child_ret, nullptr); + if (child_ret < 0) { + drain_all(); + return set_cr_error(child_ret); + } + } + + // initialize each shard with the maximum marker, which is only used when + // there are no peers syncing from us + min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards), + RGWSyncLogTrimCR::max_marker); + + // determine the minimum marker for each shard + retcode = take_min_status(cct, peer_status.begin(), peer_status.end(), + &min_markers); + if (retcode < 0) { + ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl; + return set_cr_error(retcode); + } + + // trim shards with a ShardCollectCR + ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket + << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl; + set_status("trimming bilog shards"); + yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers)); + // ENODATA just means there were no keys to trim + if (retcode == -ENODATA) { + retcode = 0; + } + if (retcode < 0) { + ldpp_dout(dpp, 4) << "failed to trim bilog shards: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + observer->on_bucket_trimmed(std::move(bucket_instance)); + return set_cr_done(); + } + return 0; +} + +/// trim each bucket instance while limiting the number of concurrent operations +class BucketTrimInstanceCollectCR : public RGWShardCollectCR { + rgw::sal::RGWRadosStore *const store; + RGWHTTPManager *const http; + BucketTrimObserver *const observer; + std::vector::const_iterator bucket; + std::vector::const_iterator end; + const DoutPrefixProvider *dpp; + public: + BucketTrimInstanceCollectCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, + BucketTrimObserver *observer, + const std::vector& buckets, + int max_concurrent, + const DoutPrefixProvider *dpp) + : RGWShardCollectCR(store->ctx(), max_concurrent), + store(store), http(http), observer(observer), + bucket(buckets.begin()), end(buckets.end()), + dpp(dpp) + {} + bool spawn_next() override; +}; + +bool BucketTrimInstanceCollectCR::spawn_next() +{ + if (bucket == end) { + return false; + } + spawn(new BucketTrimInstanceCR(store, http, observer, *bucket, dpp), false); + ++bucket; + return true; +} + +/// correlate the replies from each peer gateway into the given counter +int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) +{ + counter.clear(); + + try { + // decode notify responses + auto p = bl.cbegin(); + std::map, bufferlist> replies; + std::set> timeouts; + decode(replies, p); + decode(timeouts, p); + + for (auto& peer : replies) { + auto q = peer.second.cbegin(); + TrimCounters::Response response; + decode(response, q); + for (const auto& b : response.bucket_counters) { + counter.insert(b.bucket, b.count); + } + } + } catch (const buffer::error& e) { + return -EIO; + } + return 0; +} + +/// metadata callback has the signature bool(string&& key, string&& marker) +using MetadataListCallback = std::function; + +/// lists metadata keys, passing each to a callback until it returns false. +/// on reaching the end, it will restart at the beginning and list up to the +/// initial marker +class AsyncMetadataList : public RGWAsyncRadosRequest { + CephContext *const cct; + RGWMetadataManager *const mgr; + const std::string section; + const std::string start_marker; + MetadataListCallback callback; + + int _send_request(const DoutPrefixProvider *dpp) override; + public: + AsyncMetadataList(CephContext *cct, RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr, + const std::string& section, const std::string& start_marker, + const MetadataListCallback& callback) + : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} +}; + +int AsyncMetadataList::_send_request(const DoutPrefixProvider *dpp) +{ + void* handle = nullptr; + std::list keys; + bool truncated{false}; + std::string marker; + + // start a listing at the given marker + int r = mgr->list_keys_init(dpp, section, start_marker, &handle); + if (r == -EINVAL) { + // restart with empty marker below + } else if (r < 0) { + ldpp_dout(dpp, 10) << "failed to init metadata listing: " + << cpp_strerror(r) << dendl; + return r; + } else { + ldpp_dout(dpp, 20) << "starting metadata listing at " << start_marker << dendl; + + // release the handle when scope exits + auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); + + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldpp_dout(dpp, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + ceph_assert(keys.size() == 1); + auto& key = keys.front(); + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + if (start_marker.empty()) { + // already listed all keys + return 0; + } + } + + // restart the listing from the beginning (empty marker) + handle = nullptr; + + r = mgr->list_keys_init(dpp, section, "", &handle); + if (r < 0) { + ldpp_dout(dpp, 10) << "failed to restart metadata listing: " + << cpp_strerror(r) << dendl; + return r; + } + ldpp_dout(dpp, 20) << "restarting metadata listing" << dendl; + + // release the handle when scope exits + auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldpp_dout(dpp, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + ceph_assert(keys.size() == 1); + auto& key = keys.front(); + // stop at original marker + if (marker > start_marker) { + return 0; + } + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + return 0; +} + +/// coroutine wrapper for AsyncMetadataList +class MetadataListCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *const async_rados; + RGWMetadataManager *const mgr; + const std::string& section; + const std::string& start_marker; + MetadataListCallback callback; + RGWAsyncRadosRequest *req{nullptr}; + public: + MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados, + RGWMetadataManager *mgr, const std::string& section, + const std::string& start_marker, + const MetadataListCallback& callback) + : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} + ~MetadataListCR() override { + request_cleanup(); + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(), + mgr, section, start_marker, callback); + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = nullptr; + } + } +}; + +class BucketTrimCR : public RGWCoroutine { + rgw::sal::RGWRadosStore *const store; + RGWHTTPManager *const http; + const BucketTrimConfig& config; + BucketTrimObserver *const observer; + const rgw_raw_obj& obj; + ceph::mono_time start_time; + bufferlist notify_replies; + BucketChangeCounter counter; + std::vector buckets; //< buckets selected for trim + BucketTrimStatus status; + RGWObjVersionTracker objv; //< version tracker for trim status object + std::string last_cold_marker; //< position for next trim marker + const DoutPrefixProvider *dpp; + + static const std::string section; //< metadata section for bucket instances + public: + BucketTrimCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, + const BucketTrimConfig& config, BucketTrimObserver *observer, + const rgw_raw_obj& obj, const DoutPrefixProvider *dpp) + : RGWCoroutine(store->ctx()), store(store), http(http), config(config), + observer(observer), obj(obj), counter(config.counter_size), dpp(dpp) + {} + + int operate(const DoutPrefixProvider *dpp) override; +}; + +const std::string BucketTrimCR::section{"bucket.instance"}; + +int BucketTrimCR::operate(const DoutPrefixProvider *dpp) +{ + reenter(this) { + start_time = ceph::mono_clock::now(); + + if (config.buckets_per_interval) { + // query watch/notify for hot buckets + ldpp_dout(dpp, 10) << "fetching active bucket counters" << dendl; + set_status("fetching active bucket counters"); + yield { + // request the top bucket counters from each peer gateway + const TrimNotifyType type = NotifyTrimCounters; + TrimCounters::Request request{32}; + bufferlist bl; + encode(type, bl); + encode(request, bl); + call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, + ¬ify_replies)); + } + if (retcode < 0) { + ldpp_dout(dpp, 10) << "failed to fetch peer bucket counters" << dendl; + return set_cr_error(retcode); + } + + // select the hottest buckets for trim + retcode = accumulate_peer_counters(notify_replies, counter); + if (retcode < 0) { + ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl; + return set_cr_error(retcode); + } + buckets.reserve(config.buckets_per_interval); + + const int max_count = config.buckets_per_interval - + config.min_cold_buckets_per_interval; + counter.get_highest(max_count, + [this] (const std::string& bucket, int count) { + buckets.push_back(bucket); + }); + } + + if (buckets.size() < config.buckets_per_interval) { + // read BucketTrimStatus for marker position + set_status("reading trim status"); + using ReadStatus = RGWSimpleRadosReadCR; + yield call(new ReadStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj, + &status, true, &objv)); + if (retcode < 0) { + ldpp_dout(dpp, 10) << "failed to read bilog trim status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (status.marker == "MAX") { + status.marker.clear(); // restart at the beginning + } + ldpp_dout(dpp, 10) << "listing cold buckets from marker=" + << status.marker << dendl; + + set_status("listing cold buckets for trim"); + yield { + // capture a reference so 'this' remains valid in the callback + auto ref = boost::intrusive_ptr{this}; + // list cold buckets to consider for trim + auto cb = [this, ref] (std::string&& bucket, std::string&& marker) { + // filter out keys that we trimmed recently + if (observer->trimmed_recently(bucket)) { + return true; + } + // filter out active buckets that we've already selected + auto i = std::find(buckets.begin(), buckets.end(), bucket); + if (i != buckets.end()) { + return true; + } + buckets.emplace_back(std::move(bucket)); + // remember the last cold bucket spawned to update the status marker + last_cold_marker = std::move(marker); + // return true if there's room for more + return buckets.size() < config.buckets_per_interval; + }; + + call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(), + store->ctl()->meta.mgr, + section, status.marker, cb)); + } + if (retcode < 0) { + ldout(cct, 4) << "failed to list bucket instance metadata: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + } + + // trim bucket instances with limited concurrency + set_status("trimming buckets"); + ldpp_dout(dpp, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; + yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets, + config.concurrent_buckets, dpp)); + // ignore errors from individual buckets + + // write updated trim status + if (!last_cold_marker.empty() && status.marker != last_cold_marker) { + set_status("writing updated trim status"); + status.marker = std::move(last_cold_marker); + ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl; + using WriteStatus = RGWSimpleRadosWriteCR; + yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj, + status, &objv)); + if (retcode < 0) { + ldpp_dout(dpp, 4) << "failed to write updated trim status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + } + + // notify peers that trim completed + set_status("trim completed"); + yield { + const TrimNotifyType type = NotifyTrimComplete; + TrimComplete::Request request; + bufferlist bl; + encode(type, bl); + encode(request, bl); + call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, + nullptr)); + } + if (retcode < 0) { + ldout(cct, 10) << "failed to notify peers of trim completion" << dendl; + return set_cr_error(retcode); + } + + ldpp_dout(dpp, 4) << "bucket index log processing completed in " + << ceph::mono_clock::now() - start_time << dendl; + return set_cr_done(); + } + return 0; +} + +class BucketTrimPollCR : public RGWCoroutine { + rgw::sal::RGWRadosStore *const store; + RGWHTTPManager *const http; + const BucketTrimConfig& config; + BucketTrimObserver *const observer; + const rgw_raw_obj& obj; + const std::string name{"trim"}; //< lock name + const std::string cookie; + const DoutPrefixProvider *dpp; + + public: + BucketTrimPollCR(rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, + const BucketTrimConfig& config, + BucketTrimObserver *observer, const rgw_raw_obj& obj, + const DoutPrefixProvider *dpp) + : RGWCoroutine(store->ctx()), store(store), http(http), + config(config), observer(observer), obj(obj), + cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), + dpp(dpp) {} + + int operate(const DoutPrefixProvider *dpp) override; +}; + +int BucketTrimPollCR::operate(const DoutPrefixProvider *dpp) +{ + reenter(this) { + for (;;) { + set_status("sleeping"); + wait(utime_t{static_cast(config.trim_interval_sec), 0}); + + // prevent others from trimming for our entire wait interval + set_status("acquiring trim lock"); + yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store, + obj, name, cookie, + config.trim_interval_sec)); + if (retcode < 0) { + ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; + continue; + } + + set_status("trimming"); + yield call(new BucketTrimCR(store, http, config, observer, obj, dpp)); + if (retcode < 0) { + // on errors, unlock so other gateways can try + set_status("unlocking"); + yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store, + obj, name, cookie)); + } + } + } + return 0; +} + +/// tracks a bounded list of events with timestamps. old events can be expired, +/// and recent events can be searched by key. expiration depends on events being +/// inserted in temporal order +template +class RecentEventList { + public: + using clock_type = Clock; + using time_point = typename clock_type::time_point; + + RecentEventList(size_t max_size, const ceph::timespan& max_duration) + : events(max_size), max_duration(max_duration) + {} + + /// insert an event at the given point in time. this time must be at least as + /// recent as the last inserted event + void insert(T&& value, const time_point& now) { + // ceph_assert(events.empty() || now >= events.back().time) + events.push_back(Event{std::move(value), now}); + } + + /// performs a linear search for an event matching the given key, whose type + /// U can be any that provides operator==(U, T) + template + bool lookup(const U& key) const { + for (const auto& event : events) { + if (key == event.value) { + return true; + } + } + return false; + } + + /// remove events that are no longer recent compared to the given point in time + void expire_old(const time_point& now) { + const auto expired_before = now - max_duration; + while (!events.empty() && events.front().time < expired_before) { + events.pop_front(); + } + } + + private: + struct Event { + T value; + time_point time; + }; + boost::circular_buffer events; + const ceph::timespan max_duration; +}; + +namespace rgw { + +// read bucket trim configuration from ceph context +void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config) +{ + const auto& conf = cct->_conf; + + config.trim_interval_sec = + conf.get_val("rgw_sync_log_trim_interval"); + config.counter_size = 512; + config.buckets_per_interval = + conf.get_val("rgw_sync_log_trim_max_buckets"); + config.min_cold_buckets_per_interval = + conf.get_val("rgw_sync_log_trim_min_cold_buckets"); + config.concurrent_buckets = + conf.get_val("rgw_sync_log_trim_concurrent_buckets"); + config.notify_timeout_ms = 10000; + config.recent_size = 128; + config.recent_duration = std::chrono::hours(2); +} + +class BucketTrimManager::Impl : public TrimCounters::Server, + public BucketTrimObserver { + public: + rgw::sal::RGWRadosStore *const store; + const BucketTrimConfig config; + + const rgw_raw_obj status_obj; + + /// count frequency of bucket instance entries in the data changes log + BucketChangeCounter counter; + + using RecentlyTrimmedBucketList = RecentEventList; + using clock_type = RecentlyTrimmedBucketList::clock_type; + /// track recently trimmed buckets to focus trim activity elsewhere + RecentlyTrimmedBucketList trimmed; + + /// serve the bucket trim watch/notify api + BucketTrimWatcher watcher; + + /// protect data shared between data sync, trim, and watch/notify threads + std::mutex mutex; + + Impl(rgw::sal::RGWRadosStore *store, const BucketTrimConfig& config) + : store(store), config(config), + status_obj(store->svc()->zone->get_zone_params().log_pool, BucketTrimStatus::oid), + counter(config.counter_size), + trimmed(config.recent_size, config.recent_duration), + watcher(store, status_obj, this) + {} + + /// TrimCounters::Server interface for watch/notify api + void get_bucket_counters(int count, TrimCounters::Vector& buckets) { + buckets.reserve(count); + std::lock_guard lock(mutex); + counter.get_highest(count, [&buckets] (const std::string& key, int count) { + buckets.emplace_back(key, count); + }); + ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; + } + + void reset_bucket_counters() override { + ldout(store->ctx(), 20) << "bucket trim completed" << dendl; + std::lock_guard lock(mutex); + counter.clear(); + trimmed.expire_old(clock_type::now()); + } + + /// BucketTrimObserver interface to remember successfully-trimmed buckets + void on_bucket_trimmed(std::string&& bucket_instance) override { + ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl; + std::lock_guard lock(mutex); + trimmed.insert(std::move(bucket_instance), clock_type::now()); + } + + bool trimmed_recently(const std::string_view& bucket_instance) override { + std::lock_guard lock(mutex); + return trimmed.lookup(bucket_instance); + } +}; + +BucketTrimManager::BucketTrimManager(rgw::sal::RGWRadosStore *store, + const BucketTrimConfig& config) + : impl(new Impl(store, config)) +{ +} +BucketTrimManager::~BucketTrimManager() = default; + +int BucketTrimManager::init() +{ + return impl->watcher.start(this); +} + +void BucketTrimManager::on_bucket_changed(const std::string_view& bucket) +{ + std::lock_guard lock(impl->mutex); + // filter recently trimmed bucket instances out of bucket change counter + if (impl->trimmed.lookup(bucket)) { + return; + } + impl->counter.insert(std::string(bucket)); +} + +RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http) +{ + return new BucketTrimPollCR(impl->store, http, impl->config, + impl.get(), impl->status_obj, this); +} + +RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http) +{ + // return the trim coroutine without any polling + return new BucketTrimCR(impl->store, http, impl->config, + impl.get(), impl->status_obj, this); +} + +CephContext* BucketTrimManager::get_cct() const +{ + return impl->store->ctx(); +} + +unsigned BucketTrimManager::get_subsys() const +{ + return dout_subsys; +} + +std::ostream& BucketTrimManager::gen_prefix(std::ostream& out) const +{ + return out << "rgw bucket trim manager: "; +} + +} // namespace rgw -- cgit v1.2.3