summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_reshard.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rgw/rgw_reshard.h
parentInitial commit. (diff)
downloadceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz
ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_reshard.h')
-rw-r--r--src/rgw/rgw_reshard.h286
1 files changed, 286 insertions, 0 deletions
diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h
new file mode 100644
index 000000000..ecb18690f
--- /dev/null
+++ b/src/rgw/rgw_reshard.h
@@ -0,0 +1,286 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef RGW_RESHARD_H
+#define RGW_RESHARD_H
+
+#include <vector>
+#include <initializer_list>
+#include <functional>
+#include <iterator>
+#include <algorithm>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/asio/basic_waitable_timer.hpp>
+
+#include "include/common_fwd.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_time.h"
+#include "common/async/yield_context.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "cls/lock/cls_lock_client.h"
+
+#include "rgw_common.h"
+
+
+class RGWReshard;
+namespace rgw { namespace sal {
+ class RGWRadosStore;
+} }
+
+class RGWBucketReshardLock {
+ using Clock = ceph::coarse_mono_clock;
+
+ rgw::sal::RGWRadosStore* store;
+ const std::string lock_oid;
+ const bool ephemeral;
+ rados::cls::lock::Lock internal_lock;
+ std::chrono::seconds duration;
+
+ Clock::time_point start_time;
+ Clock::time_point renew_thresh;
+
+ void reset_time(const Clock::time_point& now) {
+ start_time = now;
+ renew_thresh = start_time + duration / 2;
+ }
+
+public:
+ RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
+ const std::string& reshard_lock_oid,
+ bool _ephemeral);
+ RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
+ const RGWBucketInfo& bucket_info,
+ bool _ephemeral) :
+ RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
+ {}
+
+ int lock();
+ void unlock();
+ int renew(const Clock::time_point&);
+
+ bool should_renew(const Clock::time_point& now) const {
+ return now >= renew_thresh;
+ }
+}; // class RGWBucketReshardLock
+
+class RGWBucketReshard {
+public:
+
+ friend class RGWReshard;
+
+ using Clock = ceph::coarse_mono_clock;
+
+private:
+
+ rgw::sal::RGWRadosStore *store;
+ RGWBucketInfo bucket_info;
+ std::map<string, bufferlist> bucket_attrs;
+
+ RGWBucketReshardLock reshard_lock;
+ RGWBucketReshardLock* outer_reshard_lock;
+
+ // using an initializer_list as an array in contiguous memory
+ // allocated in at once
+ static const std::initializer_list<uint16_t> reshard_primes;
+
+ int create_new_bucket_instance(int new_num_shards,
+ RGWBucketInfo& new_bucket_info,
+ const DoutPrefixProvider *dpp);
+ int do_reshard(int num_shards,
+ RGWBucketInfo& new_bucket_info,
+ int max_entries,
+ bool verbose,
+ ostream *os,
+ Formatter *formatter,
+ const DoutPrefixProvider *dpp);
+public:
+
+ // pass nullptr for the final parameter if no outer reshard lock to
+ // manage
+ RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
+ const RGWBucketInfo& _bucket_info,
+ const std::map<string, bufferlist>& _bucket_attrs,
+ RGWBucketReshardLock* _outer_reshard_lock);
+ int execute(int num_shards, int max_op_entries,
+ const DoutPrefixProvider *dpp,
+ bool verbose = false, ostream *out = nullptr,
+ Formatter *formatter = nullptr,
+ RGWReshard *reshard_log = nullptr);
+ int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
+ int cancel(const DoutPrefixProvider *dpp);
+ static int clear_resharding(const DoutPrefixProvider *dpp,
+ rgw::sal::RGWRadosStore* store,
+ const RGWBucketInfo& bucket_info);
+ int clear_resharding(const DoutPrefixProvider *dpp) {
+ return clear_resharding(dpp, store, bucket_info);
+ }
+ static int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RGWRadosStore* store,
+ const RGWBucketInfo& bucket_info);
+ int clear_index_shard_reshard_status(const DoutPrefixProvider *dpp) {
+ return clear_index_shard_reshard_status(dpp, store, bucket_info);
+ }
+ static int set_resharding_status(const DoutPrefixProvider *dpp,
+ rgw::sal::RGWRadosStore* store,
+ const RGWBucketInfo& bucket_info,
+ const string& new_instance_id,
+ int32_t num_shards,
+ cls_rgw_reshard_status status);
+ int set_resharding_status(const DoutPrefixProvider *dpp, const string& new_instance_id,
+ int32_t num_shards,
+ cls_rgw_reshard_status status) {
+ return set_resharding_status(dpp, store, bucket_info,
+ new_instance_id, num_shards, status);
+ }
+
+ static uint32_t get_max_prime_shards() {
+ return *std::crbegin(reshard_primes);
+ }
+
+ // returns the prime in our list less than or equal to the
+ // parameter; the lowest value that can be returned is 1
+ static uint32_t get_prime_shards_less_or_equal(uint32_t requested_shards) {
+ auto it = std::upper_bound(reshard_primes.begin(), reshard_primes.end(),
+ requested_shards);
+ if (it == reshard_primes.begin()) {
+ return 1;
+ } else {
+ return *(--it);
+ }
+ }
+
+ // returns the prime in our list greater than or equal to the
+ // parameter; if we do not have such a prime, 0 is returned
+ static uint32_t get_prime_shards_greater_or_equal(
+ uint32_t requested_shards)
+ {
+ auto it = std::lower_bound(reshard_primes.begin(), reshard_primes.end(),
+ requested_shards);
+ if (it == reshard_primes.end()) {
+ return 0;
+ } else {
+ return *it;
+ }
+ }
+
+ // returns a preferred number of shards given a calculated number of
+ // shards based on max_dynamic_shards and the list of prime values
+ static uint32_t get_preferred_shards(uint32_t suggested_shards,
+ uint32_t max_dynamic_shards) {
+
+ // use a prime if max is within our prime range, otherwise use
+ // specified max
+ const uint32_t absolute_max =
+ max_dynamic_shards >= get_max_prime_shards() ?
+ max_dynamic_shards :
+ get_prime_shards_less_or_equal(max_dynamic_shards);
+
+ // if we can use a prime number, use it, otherwise use suggested;
+ // note get_prime_shards_greater_or_equal will return 0 if no prime in
+ // prime range
+ const uint32_t prime_ish_num_shards =
+ std::max(get_prime_shards_greater_or_equal(suggested_shards),
+ suggested_shards);
+
+ // dynamic sharding cannot reshard more than defined maximum
+ const uint32_t final_num_shards =
+ std::min(prime_ish_num_shards, absolute_max);
+
+ return final_num_shards;
+ }
+}; // RGWBucketReshard
+
+
+class RGWReshard {
+public:
+ using Clock = ceph::coarse_mono_clock;
+
+private:
+ rgw::sal::RGWRadosStore *store;
+ string lock_name;
+ rados::cls::lock::Lock instance_lock;
+ int num_logshards;
+
+ bool verbose;
+ ostream *out;
+ Formatter *formatter;
+
+ void get_logshard_oid(int shard_num, string *shard);
+protected:
+ class ReshardWorker : public Thread, public DoutPrefixProvider {
+ CephContext *cct;
+ RGWReshard *reshard;
+ ceph::mutex lock = ceph::make_mutex("ReshardWorker");
+ ceph::condition_variable cond;
+
+ public:
+ ReshardWorker(CephContext * const _cct,
+ RGWReshard * const _reshard)
+ : cct(_cct),
+ reshard(_reshard) {}
+
+ void *entry() override;
+ void stop();
+
+ CephContext *get_cct() const override;
+ unsigned get_subsys() const;
+ std::ostream& gen_prefix(std::ostream& out) const;
+ };
+
+ ReshardWorker *worker = nullptr;
+ std::atomic<bool> down_flag = { false };
+
+ string get_logshard_key(const string& tenant, const string& bucket_name);
+ void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
+
+public:
+ RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
+ int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
+ int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
+ int get(cls_rgw_reshard_entry& entry);
+ int remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
+ int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
+ int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+
+ /* reshard thread */
+ int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp);
+ int process_all_logshards(const DoutPrefixProvider *dpp);
+ bool going_down();
+ void start_processor();
+ void stop_processor();
+};
+
+class RGWReshardWait {
+ public:
+ // the blocking wait uses std::condition_variable::wait_for(), which uses the
+ // std::chrono::steady_clock. use that for the async waits as well
+ using Clock = std::chrono::steady_clock;
+ private:
+ const ceph::timespan duration;
+ ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
+ ceph::condition_variable cond;
+
+ struct Waiter : boost::intrusive::list_base_hook<> {
+ using Executor = boost::asio::io_context::executor_type;
+ using Timer = boost::asio::basic_waitable_timer<Clock,
+ boost::asio::wait_traits<Clock>, Executor>;
+ Timer timer;
+ explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
+ };
+ boost::intrusive::list<Waiter> waiters;
+
+ bool going_down{false};
+
+public:
+ RGWReshardWait(ceph::timespan duration = std::chrono::seconds(5))
+ : duration(duration) {}
+ ~RGWReshardWait() {
+ ceph_assert(going_down);
+ }
+ int wait(optional_yield y);
+ // unblock any threads waiting on reshard
+ void stop();
+};
+
+#endif