diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/cls/rgw/cls_rgw_client.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/rgw/cls_rgw_client.cc')
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 1207 |
1 files changed, 1207 insertions, 0 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc new file mode 100644 index 000000000..cddd735c5 --- /dev/null +++ b/src/cls/rgw/cls_rgw_client.cc @@ -0,0 +1,1207 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <errno.h> + +#include "cls/rgw/cls_rgw_const.h" +#include "cls/rgw/cls_rgw_client.h" + +#include "common/debug.h" + +using std::list; +using std::map; +using std::pair; +using std::string; +using std::vector; + +using ceph::real_time; + +using namespace librados; + +const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; +const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; + + +int CLSRGWConcurrentIO::operator()() { + int ret = 0; + iter = objs_container.begin(); + for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { + ret = issue_op(iter->first, iter->second); + if (ret < 0) + break; + } + + int num_completions = 0, r = 0; + std::map<int, std::string> completed_objs; + std::map<int, std::string> retry_objs; + while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, + need_multiple_rounds() ? &completed_objs : nullptr, + !need_multiple_rounds() ? &retry_objs : nullptr)) { + if (r >= 0 && ret >= 0) { + for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { + int issue_ret = issue_op(iter->first, iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + + // if we're at the end with this round, see if another round is needed + if (iter == objs_container.end()) { + if (need_multiple_rounds() && !completed_objs.empty()) { + // For those objects which need another round, use them to reset + // the container + reset_container(completed_objs); + iter = objs_container.begin(); + } else if (! need_multiple_rounds() && !retry_objs.empty()) { + reset_container(retry_objs); + iter = objs_container.begin(); + } + + // re-issue ops if container was reset above (i.e., iter != + // objs_container.end()); if it was not reset above (i.e., iter + // == objs_container.end()) the loop will exit immediately + // without iterating + for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { + int issue_ret = issue_op(iter->first, iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } + } + + if (ret < 0) { + cleanup(); + } + return ret; +} // CLSRGWConcurrintIO::operator()() + + +/** + * This class represents the bucket index object operation callback context. + */ +template <typename T> +class ClsBucketIndexOpCtx : public ObjectOperationCompletion { +private: + T *data; + int *ret_code; +public: + ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); } + ~ClsBucketIndexOpCtx() override {} + void handle_completion(int r, bufferlist& outbl) override { + // if successful, or we're asked for a retry, copy result into + // destination (*data) + if (r >= 0 || r == RGWBIAdvanceAndRetryError) { + try { + auto iter = outbl.cbegin(); + decode((*data), iter); + } catch (ceph::buffer::error& err) { + r = -EIO; + } + } + if (ret_code) { + *ret_code = r; + } + } +}; + +void BucketIndexAioManager::do_completion(const int request_id) { + std::lock_guard l{lock}; + + auto iter = pendings.find(request_id); + ceph_assert(iter != pendings.end()); + completions[request_id] = iter->second; + pendings.erase(iter); + + // If the caller needs a list of finished objects, store them + // for further processing + auto miter = pending_objs.find(request_id); + if (miter != pending_objs.end()) { + completion_objs.emplace(request_id, miter->second); + pending_objs.erase(miter); + } + + cond.notify_all(); +} + +bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, + int *num_completions, + int *ret_code, + std::map<int, std::string> *completed_objs, + std::map<int, std::string> *retry_objs) +{ + std::unique_lock locker{lock}; + if (pendings.empty() && completions.empty()) { + return false; + } + + if (completions.empty()) { + // Wait for AIO completion + cond.wait(locker); + } + + // Clear the completed AIOs + auto iter = completions.begin(); + for (; iter != completions.end(); ++iter) { + int r = iter->second->get_return_value(); + + // see if we may need to copy completions or retries + if (completed_objs || retry_objs) { + auto liter = completion_objs.find(iter->first); + if (liter != completion_objs.end()) { + if (completed_objs && r == 0) { /* update list of successfully completed objs */ + (*completed_objs)[liter->second.shard_id] = liter->second.oid; + } + + if (r == RGWBIAdvanceAndRetryError) { + r = 0; + if (retry_objs) { + (*retry_objs)[liter->second.shard_id] = liter->second.oid; + } + } + } else { + // NB: should we log an error here; currently no logging + // context to use + } + } + + if (ret_code && (r < 0 && r != valid_ret_code)) { + (*ret_code) = r; + } + + iter->second->release(); + } + + if (num_completions) { + (*num_completions) = completions.size(); + } + + completions.clear(); + + return true; +} + +// note: currently only called by tesing code +void cls_rgw_bucket_init_index(ObjectWriteOperation& o) +{ + bufferlist in; + o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in); +} + +static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, + const int shard_id, + const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.create(true); + op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx, + const int shard_id, + const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.remove(); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, + const int shard_id, + const string& oid, + uint64_t timeout, + BucketIndexAioManager *manager) { + bufferlist in; + rgw_cls_tag_timeout_op call; + call.tag_timeout = timeout; + encode(call, in); + ObjectWriteOperation op; + op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid) +{ + return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager); +} + +void CLSRGWIssueBucketIndexInit::cleanup() +{ + // Do best effort removal + for (auto citer = objs_container.begin(); citer != iter; ++citer) { + io_ctx.remove(citer->second); + } +} + +int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid) +{ + return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager); +} + +int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid) +{ + return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager); +} + +void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, + bool absolute, + const map<RGWObjCategory, rgw_bucket_category_stats>& stats) +{ + rgw_cls_bucket_update_stats_op call; + call.absolute = absolute; + call.stats = stats; + bufferlist in; + encode(call, in); + o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in); +} + +void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, + const cls_rgw_obj_key& key, const string& locator, bool log_op, + uint16_t bilog_flags, rgw_zone_set& zones_trace) +{ + rgw_cls_obj_prepare_op call; + call.op = op; + call.tag = tag; + call.key = key; + call.locator = locator; + call.log_op = log_op; + call.bilog_flags = bilog_flags; + call.zones_trace = zones_trace; + bufferlist in; + encode(call, in); + o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in); +} + +void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, + rgw_bucket_entry_ver& ver, + const cls_rgw_obj_key& key, + rgw_bucket_dir_entry_meta& dir_meta, + list<cls_rgw_obj_key> *remove_objs, bool log_op, + uint16_t bilog_flags, + rgw_zone_set *zones_trace) +{ + + bufferlist in; + rgw_cls_obj_complete_op call; + call.op = op; + call.tag = tag; + call.key = key; + call.ver = ver; + call.meta = dir_meta; + call.log_op = log_op; + call.bilog_flags = bilog_flags; + if (remove_objs) + call.remove_objs = *remove_objs; + if (zones_trace) { + call.zones_trace = *zones_trace; + } + encode(call, in); + o.exec(RGW_CLASS, RGW_BUCKET_COMPLETE_OP, in); +} + +void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op, + const cls_rgw_obj_key& start_obj, + const std::string& filter_prefix, + const std::string& delimiter, + uint32_t num_entries, + bool list_versions, + rgw_cls_list_ret* result) +{ + bufferlist in; + rgw_cls_list_op call; + call.start_obj = start_obj; + call.filter_prefix = filter_prefix; + call.delimiter = delimiter; + call.num_entries = num_entries; + call.list_versions = list_versions; + encode(call, in); + + op.exec(RGW_CLASS, RGW_BUCKET_LIST, in, + new ClsBucketIndexOpCtx<rgw_cls_list_ret>(result, NULL)); +} + +static bool issue_bucket_list_op(librados::IoCtx& io_ctx, + const int shard_id, + const std::string& oid, + const cls_rgw_obj_key& start_obj, + const std::string& filter_prefix, + const std::string& delimiter, + uint32_t num_entries, + bool list_versions, + BucketIndexAioManager *manager, + rgw_cls_list_ret *pdata) +{ + librados::ObjectReadOperation op; + cls_rgw_bucket_list_op(op, + start_obj, filter_prefix, delimiter, + num_entries, list_versions, pdata); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid) +{ + // set the marker depending on whether we've already queried this + // shard and gotten a RGWBIAdvanceAndRetryError (defined + // constant) return value; if we have use the marker in the return + // to advance the search, otherwise use the marker passed in by the + // caller + cls_rgw_obj_key marker; + auto iter = result.find(shard_id); + if (iter != result.end()) { + marker = iter->second.marker; + } else { + marker = start_obj; + } + + return issue_bucket_list_op(io_ctx, shard_id, oid, + marker, filter_prefix, delimiter, + num_entries, list_versions, &manager, + &result[shard_id]); +} + + +void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs) +{ + objs_container.swap(objs); + iter = objs_container.begin(); + objs.clear(); +} + + +void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes) +{ + bufferlist in; + rgw_cls_obj_remove_op call; + call.keep_attr_prefixes = keep_attr_prefixes; + encode(call, in); + o.exec(RGW_CLASS, RGW_OBJ_REMOVE, in); +} + +void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr) +{ + bufferlist in; + rgw_cls_obj_store_pg_ver_op call; + call.attr = attr; + encode(call, in); + o.exec(RGW_CLASS, RGW_OBJ_STORE_PG_VER, in); +} + +void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist) +{ + bufferlist in; + rgw_cls_obj_check_attrs_prefix call; + call.check_prefix = prefix; + call.fail_if_exist = fail_if_exist; + encode(call, in); + o.exec(RGW_CLASS, RGW_OBJ_CHECK_ATTRS_PREFIX, in); +} + +void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const real_time& mtime, bool high_precision_time, RGWCheckMTimeType type) +{ + bufferlist in; + rgw_cls_obj_check_mtime call; + call.mtime = mtime; + call.high_precision_time = high_precision_time; + call.type = type; + encode(call, in); + o.exec(RGW_CLASS, RGW_OBJ_CHECK_MTIME, in); +} + +int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid, + BIIndexType index_type, cls_rgw_obj_key& key, + rgw_cls_bi_entry *entry) +{ + bufferlist in, out; + rgw_cls_bi_get_op call; + call.key = key; + call.type = index_type; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET, in, out); + if (r < 0) + return r; + + rgw_cls_bi_get_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + *entry = op_ret.entry; + + return 0; +} + +int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry) +{ + bufferlist in, out; + rgw_cls_bi_put_op call; + call.entry = entry; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_PUT, in, out); + if (r < 0) + return r; + + return 0; +} + +void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry) +{ + bufferlist in, out; + rgw_cls_bi_put_op call; + call.entry = entry; + encode(call, in); + op.exec(RGW_CLASS, RGW_BI_PUT, in); +} + +/* nb: any entries passed in are replaced with the results of the cls + * call, so caller does not need to clear entries between calls + */ +int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, + const std::string& name_filter, const std::string& marker, uint32_t max, + std::list<rgw_cls_bi_entry> *entries, bool *is_truncated) +{ + bufferlist in, out; + rgw_cls_bi_list_op call; + call.name_filter = name_filter; + call.marker = marker; + call.max = max; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out); + if (r < 0) + return r; + + rgw_cls_bi_list_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entries->swap(op_ret.entries); + *is_truncated = op_ret.is_truncated; + + return 0; +} + +int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, + const cls_rgw_obj_key& key, bufferlist& olh_tag, + bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta, + uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace) +{ + librados::ObjectWriteOperation op; + cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta, + olh_epoch, unmod_since, high_precision_time, log_op, + zones_trace); + + return io_ctx.operate(oid, &op); +} + + +void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key, + bufferlist& olh_tag, bool delete_marker, + const string& op_tag, rgw_bucket_dir_entry_meta *meta, + uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace) +{ + bufferlist in, out; + rgw_cls_link_olh_op call; + call.key = key; + call.olh_tag = string(olh_tag.c_str(), olh_tag.length()); + call.op_tag = op_tag; + call.delete_marker = delete_marker; + if (meta) { + call.meta = *meta; + } + call.olh_epoch = olh_epoch; + call.log_op = log_op; + call.unmod_since = unmod_since; + call.high_precision_time = high_precision_time; + call.zones_trace = zones_trace; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_LINK_OLH, in); +} + +int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, + const cls_rgw_obj_key& key, const string& op_tag, + const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace) +{ + librados::ObjectWriteOperation op; + cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace); + int r = io_ctx.operate(oid, &op); + if (r < 0) + return r; + + return 0; +} + +void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op, + const cls_rgw_obj_key& key, const string& op_tag, + const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace) +{ + bufferlist in, out; + rgw_cls_unlink_instance_op call; + call.key = key; + call.op_tag = op_tag; + call.olh_epoch = olh_epoch; + call.olh_tag = olh_tag; + call.log_op = log_op; + call.zones_trace = zones_trace; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_UNLINK_INSTANCE, in); +} + +void cls_rgw_get_olh_log(librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, rgw_cls_read_olh_log_ret& log_ret, int& op_ret) +{ + bufferlist in; + rgw_cls_read_olh_log_op call; + call.olh = olh; + call.ver_marker = ver_marker; + call.olh_tag = olh_tag; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_READ_OLH_LOG, in, new ClsBucketIndexOpCtx<rgw_cls_read_olh_log_ret>(&log_ret, &op_ret)); +} + +int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, uint64_t ver_marker, + const string& olh_tag, + rgw_cls_read_olh_log_ret& log_ret) +{ + int op_ret = 0; + librados::ObjectReadOperation op; + cls_rgw_get_olh_log(op, olh, ver_marker, olh_tag, log_ret, op_ret); + int r = io_ctx.operate(oid, &op, NULL); + if (r < 0) { + return r; + } + if (op_ret < 0) { + return op_ret; + } + + return r; +} + +void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag) +{ + bufferlist in; + rgw_cls_trim_olh_log_op call; + call.olh = olh; + call.ver = ver; + call.olh_tag = olh_tag; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_TRIM_OLH_LOG, in); +} + +int cls_rgw_clear_olh(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag) +{ + librados::ObjectWriteOperation op; + cls_rgw_clear_olh(op, olh, olh_tag); + + return io_ctx.operate(oid, &op); +} + +void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, const string& olh_tag) +{ + bufferlist in; + rgw_cls_bucket_clear_olh_op call; + call.key = olh; + call.olh_tag = olh_tag; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_CLEAR_OLH, in); +} + +void cls_rgw_bilog_list(librados::ObjectReadOperation& op, + const std::string& marker, uint32_t max, + cls_rgw_bi_log_list_ret *pdata, int *ret) +{ + cls_rgw_bi_log_list_op call; + call.marker = marker; + call.max = max; + + bufferlist in; + encode(call, in); + op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret)); +} + +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id, + BucketIndexShardsManager& marker_mgr, uint32_t max, + BucketIndexAioManager *manager, + cls_rgw_bi_log_list_ret *pdata) +{ + librados::ObjectReadOperation op; + cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid) +{ + return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); +} + +void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op, + const std::string& start_marker, + const std::string& end_marker) +{ + cls_rgw_bi_log_trim_op call; + call.start_marker = start_marker; + call.end_marker = end_marker; + + bufferlist in; + encode(call, in); + op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in); +} + +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id, + BucketIndexShardsManager& start_marker_mgr, + BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { + cls_rgw_bi_log_trim_op call; + librados::ObjectWriteOperation op; + cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""), + end_marker_mgr.get(shard_id, "")); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid) +{ + return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); +} + +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager, + rgw_cls_check_index_ret *pdata) { + bufferlist in; + librados::ObjectReadOperation op; + op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>( + pdata, NULL)); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]); +} + +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid) +{ + return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager); +} + +void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) +{ + updates.append(op); + encode(dirent, updates); +} + +void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) +{ + o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates); +} + +int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid) +{ + cls_rgw_obj_key empty_key; + string empty_prefix; + string empty_delimiter; + return issue_bucket_list_op(io_ctx, shard_id, oid, + empty_key, empty_prefix, empty_delimiter, + 0, false, &manager, &result[shard_id]); +} + +static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) +{ + bufferlist in; + librados::ObjectWriteOperation op; + op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid) +{ + return issue_resync_bi_log(io_ctx, shard_id, oid, &manager); +} + +static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) +{ + bufferlist in; + librados::ObjectWriteOperation op; + op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid) +{ + return issue_bi_log_stop(io_ctx, shard_id, oid, &manager); +} + +class GetDirHeaderCompletion : public ObjectOperationCompletion { + RGWGetDirHeader_CB *ret_ctx; +public: + explicit GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {} + ~GetDirHeaderCompletion() override { + ret_ctx->put(); + } + void handle_completion(int r, bufferlist& outbl) override { + rgw_cls_list_ret ret; + try { + auto iter = outbl.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + r = -EIO; + } + + ret_ctx->handle_response(r, ret.dir.header); + } +}; + +int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx) +{ + bufferlist in, out; + rgw_cls_list_op call; + call.num_entries = 0; + encode(call, in); + ObjectReadOperation op; + GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx); + op.exec(RGW_CLASS, RGW_BUCKET_LIST, in, cb); + AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + c->release(); + if (r < 0) + return r; + + return 0; +} + +int cls_rgw_usage_log_read(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket, + uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, + bool *is_truncated) +{ + if (is_truncated) + *is_truncated = false; + + bufferlist in, out; + rgw_cls_usage_log_read_op call; + call.start_epoch = start_epoch; + call.end_epoch = end_epoch; + call.owner = user; + call.max_entries = max_entries; + call.bucket = bucket; + call.iter = read_iter; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_USER_USAGE_LOG_READ, in, out); + if (r < 0) + return r; + + try { + rgw_cls_usage_log_read_ret result; + auto iter = out.cbegin(); + decode(result, iter); + read_iter = result.next_iter; + if (is_truncated) + *is_truncated = result.truncated; + + usage = result.usage; + } catch (ceph::buffer::error& e) { + return -EINVAL; + } + + return 0; +} + +int cls_rgw_usage_log_trim(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket, + uint64_t start_epoch, uint64_t end_epoch) +{ + bufferlist in; + rgw_cls_usage_log_trim_op call; + call.start_epoch = start_epoch; + call.end_epoch = end_epoch; + call.user = user; + call.bucket = bucket; + encode(call, in); + + bool done = false; + do { + ObjectWriteOperation op; + op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in); + int r = io_ctx.operate(oid, &op); + if (r == -ENODATA) + done = true; + else if (r < 0) + return r; + } while (!done); + + return 0; +} + +void cls_rgw_usage_log_trim(librados::ObjectWriteOperation& op, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch) +{ + bufferlist in; + rgw_cls_usage_log_trim_op call; + call.start_epoch = start_epoch; + call.end_epoch = end_epoch; + call.user = user; + call.bucket = bucket; + encode(call, in); + + op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in); +} + +void cls_rgw_usage_log_clear(ObjectWriteOperation& op) +{ + bufferlist in; + op.exec(RGW_CLASS, RGW_USAGE_LOG_CLEAR, in); +} + +void cls_rgw_usage_log_add(ObjectWriteOperation& op, rgw_usage_log_info& info) +{ + bufferlist in; + rgw_cls_usage_log_add_op call; + call.info = info; + encode(call, in); + op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_ADD, in); +} + +/* garbage collection */ + +void cls_rgw_gc_set_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) +{ + bufferlist in; + cls_rgw_gc_set_entry_op call; + call.expiration_secs = expiration_secs; + call.info = info; + encode(call, in); + op.exec(RGW_CLASS, RGW_GC_SET_ENTRY, in); +} + +void cls_rgw_gc_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const string& tag) +{ + bufferlist in; + cls_rgw_gc_defer_entry_op call; + call.expiration_secs = expiration_secs; + call.tag = tag; + encode(call, in); + op.exec(RGW_CLASS, RGW_GC_DEFER_ENTRY, in); +} + +int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, + list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker) +{ + bufferlist in, out; + cls_rgw_gc_list_op call; + call.marker = marker; + call.max = max; + call.expired_only = expired_only; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out); + if (r < 0) + return r; + + cls_rgw_gc_list_ret ret; + try { + auto iter = out.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entries.swap(ret.entries); + + if (truncated) + *truncated = ret.truncated; + next_marker = std::move(ret.next_marker); + return r; +} + +void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const vector<string>& tags) +{ + bufferlist in; + cls_rgw_gc_remove_op call; + call.tags = tags; + encode(call, in); + op.exec(RGW_CLASS, RGW_GC_REMOVE, in); +} + +int cls_rgw_lc_get_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_HEAD, in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_head_ret ret; + try { + auto iter = out.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + head = ret.head; + + return r; +} + +int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + cls_rgw_lc_put_head_op call; + call.head = head; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_PUT_HEAD, in, out); + return r; +} + +int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker, + cls_rgw_lc_entry& entry) +{ + bufferlist in, out; + cls_rgw_lc_get_next_entry_op call; + call.marker = marker; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_NEXT_ENTRY, in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_next_entry_ret ret; + try { + auto iter = out.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + entry = ret.entry; + + return r; +} + +int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, + const cls_rgw_lc_entry& entry) +{ + bufferlist in, out; + cls_rgw_lc_rm_entry_op call; + call.entry = entry; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_RM_ENTRY, in, out); + return r; +} + +int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, + const cls_rgw_lc_entry& entry) +{ + bufferlist in, out; + cls_rgw_lc_set_entry_op call; + call.entry = entry; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_SET_ENTRY, in, out); + return r; +} + +int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid, + const std::string& marker, cls_rgw_lc_entry& entry) +{ + bufferlist in, out; + cls_rgw_lc_get_entry_op call{marker};; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_ENTRY, in, out); + + if (r < 0) { + return r; + } + + cls_rgw_lc_get_entry_ret ret; + try { + auto iter = out.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entry = std::move(ret.entry); + return r; +} + +int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid, + const string& marker, + uint32_t max_entries, + vector<cls_rgw_lc_entry>& entries) +{ + bufferlist in, out; + cls_rgw_lc_list_entries_op op; + + entries.clear(); + + op.marker = marker; + op.max_entries = max_entries; + + encode(op, in); + + int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_LIST_ENTRIES, in, out); + if (r < 0) + return r; + + cls_rgw_lc_list_entries_ret ret; + try { + auto iter = out.cbegin(); + decode(ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + std::sort(std::begin(ret.entries), std::end(ret.entries), + [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b) + { return a.bucket < b.bucket; }); + entries = std::move(ret.entries); + return r; +} + +void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry) +{ + bufferlist in; + cls_rgw_reshard_add_op call; + call.entry = entry; + encode(call, in); + op.exec(RGW_CLASS, RGW_RESHARD_ADD, in); +} + +int cls_rgw_reshard_list(librados::IoCtx& io_ctx, const string& oid, string& marker, uint32_t max, + list<cls_rgw_reshard_entry>& entries, bool* is_truncated) +{ + bufferlist in, out; + cls_rgw_reshard_list_op call; + call.marker = marker; + call.max = max; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_LIST, in, out); + if (r < 0) + return r; + + cls_rgw_reshard_list_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entries.swap(op_ret.entries); + *is_truncated = op_ret.is_truncated; + + return 0; +} + +int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const string& oid, cls_rgw_reshard_entry& entry) +{ + bufferlist in, out; + cls_rgw_reshard_get_op call; + call.entry = entry; + encode(call, in); + int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_GET, in, out); + if (r < 0) + return r; + + cls_rgw_reshard_get_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + entry = op_ret.entry; + + return 0; +} + +void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry) +{ + bufferlist in; + cls_rgw_reshard_remove_op call; + call.tenant = entry.tenant; + call.bucket_name = entry.bucket_name; + call.bucket_id = entry.bucket_id; + encode(call, in); + op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in); +} + +int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, + const cls_rgw_bucket_instance_entry& entry) +{ + bufferlist in, out; + cls_rgw_set_bucket_resharding_op call; + call.entry = entry; + encode(call, in); + return io_ctx.exec(oid, RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in, out); +} + +int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid) +{ + bufferlist in, out; + cls_rgw_clear_bucket_resharding_op call; + encode(call, in); + return io_ctx.exec(oid, RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in, out); +} + +int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, + cls_rgw_bucket_instance_entry *entry) +{ + bufferlist in, out; + cls_rgw_get_bucket_resharding_op call; + encode(call, in); + int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out); + if (r < 0) + return r; + + cls_rgw_get_bucket_resharding_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (ceph::buffer::error& err) { + return -EIO; + } + + *entry = op_ret.new_instance; + + return 0; +} + +void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err) +{ + bufferlist in, out; + cls_rgw_guard_bucket_resharding_op call; + call.ret_err = ret_err; + encode(call, in); + op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in); +} + +static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, + const int shard_id, const string& oid, + const cls_rgw_bucket_instance_entry& entry, + BucketIndexAioManager *manager) { + bufferlist in; + cls_rgw_set_bucket_resharding_op call; + call.entry = entry; + encode(call, in); + librados::ObjectWriteOperation op; + op.assert_exists(); // the shard must exist; if not fail rather than recreate + op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in); + return manager->aio_operate(io_ctx, shard_id, oid, &op); +} + +int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid) +{ + return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager); +} |