diff options
Diffstat (limited to '')
-rw-r--r-- | src/rgw/rgw_reshard.cc | 1177 |
1 files changed, 1177 insertions, 0 deletions
diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc new file mode 100644 index 00000000..eb86b220 --- /dev/null +++ b/src/rgw/rgw_reshard.cc @@ -0,0 +1,1177 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <limits> +#include <sstream> + +#include "rgw_rados.h" +#include "rgw_zone.h" +#include "rgw_bucket.h" +#include "rgw_reshard.h" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/lock/cls_lock_client.h" +#include "common/errno.h" +#include "common/ceph_json.h" + +#include "common/dout.h" + +#include "services/svc_zone.h" +#include "services/svc_sys_obj.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +const string reshard_oid_prefix = "reshard."; +const string reshard_lock_name = "reshard_process"; +const string bucket_instance_lock_name = "bucket_instance_lock"; + + +class BucketReshardShard { + RGWRados *store; + const RGWBucketInfo& bucket_info; + int num_shard; + RGWRados::BucketShard bs; + vector<rgw_cls_bi_entry> entries; + map<RGWObjCategory, rgw_bucket_category_stats> stats; + deque<librados::AioCompletion *>& aio_completions; + uint64_t max_aio_completions; + uint64_t reshard_shard_batch_size; + + int wait_next_completion() { + librados::AioCompletion *c = aio_completions.front(); + aio_completions.pop_front(); + + c->wait_for_safe(); + + int ret = c->get_return_value(); + c->release(); + + if (ret < 0) { + derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl; + return ret; + } + + return 0; + } + + int get_completion(librados::AioCompletion **c) { + if (aio_completions.size() >= max_aio_completions) { + int ret = wait_next_completion(); + if (ret < 0) { + return ret; + } + } + + *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + aio_completions.push_back(*c); + + return 0; + } + +public: + BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info, + int _num_shard, + deque<librados::AioCompletion *>& _completions) : + store(_store), bucket_info(_bucket_info), bs(store), + aio_completions(_completions) + { + num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); + bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */); + + max_aio_completions = + store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio"); + reshard_shard_batch_size = + store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size"); + } + + int get_num_shard() { + return num_shard; + } + + int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, + const rgw_bucket_category_stats& entry_stats) { + entries.push_back(entry); + if (account) { + rgw_bucket_category_stats& target = stats[category]; + target.num_entries += entry_stats.num_entries; + target.total_size += entry_stats.total_size; + target.total_size_rounded += entry_stats.total_size_rounded; + target.actual_size += entry_stats.actual_size; + } + if (entries.size() >= reshard_shard_batch_size) { + int ret = flush(); + if (ret < 0) { + return ret; + } + } + + return 0; + } + + int flush() { + if (entries.size() == 0) { + return 0; + } + + librados::ObjectWriteOperation op; + for (auto& entry : entries) { + store->bi_put(op, bs, entry); + } + cls_rgw_bucket_update_stats(op, false, stats); + + librados::AioCompletion *c; + int ret = get_completion(&c); + if (ret < 0) { + return ret; + } + ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op); + if (ret < 0) { + derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl; + return ret; + } + entries.clear(); + stats.clear(); + return 0; + } + + int wait_all_aio() { + int ret = 0; + while (!aio_completions.empty()) { + int r = wait_next_completion(); + if (r < 0) { + ret = r; + } + } + return ret; + } +}; // class BucketReshardShard + + +class BucketReshardManager { + RGWRados *store; + const RGWBucketInfo& target_bucket_info; + deque<librados::AioCompletion *> completions; + int num_target_shards; + vector<BucketReshardShard *> target_shards; + +public: + BucketReshardManager(RGWRados *_store, + const RGWBucketInfo& _target_bucket_info, + int _num_target_shards) : + store(_store), target_bucket_info(_target_bucket_info), + num_target_shards(_num_target_shards) + { + target_shards.resize(num_target_shards); + for (int i = 0; i < num_target_shards; ++i) { + target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); + } + } + + ~BucketReshardManager() { + for (auto& shard : target_shards) { + int ret = shard->wait_all_aio(); + if (ret < 0) { + ldout(store->ctx(), 20) << __func__ << + ": shard->wait_all_aio() returned ret=" << ret << dendl; + } + } + } + + int add_entry(int shard_index, + rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, + const rgw_bucket_category_stats& entry_stats) { + int ret = target_shards[shard_index]->add_entry(entry, account, category, + entry_stats); + if (ret < 0) { + derr << "ERROR: target_shards.add_entry(" << entry.idx << + ") returned error: " << cpp_strerror(-ret) << dendl; + return ret; + } + + return 0; + } + + int finish() { + int ret = 0; + for (auto& shard : target_shards) { + int r = shard->flush(); + if (r < 0) { + derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl; + ret = r; + } + } + for (auto& shard : target_shards) { + int r = shard->wait_all_aio(); + if (r < 0) { + derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl; + ret = r; + } + delete shard; + } + target_shards.clear(); + return ret; + } +}; // class BucketReshardManager + +RGWBucketReshard::RGWBucketReshard(RGWRados *_store, + const RGWBucketInfo& _bucket_info, + const map<string, bufferlist>& _bucket_attrs, + RGWBucketReshardLock* _outer_reshard_lock) : + store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), + reshard_lock(store, bucket_info, true), + outer_reshard_lock(_outer_reshard_lock) +{ } + +int RGWBucketReshard::set_resharding_status(RGWRados* store, + const RGWBucketInfo& bucket_info, + const string& new_instance_id, + int32_t num_shards, + cls_rgw_reshard_status status) +{ + if (new_instance_id.empty()) { + ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; + return -EINVAL; + } + + cls_rgw_bucket_instance_entry instance_entry; + instance_entry.set_status(new_instance_id, num_shards, status); + + int ret = store->bucket_set_reshard(bucket_info, instance_entry); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " + << cpp_strerror(-ret) << dendl; + return ret; + } + return 0; +} + +// reshard lock assumes lock is held +int RGWBucketReshard::clear_resharding(RGWRados* store, + const RGWBucketInfo& bucket_info) +{ + int ret = clear_index_shard_reshard_status(store, bucket_info); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ << + " ERROR: error clearing reshard status from index shard " << + cpp_strerror(-ret) << dendl; + return ret; + } + + cls_rgw_bucket_instance_entry instance_entry; + ret = store->bucket_set_reshard(bucket_info, instance_entry); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << + " ERROR: error setting bucket resharding flag on bucket index: " << + cpp_strerror(-ret) << dendl; + return ret; + } + + return 0; +} + +int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store, + const RGWBucketInfo& bucket_info) +{ + uint32_t num_shards = bucket_info.num_shards; + + if (num_shards < std::numeric_limits<uint32_t>::max()) { + int ret = set_resharding_status(store, bucket_info, + bucket_info.bucket.bucket_id, + (num_shards < 1 ? 1 : num_shards), + CLS_RGW_RESHARD_NOT_RESHARDING); + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ << + " ERROR: error clearing reshard status from index shard " << + cpp_strerror(-ret) << dendl; + return ret; + } + } + + return 0; +} + +static int create_new_bucket_instance(RGWRados *store, + int new_num_shards, + const RGWBucketInfo& bucket_info, + map<string, bufferlist>& attrs, + RGWBucketInfo& new_bucket_info) +{ + new_bucket_info = bucket_info; + + store->create_bucket_id(&new_bucket_info.bucket.bucket_id); + new_bucket_info.bucket.oid.clear(); + + new_bucket_info.num_shards = new_num_shards; + new_bucket_info.objv_tracker.clear(); + + new_bucket_info.new_bucket_instance_id.clear(); + new_bucket_info.reshard_status = 0; + + int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards); + if (ret < 0) { + cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; + return ret; + } + + ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs); + if (ret < 0) { + cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; + return ret; + } + + return 0; +} + +int RGWBucketReshard::create_new_bucket_instance(int new_num_shards, + RGWBucketInfo& new_bucket_info) +{ + return ::create_new_bucket_instance(store, new_num_shards, + bucket_info, bucket_attrs, new_bucket_info); +} + +int RGWBucketReshard::cancel() +{ + int ret = reshard_lock.lock(); + if (ret < 0) { + return ret; + } + + ret = clear_resharding(); + + reshard_lock.unlock(); + return ret; +} + +class BucketInfoReshardUpdate +{ + RGWRados *store; + RGWBucketInfo& bucket_info; + std::map<string, bufferlist> bucket_attrs; + + bool in_progress{false}; + + int set_status(cls_rgw_reshard_status s) { + bucket_info.reshard_status = s; + int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl; + return ret; + } + return 0; + } + +public: + BucketInfoReshardUpdate(RGWRados *_store, + RGWBucketInfo& _bucket_info, + map<string, bufferlist>& _bucket_attrs, + const string& new_bucket_id) : + store(_store), + bucket_info(_bucket_info), + bucket_attrs(_bucket_attrs) + { + bucket_info.new_bucket_instance_id = new_bucket_id; + } + + ~BucketInfoReshardUpdate() { + if (in_progress) { + // resharding must not have ended correctly, clean up + int ret = + RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info); + if (ret < 0) { + lderr(store->ctx()) << "Error: " << __func__ << + " clear_index_shard_status returned " << ret << dendl; + } + bucket_info.new_bucket_instance_id.clear(); + set_status(CLS_RGW_RESHARD_NOT_RESHARDING); // clears new_bucket_instance as well + } + } + + int start() { + int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS); + if (ret < 0) { + return ret; + } + in_progress = true; + return 0; + } + + int complete() { + int ret = set_status(CLS_RGW_RESHARD_DONE); + if (ret < 0) { + return ret; + } + in_progress = false; + return 0; + } +}; + + +RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store, + const std::string& reshard_lock_oid, + bool _ephemeral) : + store(_store), + lock_oid(reshard_lock_oid), + ephemeral(_ephemeral), + internal_lock(reshard_lock_name) +{ + const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>( + "rgw_reshard_bucket_lock_duration"); + duration = std::chrono::seconds(lock_dur_secs); + +#define COOKIE_LEN 16 + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); + cookie_buf[COOKIE_LEN] = '\0'; + + internal_lock.set_cookie(cookie_buf); + internal_lock.set_duration(duration); +} + +int RGWBucketReshardLock::lock() { + internal_lock.set_must_renew(false); + int ret; + if (ephemeral) { + ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx, + lock_oid); + } else { + ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid); + } + if (ret < 0) { + ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ << + " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl; + return ret; + } + reset_time(Clock::now()); + + return 0; +} + +void RGWBucketReshardLock::unlock() { + int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid); + if (ret < 0) { + ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ << + " failed to drop lock on " << lock_oid << " ret=" << ret << dendl; + } +} + +int RGWBucketReshardLock::renew(const Clock::time_point& now) { + internal_lock.set_must_renew(true); + int ret; + if (ephemeral) { + ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx, + lock_oid); + } else { + ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid); + } + if (ret < 0) { /* expired or already locked by another processor */ + std::stringstream error_s; + if (-ENOENT == ret) { + error_s << "ENOENT (lock expired or never initially locked)"; + } else { + error_s << ret << " (" << cpp_strerror(-ret) << ")"; + } + ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " << + lock_oid << " with error " << error_s.str() << dendl; + return ret; + } + internal_lock.set_must_renew(false); + + reset_time(now); + ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " << + lock_oid << dendl; + + return 0; +} + + +int RGWBucketReshard::do_reshard(int num_shards, + RGWBucketInfo& new_bucket_info, + int max_entries, + bool verbose, + ostream *out, + Formatter *formatter) +{ + rgw_bucket& bucket = bucket_info.bucket; + + int ret = 0; + + if (out) { + (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl; + (*out) << "bucket name: " << bucket_info.bucket.name << std::endl; + (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << + std::endl; + (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << + std::endl; + } + + /* update bucket info -- in progress*/ + list<rgw_cls_bi_entry> entries; + + if (max_entries < 0) { + ldout(store->ctx(), 0) << __func__ << + ": can't reshard, negative max_entries" << dendl; + return -EINVAL; + } + + // NB: destructor cleans up sharding state if reshard does not + // complete successfully + BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id); + + ret = bucket_info_updater.start(); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; + return ret; + } + + int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); + + BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards); + + bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr); + + if (verbose_json_out) { + formatter->open_array_section("entries"); + } + + uint64_t total_entries = 0; + + if (!verbose_json_out && out) { + (*out) << "total entries:"; + } + + const int num_source_shards = + (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); + string marker; + for (int i = 0; i < num_source_shards; ++i) { + bool is_truncated = true; + marker.clear(); + while (is_truncated) { + entries.clear(); + ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); + if (ret < 0 && ret != -ENOENT) { + derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; + return ret; + } + + for (auto iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + if (verbose_json_out) { + formatter->open_object_section("entry"); + + encode_json("shard_id", i, formatter); + encode_json("num_entry", total_entries, formatter); + encode_json("entry", entry, formatter); + } + total_entries++; + + marker = entry.idx; + + int target_shard_id; + cls_rgw_obj_key cls_key; + RGWObjCategory category; + rgw_bucket_category_stats stats; + bool account = entry.get_info(&cls_key, &category, &stats); + rgw_obj_key key(cls_key); + rgw_obj obj(new_bucket_info.bucket, key); + RGWMPObj mp; + if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) { + // place the multipart .meta object on the same shard as its head object + obj.index_hash_source = mp.get_key(); + } + int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; + return ret; + } + + int shard_index = (target_shard_id > 0 ? target_shard_id : 0); + + ret = target_shards_mgr.add_entry(shard_index, entry, account, + category, stats); + if (ret < 0) { + return ret; + } + + Clock::time_point now = Clock::now(); + if (reshard_lock.should_renew(now)) { + // assume outer locks have timespans at least the size of ours, so + // can call inside conditional + if (outer_reshard_lock) { + ret = outer_reshard_lock->renew(now); + if (ret < 0) { + return ret; + } + } + ret = reshard_lock.renew(now); + if (ret < 0) { + lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; + return ret; + } + } + if (verbose_json_out) { + formatter->close_section(); + formatter->flush(*out); + } else if (out && !(total_entries % 1000)) { + (*out) << " " << total_entries; + } + } // entries loop + } + } + + if (verbose_json_out) { + formatter->close_section(); + formatter->flush(*out); + } else if (out) { + (*out) << " " << total_entries << std::endl; + } + + ret = target_shards_mgr.finish(); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: failed to reshard" << dendl; + return -EIO; + } + + ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time); + if (ret < 0) { + lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl; + return ret; + } + + ret = bucket_info_updater.complete(); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; + /* don't error out, reshard process succeeded */ + } + + return 0; + // NB: some error clean-up is done by ~BucketInfoReshardUpdate +} // RGWBucketReshard::do_reshard + +int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status) +{ + librados::IoCtx index_ctx; + map<int, string> bucket_objs; + + int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs); + if (r < 0) { + return r; + } + + for (auto i : bucket_objs) { + cls_rgw_bucket_instance_entry entry; + + int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry); + if (ret < 0 && ret != -ENOENT) { + lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl; + return ret; + } + + status->push_back(entry); + } + + return 0; +} + + +int RGWBucketReshard::execute(int num_shards, int max_op_entries, + bool verbose, ostream *out, Formatter *formatter, + RGWReshard* reshard_log) +{ + Clock::time_point now; + + int ret = reshard_lock.lock(); + if (ret < 0) { + return ret; + } + + RGWBucketInfo new_bucket_info; + ret = create_new_bucket_instance(num_shards, new_bucket_info); + if (ret < 0) { + // shard state is uncertain, but this will attempt to remove them anyway + goto error_out; + } + + if (reshard_log) { + ret = reshard_log->update(bucket_info, new_bucket_info); + if (ret < 0) { + goto error_out; + } + } + + // set resharding status of current bucket_info & shards with + // information about planned resharding + ret = set_resharding_status(new_bucket_info.bucket.bucket_id, + num_shards, CLS_RGW_RESHARD_IN_PROGRESS); + if (ret < 0) { + reshard_lock.unlock(); + return ret; + } + + ret = do_reshard(num_shards, + new_bucket_info, + max_op_entries, + verbose, out, formatter); + if (ret < 0) { + goto error_out; + } + + // at this point we've done the main work; we'll make a best-effort + // to clean-up but will not indicate any errors encountered + + reshard_lock.unlock(); + + // resharding successful, so remove old bucket index shards; use + // best effort and don't report out an error; the lock isn't needed + // at this point since all we're using a best effor to to remove old + // shard objects + ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards); + if (ret < 0) { + lderr(store->ctx()) << "Error: " << __func__ << + " failed to clean up old shards; " << + "RGWRados::clean_bucket_index returned " << ret << dendl; + } + + ret = rgw_bucket_instance_remove_entry(store, + bucket_info.bucket.get_key(), + nullptr); + if (ret < 0) { + lderr(store->ctx()) << "Error: " << __func__ << + " failed to clean old bucket info object \"" << + bucket_info.bucket.get_key() << + "\"created after successful resharding with error " << ret << dendl; + } + + ldout(store->ctx(), 1) << __func__ << + " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" << + bucket_info.bucket.get_key() << "\" to \"" << + new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl; + + return 0; + +error_out: + + reshard_lock.unlock(); + + // since the real problem is the issue that led to this error code + // path, we won't touch ret and instead use another variable to + // temporarily error codes + int ret2 = store->clean_bucket_index(new_bucket_info, + new_bucket_info.num_shards); + if (ret2 < 0) { + lderr(store->ctx()) << "Error: " << __func__ << + " failed to clean up shards from failed incomplete resharding; " << + "RGWRados::clean_bucket_index returned " << ret2 << dendl; + } + + ret2 = rgw_bucket_instance_remove_entry(store, + new_bucket_info.bucket.get_key(), + nullptr); + if (ret2 < 0) { + lderr(store->ctx()) << "Error: " << __func__ << + " failed to clean bucket info object \"" << + new_bucket_info.bucket.get_key() << + "\"created during incomplete resharding with error " << ret2 << dendl; + } + + return ret; +} // execute + + +RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out, + Formatter *_formatter) : + store(_store), instance_lock(bucket_instance_lock_name), + verbose(_verbose), out(_out), formatter(_formatter) +{ + num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs"); +} + +string RGWReshard::get_logshard_key(const string& tenant, + const string& bucket_name) +{ + return tenant + ":" + bucket_name; +} + +#define MAX_RESHARD_LOGSHARDS_PRIME 7877 + +void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid) +{ + string key = get_logshard_key(tenant, bucket_name); + + uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size()); + uint32_t sid2 = sid ^ ((sid & 0xFF) << 24); + sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards; + + get_logshard_oid(int(sid), oid); +} + +int RGWReshard::add(cls_rgw_reshard_entry& entry) +{ + if (!store->svc.zone->can_reshard()) { + ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; + return 0; + } + + string logshard_oid; + + get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); + + librados::ObjectWriteOperation op; + cls_rgw_reshard_add(op, entry); + + int ret = store->reshard_pool_ctx.operate(logshard_oid, &op); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; + return ret; + } + return 0; +} + +int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info) +{ + cls_rgw_reshard_entry entry; + entry.bucket_name = bucket_info.bucket.name; + entry.bucket_id = bucket_info.bucket.bucket_id; + entry.tenant = bucket_info.owner.tenant; + + int ret = get(entry); + if (ret < 0) { + return ret; + } + + entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id; + + ret = add(entry); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << + cpp_strerror(-ret) << dendl; + } + + return ret; +} + + +int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated) +{ + string logshard_oid; + + get_logshard_oid(logshard_num, &logshard_oid); + + int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated); + + if (ret < 0) { + if (ret == -ENOENT) { + *is_truncated = false; + ret = 0; + } + lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl; + if (ret == -EACCES) { + lderr(store->ctx()) << "access denied to pool " << store->svc.zone->get_zone_params().reshard_pool + << ". Fix the pool access permissions of your client" << dendl; + } + } + + return ret; +} + +int RGWReshard::get(cls_rgw_reshard_entry& entry) +{ + string logshard_oid; + + get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); + + int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry); + if (ret < 0) { + if (ret != -ENOENT) { + lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << + " bucket=" << entry.bucket_name << dendl; + } + return ret; + } + + return 0; +} + +int RGWReshard::remove(cls_rgw_reshard_entry& entry) +{ + string logshard_oid; + + get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); + + librados::ObjectWriteOperation op; + cls_rgw_reshard_remove(op, entry); + + int ret = store->reshard_pool_ctx.operate(logshard_oid, &op); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; + return ret; + } + + return ret; +} + +int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) +{ + int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl; + return ret; + } + + return 0; +} + +int RGWReshardWait::wait(optional_yield y) +{ + std::unique_lock lock(mutex); + + if (going_down) { + return -ECANCELED; + } + +#ifdef HAVE_BOOST_CONTEXT + if (y) { + auto& context = y.get_io_context(); + auto& yield = y.get_yield_context(); + + Waiter waiter(context); + waiters.push_back(waiter); + lock.unlock(); + + waiter.timer.expires_after(duration); + + boost::system::error_code ec; + waiter.timer.async_wait(yield[ec]); + + lock.lock(); + waiters.erase(waiters.iterator_to(waiter)); + return -ec.value(); + } +#endif + + cond.wait_for(lock, duration); + + if (going_down) { + return -ECANCELED; + } + + return 0; +} + +void RGWReshardWait::stop() +{ + std::scoped_lock lock(mutex); + going_down = true; + cond.notify_all(); + for (auto& waiter : waiters) { + // unblock any waiters with ECANCELED + waiter.timer.cancel(); + } +} + +int RGWReshard::process_single_logshard(int logshard_num) +{ + string marker; + bool truncated = true; + + CephContext *cct = store->ctx(); + constexpr uint32_t max_entries = 1000; + + string logshard_oid; + get_logshard_oid(logshard_num, &logshard_oid); + + RGWBucketReshardLock logshard_lock(store, logshard_oid, false); + + int ret = logshard_lock.lock(); + if (ret < 0) { + ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << + logshard_oid << ", ret = " << ret <<dendl; + return ret; + } + + do { + std::list<cls_rgw_reshard_entry> entries; + ret = list(logshard_num, marker, max_entries, entries, &truncated); + if (ret < 0) { + ldout(cct, 10) << "cannot list all reshards in logshard oid=" << + logshard_oid << dendl; + continue; + } + + for(auto& entry: entries) { // logshard entries + if(entry.new_instance_id.empty()) { + + ldout(store->ctx(), 20) << __func__ << " resharding " << + entry.bucket_name << dendl; + + auto obj_ctx = store->svc.sysobj->init_obj_ctx(); + rgw_bucket bucket; + RGWBucketInfo bucket_info; + map<string, bufferlist> attrs; + + ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, + bucket_info, nullptr, &attrs); + if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) { + if (ret < 0) { + ldout(cct, 0) << __func__ << + ": Error in get_bucket_info for bucket " << entry.bucket_name << + ": " << cpp_strerror(-ret) << dendl; + if (ret != -ENOENT) { + // any error other than ENOENT will abort + return ret; + } + } else { + ldout(cct,0) << __func__ << + ": Bucket: " << entry.bucket_name << + " already resharded by someone, skipping " << dendl; + } + + // we've encountered a reshard queue entry for an apparently + // non-existent bucket; let's try to recover by cleaning up + ldout(cct, 0) << __func__ << + ": removing reshard queue entry for a resharded or non-existent bucket" << + entry.bucket_name << dendl; + + ret = remove(entry); + if (ret < 0) { + ldout(cct, 0) << __func__ << + ": Error removing non-existent bucket " << + entry.bucket_name << " from resharding queue: " << + cpp_strerror(-ret) << dendl; + return ret; + } + + // we cleaned up, move on to the next entry + goto finished_entry; + } + + RGWBucketReshard br(store, bucket_info, attrs, nullptr); + ret = br.execute(entry.new_num_shards, max_entries, false, nullptr, + nullptr, this); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << + ": Error during resharding bucket " << entry.bucket_name << ":" << + cpp_strerror(-ret)<< dendl; + return ret; + } + + ldout(store->ctx(), 20) << __func__ << + " removing reshard queue entry for bucket " << entry.bucket_name << + dendl; + + ret = remove(entry); + if (ret < 0) { + ldout(cct, 0) << __func__ << ": Error removing bucket " << + entry.bucket_name << " from resharding queue: " << + cpp_strerror(-ret) << dendl; + return ret; + } + } // if new instance id is empty + + finished_entry: + + Clock::time_point now = Clock::now(); + if (logshard_lock.should_renew(now)) { + ret = logshard_lock.renew(now); + if (ret < 0) { + return ret; + } + } + + entry.get_key(&marker); + } // entry for loop + } while (truncated); + + logshard_lock.unlock(); + return 0; +} + + +void RGWReshard::get_logshard_oid(int shard_num, string *logshard) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); + + string objname(reshard_oid_prefix); + *logshard = objname + buf; +} + +int RGWReshard::process_all_logshards() +{ + if (!store->svc.zone->can_reshard()) { + ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; + return 0; + } + int ret = 0; + + for (int i = 0; i < num_logshards; i++) { + string logshard; + get_logshard_oid(i, &logshard); + + ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl; + + ret = process_single_logshard(i); + if (ret <0) { + return ret; + } + } + + return 0; +} + +bool RGWReshard::going_down() +{ + return down_flag; +} + +void RGWReshard::start_processor() +{ + worker = new ReshardWorker(store->ctx(), this); + worker->create("rgw_reshard"); +} + +void RGWReshard::stop_processor() +{ + down_flag = true; + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = nullptr; +} + +void *RGWReshard::ReshardWorker::entry() { + utime_t last_run; + do { + utime_t start = ceph_clock_now(); + if (reshard->process_all_logshards()) { + /* All shards have been processed properly. Next time we can start + * from this moment. */ + last_run = start; + } + + if (reshard->going_down()) + break; + + utime_t end = ceph_clock_now(); + end -= start; + int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval"); + + if (secs <= end.sec()) + continue; // next round + + secs -= end.sec(); + + lock.Lock(); + cond.WaitInterval(lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!reshard->going_down()); + + return NULL; +} + +void RGWReshard::ReshardWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} |