diff options
Diffstat (limited to 'src/rgw/rgw_sal_motr.cc')
-rw-r--r-- | src/rgw/rgw_sal_motr.cc | 4024 |
1 files changed, 4024 insertions, 0 deletions
diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc new file mode 100644 index 000000000..de18ba944 --- /dev/null +++ b/src/rgw/rgw_sal_motr.cc @@ -0,0 +1,4024 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * SAL implementation for the CORTX Motr backend + * + * Copyright (C) 2021 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <errno.h> +#include <stdlib.h> +#include <unistd.h> + +extern "C" { +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wextern-c-compat" +#pragma clang diagnostic ignored "-Wdeprecated-anon-enum-enum-conversion" +#include "motr/config.h" +#include "lib/types.h" +#include "lib/trace.h" // m0_trace_set_mmapped_buffer +#include "motr/layout.h" // M0_OBJ_LAYOUT_ID +#include "helpers/helpers.h" // m0_ufid_next +#pragma clang diagnostic pop +} + +#include "common/Clock.h" +#include "common/errno.h" + +#include "rgw_compression.h" +#include "rgw_sal.h" +#include "rgw_sal_motr.h" +#include "rgw_bucket.h" + +#define dout_subsys ceph_subsys_rgw + +using std::string; +using std::map; +using std::vector; +using std::set; +using std::list; + +static string mp_ns = RGW_OBJ_NS_MULTIPART; +static struct m0_ufid_generator ufid_gr; + +namespace rgw::sal { + +using ::ceph::encode; +using ::ceph::decode; + +static std::string motr_global_indices[] = { + RGW_MOTR_USERS_IDX_NAME, + RGW_MOTR_BUCKET_INST_IDX_NAME, + RGW_MOTR_BUCKET_HD_IDX_NAME, + RGW_IAM_MOTR_ACCESS_KEY, + RGW_IAM_MOTR_EMAIL_KEY +}; + +void MotrMetaCache::invalid(const DoutPrefixProvider *dpp, + const string& name) +{ + cache.invalidate_remove(dpp, name); +} + +int MotrMetaCache::put(const DoutPrefixProvider *dpp, + const string& name, + const bufferlist& data) +{ + ldpp_dout(dpp, 0) << "Put into cache: name = " << name << dendl; + + ObjectCacheInfo info; + info.status = 0; + info.data = data; + info.flags = CACHE_FLAG_DATA; + info.meta.mtime = ceph::real_clock::now(); + info.meta.size = data.length(); + cache.put(dpp, name, info, NULL); + + // Inform other rgw instances. Do nothing if it gets some error? + int rc = distribute_cache(dpp, name, info, UPDATE_OBJ); + if (rc < 0) + ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << name << dendl; + + return 0; +} + +int MotrMetaCache::get(const DoutPrefixProvider *dpp, + const string& name, + bufferlist& data) +{ + ObjectCacheInfo info; + uint32_t flags = CACHE_FLAG_DATA; + int rc = cache.get(dpp, name, info, flags, NULL); + if (rc == 0) { + if (info.status < 0) + return info.status; + + bufferlist& bl = info.data; + bufferlist::iterator it = bl.begin(); + data.clear(); + + it.copy_all(data); + ldpp_dout(dpp, 0) << "Cache hit: name = " << name << dendl; + return 0; + } + ldpp_dout(dpp, 0) << "Cache miss: name = " << name << ", rc = "<< rc << dendl; + if(rc == -ENODATA) + return -ENOENT; + + return rc; +} + +int MotrMetaCache::remove(const DoutPrefixProvider *dpp, + const string& name) + +{ + cache.invalidate_remove(dpp, name); + + ObjectCacheInfo info; + int rc = distribute_cache(dpp, name, info, INVALIDATE_OBJ); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: " <<__func__<< "(): failed to distribute cache: rc =" << rc << dendl; + } + + ldpp_dout(dpp, 0) << "Remove from cache: name = " << name << dendl; + return 0; +} + +int MotrMetaCache::distribute_cache(const DoutPrefixProvider *dpp, + const string& normal_name, + ObjectCacheInfo& obj_info, int op) +{ + return 0; +} + +int MotrMetaCache::watch_cb(const DoutPrefixProvider *dpp, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + return 0; +} + +void MotrMetaCache::set_enabled(bool status) +{ + cache.set_enabled(status); +} + +// TODO: properly handle the number of key/value pairs to get in +// one query. Now the POC simply tries to retrieve all `max` number of pairs +// with starting key `marker`. +int MotrUser::list_buckets(const DoutPrefixProvider *dpp, const string& marker, + const string& end_marker, uint64_t max, bool need_stats, + BucketList &buckets, optional_yield y) +{ + int rc; + vector<string> keys(max); + vector<bufferlist> vals(max); + bool is_truncated = false; + + ldpp_dout(dpp, 20) <<__func__<< ": list_user_buckets: marker=" << marker + << " end_marker=" << end_marker + << " max=" << max << dendl; + + // Retrieve all `max` number of pairs. + buckets.clear(); + string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); + keys[0] = marker; + rc = store->next_query_by_name(user_info_iname, keys, vals); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + // Process the returned pairs to add into BucketList. + uint64_t bcount = 0; + for (const auto& bl: vals) { + if (bl.length() == 0) + break; + + RGWBucketEnt ent; + auto iter = bl.cbegin(); + ent.decode(iter); + + std::time_t ctime = ceph::real_clock::to_time_t(ent.creation_time); + ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; + + if (!end_marker.empty() && + end_marker.compare(ent.bucket.marker) <= 0) + break; + + buckets.add(std::make_unique<MotrBucket>(this->store, ent, this)); + bcount++; + } + if (bcount == max) + is_truncated = true; + buckets.set_truncated(is_truncated); + + return 0; +} + +int MotrUser::create_bucket(const DoutPrefixProvider* dpp, + const rgw_bucket& b, + const std::string& zonegroup_id, + rgw_placement_rule& placement_rule, + std::string& swift_ver_location, + const RGWQuotaInfo* pquota_info, + const RGWAccessControlPolicy& policy, + Attrs& attrs, + RGWBucketInfo& info, + obj_version& ep_objv, + bool exclusive, + bool obj_lock_enabled, + bool* existed, + req_info& req_info, + std::unique_ptr<Bucket>* bucket_out, + optional_yield y) +{ + int ret; + std::unique_ptr<Bucket> bucket; + + // Look up the bucket. Create it if it doesn't exist. + ret = this->store->get_bucket(dpp, this, b, &bucket, y); + if (ret < 0 && ret != -ENOENT) + return ret; + + if (ret != -ENOENT) { + *existed = true; + // if (swift_ver_location.empty()) { + // swift_ver_location = bucket->get_info().swift_ver_location; + // } + // placement_rule.inherit_from(bucket->get_info().placement_rule); + + // TODO: ACL policy + // // don't allow changes to the acl policy + //RGWAccessControlPolicy old_policy(ctx()); + //int rc = rgw_op_get_bucket_policy_from_attr( + // dpp, this, u, bucket->get_attrs(), &old_policy, y); + //if (rc >= 0 && old_policy != policy) { + // bucket_out->swap(bucket); + // return -EEXIST; + //} + } else { + + placement_rule.name = "default"; + placement_rule.storage_class = "STANDARD"; + bucket = std::make_unique<MotrBucket>(store, b, this); + bucket->set_attrs(attrs); + *existed = false; + } + + if (!*existed){ + // TODO: how to handle zone and multi-site. + info.placement_rule = placement_rule; + info.bucket = b; + info.owner = this->get_info().user_id; + info.zonegroup = zonegroup_id; + if (obj_lock_enabled) + info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED; + bucket->set_version(ep_objv); + bucket->get_info() = info; + + // Create a new bucket: (1) Add a key/value pair in the + // bucket instance index. (2) Create a new bucket index. + MotrBucket* mbucket = static_cast<MotrBucket*>(bucket.get()); + ret = mbucket->put_info(dpp, y, ceph::real_time())? : + mbucket->create_bucket_index() ? : + mbucket->create_multipart_indices(); + if (ret < 0) + ldpp_dout(dpp, 0) << "ERROR: failed to create bucket indices! " << ret << dendl; + + // Insert the bucket entry into the user info index. + ret = mbucket->link_user(dpp, this, y); + if (ret < 0) + ldpp_dout(dpp, 0) << "ERROR: failed to add bucket entry! " << ret << dendl; + } else { + return -EEXIST; + // bucket->set_version(ep_objv); + // bucket->get_info() = info; + } + + bucket_out->swap(bucket); + + return ret; +} + +int MotrUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y) +{ + return 0; +} + +int MotrUser::read_stats(const DoutPrefixProvider *dpp, + optional_yield y, RGWStorageStats* stats, + ceph::real_time *last_stats_sync, + ceph::real_time *last_stats_update) +{ + return 0; +} + +/* stats - Not for first pass */ +int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb) +{ + return 0; +} + +int MotrUser::complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) +{ + return 0; +} + +int MotrUser::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + bool *is_truncated, RGWUsageIter& usage_iter, + map<rgw_user_bucket, rgw_usage_log_entry>& usage) +{ + return 0; +} + +int MotrUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) +{ + return 0; +} + +int MotrUser::load_user_from_idx(const DoutPrefixProvider *dpp, + MotrStore *store, + RGWUserInfo& info, map<string, bufferlist> *attrs, + RGWObjVersionTracker *objv_tr) +{ + struct MotrUserInfo muinfo; + bufferlist bl; + ldpp_dout(dpp, 20) << "info.user_id.id = " << info.user_id.id << dendl; + if (store->get_user_cache()->get(dpp, info.user_id.id, bl)) { + // Cache misses + int rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, + M0_IC_GET, info.user_id.to_str(), bl); + ldpp_dout(dpp, 20) << "do_idx_op_by_name() = " << rc << dendl; + if (rc < 0) + return rc; + + // Put into cache. + store->get_user_cache()->put(dpp, info.user_id.id, bl); + } + + bufferlist& blr = bl; + auto iter = blr.cbegin(); + muinfo.decode(iter); + info = muinfo.info; + if (attrs) + *attrs = muinfo.attrs; + if (objv_tr) + { + objv_tr->read_version = muinfo.user_version; + objv_tracker.read_version = objv_tr->read_version; + } + + if (!info.access_keys.empty()) { + for(auto key : info.access_keys) { + access_key_tracker.insert(key.first); + } + } + + return 0; +} + +int MotrUser::load_user(const DoutPrefixProvider *dpp, + optional_yield y) +{ + ldpp_dout(dpp, 20) << "load user: user id = " << info.user_id.to_str() << dendl; + return load_user_from_idx(dpp, store, info, &attrs, &objv_tracker); +} + +int MotrUser::create_user_info_idx() +{ + string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); + return store->create_motr_idx_by_name(user_info_iname); +} + +int MotrUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) +{ + for (auto& it : new_attrs) + attrs[it.first] = it.second; + + return store_user(dpp, y, false); +} + +int MotrUser::store_user(const DoutPrefixProvider* dpp, + optional_yield y, bool exclusive, RGWUserInfo* old_info) +{ + bufferlist bl; + struct MotrUserInfo muinfo; + RGWUserInfo orig_info; + RGWObjVersionTracker objv_tr = {}; + obj_version& obj_ver = objv_tr.read_version; + + ldpp_dout(dpp, 20) << "Store_user(): User = " << info.user_id.id << dendl; + orig_info.user_id = info.user_id; + // XXX: we open and close motr idx 2 times in this method: + // 1) on load_user_from_idx() here and 2) on do_idx_op_by_name(PUT) below. + // Maybe this can be optimised later somewhow. + int rc = load_user_from_idx(dpp, store, orig_info, nullptr, &objv_tr); + ldpp_dout(dpp, 10) << "Get user: rc = " << rc << dendl; + + // Check if the user already exists + if (rc == 0 && obj_ver.ver > 0) { + if (old_info) + *old_info = orig_info; + + if (obj_ver.ver != objv_tracker.read_version.ver) { + rc = -ECANCELED; + ldpp_dout(dpp, 0) << "ERROR: User Read version mismatch" << dendl; + goto out; + } + + if (exclusive) + return rc; + + obj_ver.ver++; + } else { + obj_ver.ver = 1; + obj_ver.tag = "UserTAG"; + } + + // Insert the user to user info index. + muinfo.info = info; + muinfo.attrs = attrs; + muinfo.user_version = obj_ver; + muinfo.encode(bl); + rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, + M0_IC_PUT, info.user_id.to_str(), bl); + ldpp_dout(dpp, 10) << "Store user to motr index: rc = " << rc << dendl; + if (rc == 0) { + objv_tracker.read_version = obj_ver; + objv_tracker.write_version = obj_ver; + } + + // Store access key in access key index + if (!info.access_keys.empty()) { + std::string access_key; + std::string secret_key; + std::map<std::string, RGWAccessKey>::const_iterator iter = info.access_keys.begin(); + const RGWAccessKey& k = iter->second; + access_key = k.id; + secret_key = k.key; + MotrAccessKey MGWUserKeys(access_key, secret_key, info.user_id.to_str()); + store->store_access_key(dpp, y, MGWUserKeys); + access_key_tracker.insert(access_key); + } + + // Check if any key need to be deleted + if (access_key_tracker.size() != info.access_keys.size()) { + std::string key_for_deletion; + for (auto key : access_key_tracker) { + if (!info.get_key(key)) { + key_for_deletion = key; + ldpp_dout(dpp, 0) << "Deleting access key: " << key_for_deletion << dendl; + store->delete_access_key(dpp, y, key_for_deletion); + if (rc < 0) { + ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl; + } + } + } + if(rc >= 0){ + access_key_tracker.erase(key_for_deletion); + } + } + + if (!info.user_email.empty()) { + MotrEmailInfo MGWEmailInfo(info.user_id.to_str(), info.user_email); + store->store_email_info(dpp, y, MGWEmailInfo); + } + + // Create user info index to store all buckets that are belong + // to this bucket. + rc = create_user_info_idx(); + if (rc < 0 && rc != -EEXIST) { + ldpp_dout(dpp, 0) << "Failed to create user info index: rc = " << rc << dendl; + goto out; + } + + // Put the user info into cache. + rc = store->get_user_cache()->put(dpp, info.user_id.id, bl); + +out: + return rc; +} + +int MotrUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y) +{ + // Remove user info from cache + // Delete access keys for user + // Delete user info + // Delete user from user index + // Delete email for user - TODO + bufferlist bl; + int rc; + // Remove the user info from cache. + store->get_user_cache()->remove(dpp, info.user_id.id); + + // Delete all access key of user + if (!info.access_keys.empty()) { + for(auto acc_key = info.access_keys.begin(); acc_key != info.access_keys.end(); acc_key++) { + auto access_key = acc_key->first; + rc = store->delete_access_key(dpp, y, access_key); + // TODO + // Check error code for access_key does not exist + // Continue to next step only if delete failed because key doesn't exists + if (rc < 0){ + ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl; + } + } + } + + //Delete email id + if (!info.user_email.empty()) { + rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, + M0_IC_DEL, info.user_email, bl); + if (rc < 0 && rc != -ENOENT) { + ldpp_dout(dpp, 0) << "Unable to delete email id " << rc << dendl; + } + } + + // Delete user info index + string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); + store->delete_motr_idx_by_name(user_info_iname); + ldpp_dout(dpp, 10) << "Deleted user info index - " << user_info_iname << dendl; + + // Delete user from user index + rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, + M0_IC_DEL, info.user_id.to_str(), bl); + if (rc < 0){ + ldpp_dout(dpp, 0) << "Unable to delete user from user index " << rc << dendl; + return rc; + } + + // TODO + // Delete email for user + // rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, + // M0_IC_DEL, info.user_email, bl); + // if (rc < 0){ + // ldpp_dout(dpp, 0) << "Unable to delete email for user" << rc << dendl; + // return rc; + // } + return 0; +} + +int MotrUser::verify_mfa(const std::string& mfa_str, bool* verified, const DoutPrefixProvider *dpp, optional_yield y) +{ + *verified = false; + return 0; +} + +int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_children, bool forward_to_master, req_info* req_info, optional_yield y) +{ + int ret; + + ldpp_dout(dpp, 20) << "remove_bucket Entry=" << info.bucket.name << dendl; + + // Refresh info + ret = load_bucket(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket load_bucket failed rc=" << ret << dendl; + return ret; + } + + ListParams params; + params.list_versions = true; + params.allow_unordered = true; + + ListResults results; + + // 1. Check if Bucket has objects. + // If bucket contains objects and delete_children is true, delete all objects. + // Else throw error that bucket is not empty. + do { + results.objs.clear(); + + // Check if bucket has objects. + ret = list(dpp, params, 1000, results, y); + if (ret < 0) { + return ret; + } + + // If result contains entries, bucket is not empty. + if (!results.objs.empty() && !delete_children) { + ldpp_dout(dpp, 0) << "ERROR: could not remove non-empty bucket " << info.bucket.name << dendl; + return -ENOTEMPTY; + } + + for (const auto& obj : results.objs) { + rgw_obj_key key(obj.key); + /* xxx dang */ + ret = rgw_remove_object(dpp, store, this, key); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket rgw_remove_object failed rc=" << ret << dendl; + return ret; + } + } + } while(results.is_truncated); + + // 2. Abort Mp uploads on the bucket. + ret = abort_multiparts(dpp, store->ctx()); + if (ret < 0) { + return ret; + } + + // 3. Remove mp index?? + string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts"; + ret = store->delete_motr_idx_by_name(bucket_multipart_iname); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove multipart index rc=" << ret << dendl; + return ret; + } + + // 4. Sync user stats. + ret = this->sync_user_stats(dpp, y); + if (ret < 0) { + ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl; + } + + // 5. Remove the bucket from user info index. (unlink user) + ret = this->unlink_user(dpp, owner, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl; + return ret; + } + + // 6. Remove bucket index. + string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; + ret = store->delete_motr_idx_by_name(bucket_index_iname); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl; + return ret; + } + + // 7. Remove bucket instance info. + bufferlist bl; + ret = store->get_bucket_inst_cache()->remove(dpp, info.bucket.name); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance from cache rc=" + << ret << dendl; + return ret; + } + + ret = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, + M0_IC_DEL, info.bucket.name, bl); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance rc=" + << ret << dendl; + return ret; + } + + // TODO : + // 8. Remove Notifications + // if bucket has notification definitions associated with it + // they should be removed (note that any pending notifications on the bucket are still going to be sent) + + // 9. Forward request to master. + if (forward_to_master) { + bufferlist in_data; + ret = store->forward_request_to_master(dpp, owner, &bucket_version, in_data, nullptr, *req_info, y); + if (ret < 0) { + if (ret == -ENOENT) { + /* adjust error, we want to return with NoSuchBucket and not + * NoSuchKey */ + ret = -ERR_NO_SUCH_BUCKET; + } + ldpp_dout(dpp, 0) << "ERROR: Forward to master failed. ret=" << ret << dendl; + return ret; + } + } + + ldpp_dout(dpp, 20) << "remove_bucket Exit=" << info.bucket.name << dendl; + + return ret; +} + +int MotrBucket::remove_bucket_bypass_gc(int concurrent_max, bool + keep_index_consistent, + optional_yield y, const + DoutPrefixProvider *dpp) { + return 0; +} + +int MotrBucket::put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time _mtime) +{ + bufferlist bl; + struct MotrBucketInfo mbinfo; + + ldpp_dout(dpp, 20) << "put_info(): bucket_id=" << info.bucket.bucket_id << dendl; + mbinfo.info = info; + mbinfo.bucket_attrs = attrs; + mbinfo.mtime = _mtime; + mbinfo.bucket_version = bucket_version; + mbinfo.encode(bl); + + // Insert bucket instance using bucket's marker (string). + int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, + M0_IC_PUT, info.bucket.name, bl, !exclusive); + if (rc == 0) + store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); + + return rc; +} + +int MotrBucket::load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats) +{ + // Get bucket instance using bucket's name (string). or bucket id? + bufferlist bl; + if (store->get_bucket_inst_cache()->get(dpp, info.bucket.name, bl)) { + // Cache misses. + ldpp_dout(dpp, 20) << "load_bucket(): name=" << info.bucket.name << dendl; + int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, + M0_IC_GET, info.bucket.name, bl); + ldpp_dout(dpp, 20) << "load_bucket(): rc=" << rc << dendl; + if (rc < 0) + return rc; + store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); + } + + struct MotrBucketInfo mbinfo; + bufferlist& blr = bl; + auto iter =blr.cbegin(); + mbinfo.decode(iter); //Decode into MotrBucketInfo. + + info = mbinfo.info; + ldpp_dout(dpp, 20) << "load_bucket(): bucket_id=" << info.bucket.bucket_id << dendl; + rgw_placement_rule placement_rule; + placement_rule.name = "default"; + placement_rule.storage_class = "STANDARD"; + info.placement_rule = placement_rule; + + attrs = mbinfo.bucket_attrs; + mtime = mbinfo.mtime; + bucket_version = mbinfo.bucket_version; + + return 0; +} + +int MotrBucket::link_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) +{ + bufferlist bl; + RGWBucketEnt new_bucket; + ceph::real_time creation_time = get_creation_time(); + + // RGWBucketEnt or cls_user_bucket_entry is the structure that is stored. + new_bucket.bucket = info.bucket; + new_bucket.size = 0; + if (real_clock::is_zero(creation_time)) + creation_time = ceph::real_clock::now(); + new_bucket.creation_time = creation_time; + new_bucket.encode(bl); + std::time_t ctime = ceph::real_clock::to_time_t(new_bucket.creation_time); + ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; + + // Insert the user into the user info index. + string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); + return store->do_idx_op_by_name(user_info_idx_name, + M0_IC_PUT, info.bucket.name, bl); + +} + +int MotrBucket::unlink_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) +{ + // Remove the user into the user info index. + bufferlist bl; + string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); + return store->do_idx_op_by_name(user_info_idx_name, + M0_IC_DEL, info.bucket.name, bl); +} + +/* stats - Not for first pass */ +int MotrBucket::read_stats(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, int shard_id, + std::string *bucket_ver, std::string *master_ver, + std::map<RGWObjCategory, RGWStorageStats>& stats, + std::string *max_marker, bool *syncstopped) +{ + return 0; +} + +int MotrBucket::create_bucket_index() +{ + string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; + return store->create_motr_idx_by_name(bucket_index_iname); +} + +int MotrBucket::create_multipart_indices() +{ + int rc; + + // Bucket multipart index stores in-progress multipart uploads. + // Key is the object name + upload_id, value is a rgw_bucket_dir_entry. + // An entry is inserted when a multipart upload is initialised ( + // MotrMultipartUpload::init()) and will be removed when the upload + // is completed (MotrMultipartUpload::complete()). + // MotrBucket::list_multiparts() will scan this index to return all + // in-progress multipart uploads in the bucket. + string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts"; + rc = store->create_motr_idx_by_name(bucket_multipart_iname); + if (rc < 0) { + ldout(store->cctx, 0) << "Failed to create bucket multipart index " << bucket_multipart_iname << dendl; + return rc; + } + + return 0; +} + + +int MotrBucket::read_stats_async(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, + int shard_id, RGWGetBucketStats_CB *ctx) +{ + return 0; +} + +int MotrBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) +{ + return 0; +} + +int MotrBucket::update_container_stats(const DoutPrefixProvider *dpp) +{ + return 0; +} + +int MotrBucket::check_bucket_shards(const DoutPrefixProvider *dpp) +{ + return 0; +} + +int MotrBucket::chown(const DoutPrefixProvider *dpp, User& new_user, optional_yield y) +{ + // TODO: update bucket with new owner + return 0; +} + +/* Make sure to call load_bucket() if you need it first */ +bool MotrBucket::is_owner(User* user) +{ + return (info.owner.compare(user->get_id()) == 0); +} + +int MotrBucket::check_empty(const DoutPrefixProvider *dpp, optional_yield y) +{ + /* XXX: Check if bucket contains any objects */ + return 0; +} + +int MotrBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, + optional_yield y, bool check_size_only) +{ + /* Not Handled in the first pass as stats are also needed */ + return 0; +} + +int MotrBucket::merge_and_store_attrs(const DoutPrefixProvider *dpp, Attrs& new_attrs, optional_yield y) +{ + for (auto& it : new_attrs) + attrs[it.first] = it.second; + + return put_info(dpp, y, ceph::real_time()); +} + +int MotrBucket::try_refresh_info(const DoutPrefixProvider *dpp, ceph::real_time *pmtime) +{ + return 0; +} + +/* XXX: usage and stats not supported in the first pass */ +int MotrBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, + uint32_t max_entries, bool *is_truncated, + RGWUsageIter& usage_iter, + map<rgw_user_bucket, rgw_usage_log_entry>& usage) +{ + return 0; +} + +int MotrBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) +{ + return 0; +} + +int MotrBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink) +{ + /* XXX: CHECK: Unlike RadosStore, there is no seperate bucket index table. + * Delete all the object in the list from the object table of this + * bucket + */ + return 0; +} + +int MotrBucket::check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats) +{ + /* XXX: stats not supported yet */ + return 0; +} + +int MotrBucket::rebuild_index(const DoutPrefixProvider *dpp) +{ + /* there is no index table in dbstore. Not applicable */ + return 0; +} + +int MotrBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout) +{ + /* XXX: CHECK: set tag timeout for all the bucket objects? */ + return 0; +} + +int MotrBucket::purge_instance(const DoutPrefixProvider *dpp) +{ + /* XXX: CHECK: for dbstore only single instance supported. + * Remove all the objects for that instance? Anything extra needed? + */ + return 0; +} + +int MotrBucket::set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy &acl, optional_yield y) +{ + int ret = 0; + bufferlist aclbl; + + acls = acl; + acl.encode(aclbl); + + Attrs attrs = get_attrs(); + attrs[RGW_ATTR_ACL] = aclbl; + + // TODO: update bucket entry with the new attrs + + return ret; +} + +std::unique_ptr<Object> MotrBucket::get_object(const rgw_obj_key& k) +{ + return std::make_unique<MotrObject>(this->store, k, this); +} + +int MotrBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y) +{ + int rc; + vector<string> keys(max); + vector<bufferlist> vals(max); + + ldpp_dout(dpp, 20) << "bucket=" << info.bucket.name + << " prefix=" << params.prefix + << " marker=" << params.marker + << " max=" << max << dendl; + + // Retrieve all `max` number of pairs. + string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; + keys[0] = params.marker.empty() ? params.prefix : + params.marker.get_oid(); + rc = store->next_query_by_name(bucket_index_iname, keys, vals, params.prefix, + params.delim); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + // Process the returned pairs to add into ListResults. + int i = 0; + for (; i < rc; ++i) { + if (vals[i].length() == 0) { + results.common_prefixes[keys[i]] = true; + } else { + rgw_bucket_dir_entry ent; + auto iter = vals[i].cbegin(); + ent.decode(iter); + if (params.list_versions || ent.is_visible()) + results.objs.emplace_back(std::move(ent)); + } + } + + if (i == max) { + results.is_truncated = true; + results.next_marker = keys[max - 1] + " "; + } else { + results.is_truncated = false; + } + + return 0; +} + +int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp, + const string& prefix, + string& marker, + const string& delim, + const int& max_uploads, + vector<std::unique_ptr<MultipartUpload>>& uploads, + map<string, bool> *common_prefixes, + bool *is_truncated) +{ + int rc; + vector<string> key_vec(max_uploads); + vector<bufferlist> val_vec(max_uploads); + + string bucket_multipart_iname = + "motr.rgw.bucket." + this->get_name() + ".multiparts"; + key_vec[0].clear(); + key_vec[0].assign(marker.begin(), marker.end()); + rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + // Process the returned pairs to add into ListResults. + // The POC can only support listing all objects or selecting + // with prefix. + int ocount = 0; + rgw_obj_key last_obj_key; + *is_truncated = false; + for (const auto& bl: val_vec) { + if (bl.length() == 0) + break; + + rgw_bucket_dir_entry ent; + auto iter = bl.cbegin(); + ent.decode(iter); + + if (prefix.size() && + (0 != ent.key.name.compare(0, prefix.size(), prefix))) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << + ": skippping \"" << ent.key << + "\" because doesn't match prefix" << dendl; + continue; + } + + rgw_obj_key key(ent.key); + uploads.push_back(this->get_multipart_upload(key.name)); + last_obj_key = key; + ocount++; + if (ocount == max_uploads) { + *is_truncated = true; + break; + } + } + marker = last_obj_key.name; + + // What is common prefix? We don't handle it for now. + + return 0; + +} + +int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct) +{ + return 0; +} + +void MotrStore::finalize(void) +{ + // close connection with motr + m0_client_fini(this->instance, true); +} + +const std::string& MotrZoneGroup::get_endpoint() const +{ + if (!group.endpoints.empty()) { + return group.endpoints.front(); + } else { + // use zonegroup's master zone endpoints + auto z = group.zones.find(group.master_zone); + if (z != group.zones.end() && !z->second.endpoints.empty()) { + return z->second.endpoints.front(); + } + } + return empty; +} + +bool MotrZoneGroup::placement_target_exists(std::string& target) const +{ + return !!group.placement_targets.count(target); +} + +void MotrZoneGroup::get_placement_target_names(std::set<std::string>& names) const +{ + for (const auto& target : group.placement_targets) { + names.emplace(target.second.name); + } +} + +int MotrZoneGroup::get_placement_tier(const rgw_placement_rule& rule, + std::unique_ptr<PlacementTier>* tier) +{ + std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer; + titer = group.placement_targets.find(rule.name); + if (titer == group.placement_targets.end()) { + return -ENOENT; + } + + const auto& target_rule = titer->second; + std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier; + ttier = target_rule.tier_targets.find(rule.storage_class); + if (ttier == target_rule.tier_targets.end()) { + // not found + return -ENOENT; + } + + PlacementTier* t; + t = new MotrPlacementTier(store, ttier->second); + if (!t) + return -ENOMEM; + + tier->reset(t); + return 0; +} + +ZoneGroup& MotrZone::get_zonegroup() +{ + return zonegroup; +} + +const std::string& MotrZone::get_id() +{ + return zone_params->get_id(); +} + +const std::string& MotrZone::get_name() const +{ + return zone_params->get_name(); +} + +bool MotrZone::is_writeable() +{ + return true; +} + +bool MotrZone::get_redirect_endpoint(std::string* endpoint) +{ + return false; +} + +bool MotrZone::has_zonegroup_api(const std::string& api) const +{ + return (zonegroup->api_name == api); +} + +const std::string& MotrZone::get_current_period_id() +{ + return current_period->get_id(); +} + +std::unique_ptr<LuaManager> MotrStore::get_lua_manager() +{ + return std::make_unique<MotrLuaManager>(this); +} + +int MotrObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **_state, optional_yield y, bool follow_olh) +{ + // Get object's metadata (those stored in rgw_bucket_dir_entry). + bufferlist bl; + if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { + // Cache misses. + string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); + int rc = this->store->do_idx_op_by_name(bucket_index_iname, + M0_IC_GET, this->get_key().get_oid(), bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; + return rc; + } + + // Put into cache. + this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); + } + + rgw_bucket_dir_entry ent; + bufferlist& blr = bl; + auto iter = blr.cbegin(); + ent.decode(iter); + + // Set object's type. + this->category = ent.meta.category; + + // Set object state. + state.exists = true; + state.size = ent.meta.size; + state.accounted_size = ent.meta.size; + state.mtime = ent.meta.mtime; + + state.has_attrs = true; + bufferlist etag_bl; + string& etag = ent.meta.etag; + ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; + etag_bl.append(etag); + state.attrset[RGW_ATTR_ETAG] = etag_bl; + + return 0; +} + +MotrObject::~MotrObject() { + this->close_mobj(); +} + +// int MotrObject::read_attrs(const DoutPrefixProvider* dpp, Motr::Object::Read &read_op, optional_yield y, rgw_obj* target_obj) +// { +// read_op.params.attrs = &attrs; +// read_op.params.target_obj = target_obj; +// read_op.params.obj_size = &obj_size; +// read_op.params.lastmod = &mtime; +// +// return read_op.prepare(dpp); +// } + +int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) +{ + // TODO: implement + ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl; + return 0; +} + +int MotrObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) +{ + if (this->category == RGWObjCategory::MultiMeta) + return 0; + + string bname, key; + if (target_obj) { + bname = target_obj->bucket.name; + key = target_obj->key.get_oid(); + } else { + bname = this->get_bucket()->get_name(); + key = this->get_key().get_oid(); + } + ldpp_dout(dpp, 20) << "MotrObject::get_obj_attrs(): " + << bname << "/" << key << dendl; + + // Get object's metadata (those stored in rgw_bucket_dir_entry). + bufferlist bl; + if (this->store->get_obj_meta_cache()->get(dpp, key, bl)) { + // Cache misses. + string bucket_index_iname = "motr.rgw.bucket.index." + bname; + int rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, key, bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; + return rc; + } + + // Put into cache. + this->store->get_obj_meta_cache()->put(dpp, key, bl); + } + + rgw_bucket_dir_entry ent; + bufferlist& blr = bl; + auto iter = blr.cbegin(); + ent.decode(iter); + decode(attrs, iter); + + return 0; +} + +int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) +{ + rgw_obj target = get_obj(); + int r = get_obj_attrs(y, dpp, &target); + if (r < 0) { + return r; + } + set_atomic(); + attrs[attr_name] = attr_val; + return set_obj_attrs(dpp, &attrs, nullptr, y); +} + +int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) +{ + rgw_obj target = get_obj(); + Attrs rmattr; + bufferlist bl; + + set_atomic(); + rmattr[attr_name] = bl; + return set_obj_attrs(dpp, nullptr, &rmattr, y); +} + +bool MotrObject::is_expired() { + return false; +} + +// Taken from rgw_rados.cc +void MotrObject::gen_rand_obj_instance_name() +{ + enum {OBJ_INSTANCE_LEN = 32}; + char buf[OBJ_INSTANCE_LEN + 1]; + + gen_rand_alphanumeric_no_underscore(store->ctx(), buf, OBJ_INSTANCE_LEN); + state.obj.key.set_instance(buf); +} + +int MotrObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count, + std::map<std::string, bufferlist> *m, + bool* pmore, optional_yield y) +{ + return 0; +} + +int MotrObject::omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m, + optional_yield y) +{ + return 0; +} + +int MotrObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid, + const std::set<std::string>& keys, + Attrs* vals) +{ + return 0; +} + +int MotrObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, + bool must_exist, optional_yield y) +{ + return 0; +} + +int MotrObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y) +{ + return 0; +} + +std::unique_ptr<MPSerializer> MotrObject::get_serializer(const DoutPrefixProvider *dpp, + const std::string& lock_name) +{ + return std::make_unique<MPMotrSerializer>(dpp, store, this, lock_name); +} + +int MotrObject::transition(Bucket* bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider* dpp, + optional_yield y) +{ + return 0; +} + +bool MotrObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) +{ + /* XXX: support single default zone and zonegroup for now */ + return true; +} + +int MotrObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) +{ + return 0; +} + +std::unique_ptr<Object::ReadOp> MotrObject::get_read_op() +{ + return std::make_unique<MotrObject::MotrReadOp>(this); +} + +MotrObject::MotrReadOp::MotrReadOp(MotrObject *_source) : + source(_source) +{ } + +int MotrObject::MotrReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) +{ + int rc; + ldpp_dout(dpp, 20) <<__func__<< ": bucket=" << source->get_bucket()->get_name() << dendl; + + rgw_bucket_dir_entry ent; + rc = source->get_bucket_dir_ent(dpp, ent); + if (rc < 0) + return rc; + + // Set source object's attrs. The attrs is key/value map and is used + // in send_response_data() to set attributes, including etag. + bufferlist etag_bl; + string& etag = ent.meta.etag; + ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; + etag_bl.append(etag.c_str(), etag.size()); + source->get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); + + source->set_key(ent.key); + source->set_obj_size(ent.meta.size); + source->category = ent.meta.category; + *params.lastmod = ent.meta.mtime; + + if (params.mod_ptr || params.unmod_ptr) { + // Convert all times go GMT to make them compatible + obj_time_weight src_weight; + src_weight.init(*params.lastmod, params.mod_zone_id, params.mod_pg_ver); + src_weight.high_precision = params.high_precision_time; + + obj_time_weight dest_weight; + dest_weight.high_precision = params.high_precision_time; + + // Check if-modified-since condition + if (params.mod_ptr && !params.if_nomatch) { + dest_weight.init(*params.mod_ptr, params.mod_zone_id, params.mod_pg_ver); + ldpp_dout(dpp, 10) << "If-Modified-Since: " << dest_weight << " & " + << "Last-Modified: " << src_weight << dendl; + if (!(dest_weight < src_weight)) { + return -ERR_NOT_MODIFIED; + } + } + + // Check if-unmodified-since condition + if (params.unmod_ptr && !params.if_match) { + dest_weight.init(*params.unmod_ptr, params.mod_zone_id, params.mod_pg_ver); + ldpp_dout(dpp, 10) << "If-UnModified-Since: " << dest_weight << " & " + << "Last-Modified: " << src_weight << dendl; + if (dest_weight < src_weight) { + return -ERR_PRECONDITION_FAILED; + } + } + } + // Check if-match condition + if (params.if_match) { + string if_match_str = rgw_string_unquote(params.if_match); + ldpp_dout(dpp, 10) << "ETag: " << etag << " & " + << "If-Match: " << if_match_str << dendl; + if (if_match_str.compare(etag) != 0) { + return -ERR_PRECONDITION_FAILED; + } + } + // Check if-none-match condition + if (params.if_nomatch) { + string if_nomatch_str = rgw_string_unquote(params.if_nomatch); + ldpp_dout(dpp, 10) << "ETag: " << etag << " & " + << "If-NoMatch: " << if_nomatch_str << dendl; + if (if_nomatch_str.compare(etag) == 0) { + return -ERR_NOT_MODIFIED; + } + } + + // Skip opening an empty object. + if(source->get_obj_size() == 0) + return 0; + + // Open the object here. + if (source->category == RGWObjCategory::MultiMeta) { + ldpp_dout(dpp, 20) <<__func__<< ": open obj parts..." << dendl; + rc = source->get_part_objs(dpp, this->part_objs)? : + source->open_part_objs(dpp, this->part_objs); + return rc; + } else { + ldpp_dout(dpp, 20) <<__func__<< ": open object..." << dendl; + return source->open_mobj(dpp); + } +} + +int MotrObject::MotrReadOp::read(int64_t off, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp) +{ + ldpp_dout(dpp, 20) << "MotrReadOp::read(): sync read." << dendl; + return 0; +} + +// RGWGetObj::execute() calls ReadOp::iterate() to read object from 'off' to 'end'. +// The returned data is processed in 'cb' which is a chain of post-processing +// filters such as decompression, de-encryption and sending back data to client +// (RGWGetObj_CB::handle_dta which in turn calls RGWGetObj::get_data_cb() to +// send data back.). +// +// POC implements a simple sync version of iterate() function in which it reads +// a block of data each time and call 'cb' for post-processing. +int MotrObject::MotrReadOp::iterate(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb, optional_yield y) +{ + int rc; + + if (source->category == RGWObjCategory::MultiMeta) + rc = source->read_multipart_obj(dpp, off, end, cb, part_objs); + else + rc = source->read_mobj(dpp, off, end, cb); + + return rc; +} + +int MotrObject::MotrReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) +{ + //return 0; + return -ENODATA; +} + +std::unique_ptr<Object::DeleteOp> MotrObject::get_delete_op() +{ + return std::make_unique<MotrObject::MotrDeleteOp>(this); +} + +MotrObject::MotrDeleteOp::MotrDeleteOp(MotrObject *_source) : + source(_source) +{ } + +// Implementation of DELETE OBJ also requires MotrObject::get_obj_state() +// to retrieve and set object's state from object's metadata. +// +// TODO: +// 1. The POC only remove the object's entry from bucket index and delete +// corresponding Motr objects. It doesn't handle the DeleteOp::params. +// Delete::delete_obj() in rgw_rados.cc shows how rados backend process the +// params. +// 2. Delete an object when its versioning is turned on. +int MotrObject::MotrDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y) +{ + ldpp_dout(dpp, 20) << "delete " << source->get_key().get_oid() << " from " << source->get_bucket()->get_name() << dendl; + + rgw_bucket_dir_entry ent; + int rc = source->get_bucket_dir_ent(dpp, ent); + if (rc < 0) { + return rc; + } + + //TODO: When integrating with background GC for object deletion, + // we should consider adding object entry to GC before deleting the metadata. + // Delete from the cache first. + source->store->get_obj_meta_cache()->remove(dpp, source->get_key().get_oid()); + + // Delete the object's entry from the bucket index. + bufferlist bl; + string bucket_index_iname = "motr.rgw.bucket.index." + source->get_bucket()->get_name(); + rc = source->store->do_idx_op_by_name(bucket_index_iname, + M0_IC_DEL, source->get_key().get_oid(), bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << "Failed to del object's entry from bucket index. " << dendl; + return rc; + } + + if (ent.meta.size == 0) { + ldpp_dout(dpp, 0) << __func__ << ": Object size is 0, not deleting motr object." << dendl; + return 0; + } + // Remove the motr objects. + if (source->category == RGWObjCategory::MultiMeta) + rc = source->delete_part_objs(dpp); + else + rc = source->delete_mobj(dpp); + if (rc < 0) { + ldpp_dout(dpp, 0) << "Failed to delete the object from Motr. " << dendl; + return rc; + } + + //result.delete_marker = parent_op.result.delete_marker; + //result.version_id = parent_op.result.version_id; + return 0; +} + +int MotrObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y, bool prevent_versioning) +{ + MotrObject::MotrDeleteOp del_op(this); + del_op.params.bucket_owner = bucket->get_info().owner; + del_op.params.versioning_status = bucket->get_info().versioning_status(); + + return del_op.delete_obj(dpp, y); +} + +int MotrObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate, + Completions* aio, bool keep_index_consistent, + optional_yield y) +{ + /* XXX: Make it async */ + return 0; +} + +int MotrObject::copy_object(User* user, + req_info* info, + const rgw_zone_id& source_zone, + rgw::sal::Object* dest_object, + rgw::sal::Bucket* dest_bucket, + rgw::sal::Bucket* src_bucket, + const rgw_placement_rule& dest_placement, + ceph::real_time* src_mtime, + ceph::real_time* mtime, + const ceph::real_time* mod_ptr, + const ceph::real_time* unmod_ptr, + bool high_precision_time, + const char* if_match, + const char* if_nomatch, + AttrsMod attrs_mod, + bool copy_if_newer, + Attrs& attrs, + RGWObjCategory category, + uint64_t olh_epoch, + boost::optional<ceph::real_time> delete_at, + std::string* version_id, + std::string* tag, + std::string* etag, + void (*progress_cb)(off_t, void *), + void* progress_data, + const DoutPrefixProvider* dpp, + optional_yield y) +{ + return 0; +} + +int MotrObject::swift_versioning_restore(bool& restored, + const DoutPrefixProvider* dpp) +{ + return 0; +} + +int MotrObject::swift_versioning_copy(const DoutPrefixProvider* dpp, + optional_yield y) +{ + return 0; +} + +MotrAtomicWriter::MotrAtomicWriter(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* obj, + MotrStore* _store, + const rgw_user& _owner, + const rgw_placement_rule *_ptail_placement_rule, + uint64_t _olh_epoch, + const std::string& _unique_tag) : + StoreWriter(dpp, y), + store(_store), + owner(_owner), + ptail_placement_rule(_ptail_placement_rule), + olh_epoch(_olh_epoch), + unique_tag(_unique_tag), + obj(_store, obj->get_key(), obj->get_bucket()), + old_obj(_store, obj->get_key(), obj->get_bucket()) {} + +static const unsigned MAX_BUFVEC_NR = 256; + +int MotrAtomicWriter::prepare(optional_yield y) +{ + total_data_size = 0; + + if (obj.is_opened()) + return 0; + + rgw_bucket_dir_entry ent; + int rc = old_obj.get_bucket_dir_ent(dpp, ent); + if (rc == 0) { + ldpp_dout(dpp, 20) << __func__ << ": object exists." << dendl; + } + + rc = m0_bufvec_empty_alloc(&buf, MAX_BUFVEC_NR) ?: + m0_bufvec_alloc(&attr, MAX_BUFVEC_NR, 1) ?: + m0_indexvec_alloc(&ext, MAX_BUFVEC_NR); + if (rc != 0) + this->cleanup(); + + return rc; +} + +int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz) +{ + if (mobj != nullptr) { + ldpp_dout(dpp, 0) <<__func__<< "ERROR: object is already opened" << dendl; + return -EINVAL; + } + + int rc = m0_ufid_next(&ufid_gr, 1, &meta.oid); + if (rc != 0) { + ldpp_dout(dpp, 0) <<__func__<< "ERROR: m0_ufid_next() failed: " << rc << dendl; + return rc; + } + + char fid_str[M0_FID_STR_LEN]; + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); + ldpp_dout(dpp, 20) <<__func__<< ": sz=" << sz << " oid=" << fid_str << dendl; + + int64_t lid = m0_layout_find_by_objsz(store->instance, nullptr, sz); + M0_ASSERT(lid > 0); + + M0_ASSERT(mobj == nullptr); + mobj = new m0_obj(); + m0_obj_init(mobj, &store->container.co_realm, &meta.oid, lid); + + struct m0_op *op = nullptr; + mobj->ob_entity.en_flags |= M0_ENF_META; + rc = m0_entity_create(nullptr, &mobj->ob_entity, &op); + if (rc != 0) { + this->close_mobj(); + ldpp_dout(dpp, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; + return rc; + } + ldpp_dout(dpp, 20) <<__func__<< ": call m0_op_launch()..." << dendl; + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc != 0) { + this->close_mobj(); + ldpp_dout(dpp, 0) << "ERROR: failed to create motr object: " << rc << dendl; + return rc; + } + + meta.layout_id = mobj->ob_attr.oa_layout_id; + meta.pver = mobj->ob_attr.oa_pver; + ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id + << std::dec << " rc=" << rc << dendl; + + // TODO: add key:user+bucket+key+obj.meta.oid value:timestamp to + // gc.queue.index. See more at github.com/Seagate/cortx-rgw/issues/7. + + return rc; +} + +int MotrObject::open_mobj(const DoutPrefixProvider *dpp) +{ + char fid_str[M0_FID_STR_LEN]; + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); + ldpp_dout(dpp, 20) <<__func__<< ": oid=" << fid_str << dendl; + + int rc; + if (meta.layout_id == 0) { + rgw_bucket_dir_entry ent; + rc = this->get_bucket_dir_ent(dpp, ent); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: open_mobj() failed: rc=" << rc << dendl; + return rc; + } + } + + if (meta.layout_id == 0) + return -ENOENT; + + M0_ASSERT(mobj == nullptr); + mobj = new m0_obj(); + memset(mobj, 0, sizeof *mobj); + m0_obj_init(mobj, &store->container.co_realm, &meta.oid, store->conf.mc_layout_id); + + struct m0_op *op = nullptr; + mobj->ob_attr.oa_layout_id = meta.layout_id; + mobj->ob_attr.oa_pver = meta.pver; + mobj->ob_entity.en_flags |= M0_ENF_META; + rc = m0_entity_open(&mobj->ob_entity, &op); + if (rc != 0) { + ldpp_dout(dpp, 0) << "ERROR: m0_entity_open() failed: rc=" << rc << dendl; + this->close_mobj(); + return rc; + } + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc < 0) { + ldpp_dout(dpp, 10) << "ERROR: failed to open motr object: rc=" << rc << dendl; + this->close_mobj(); + return rc; + } + + ldpp_dout(dpp, 20) <<__func__<< ": rc=" << rc << dendl; + + return 0; +} + +int MotrObject::delete_mobj(const DoutPrefixProvider *dpp) +{ + int rc; + char fid_str[M0_FID_STR_LEN]; + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); + if (!meta.oid.u_hi || !meta.oid.u_lo) { + ldpp_dout(dpp, 20) << __func__ << ": invalid motr object oid=" << fid_str << dendl; + return -EINVAL; + } + ldpp_dout(dpp, 20) << __func__ << ": deleting motr object oid=" << fid_str << dendl; + + // Open the object. + if (mobj == nullptr) { + rc = this->open_mobj(dpp); + if (rc < 0) + return rc; + } + + // Create an DELETE op and execute it (sync version). + struct m0_op *op = nullptr; + mobj->ob_entity.en_flags |= M0_ENF_META; + rc = m0_entity_delete(&mobj->ob_entity, &op); + if (rc != 0) { + ldpp_dout(dpp, 0) << "ERROR: m0_entity_delete() failed: " << rc << dendl; + return rc; + } + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to open motr object: " << rc << dendl; + return rc; + } + + this->close_mobj(); + + return 0; +} + +void MotrObject::close_mobj() +{ + if (mobj == nullptr) + return; + m0_obj_fini(mobj); + delete mobj; mobj = nullptr; +} + +int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& data, uint64_t offset) +{ + int rc; + unsigned bs, left; + struct m0_op *op; + char *start, *p; + struct m0_bufvec buf; + struct m0_bufvec attr; + struct m0_indexvec ext; + + left = data.length(); + if (left == 0) + return 0; + + rc = m0_bufvec_empty_alloc(&buf, 1) ?: + m0_bufvec_alloc(&attr, 1, 1) ?: + m0_indexvec_alloc(&ext, 1); + if (rc != 0) + goto out; + + bs = this->get_optimal_bs(left); + ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; + + start = data.c_str(); + + for (p = start; left > 0; left -= bs, p += bs, offset += bs) { + if (left < bs) + bs = this->get_optimal_bs(left); + if (left < bs) { + data.append_zero(bs - left); + left = bs; + p = data.c_str(); + } + buf.ov_buf[0] = p; + buf.ov_vec.v_count[0] = bs; + ext.iv_index[0] = offset; + ext.iv_vec.v_count[0] = bs; + attr.ov_vec.v_count[0] = 0; + + op = nullptr; + rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); + if (rc != 0) + goto out; + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + if (rc != 0) + goto out; + } + +out: + m0_indexvec_free(&ext); + m0_bufvec_free(&attr); + m0_bufvec_free2(&buf); + return rc; +} + +int MotrObject::read_mobj(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb) +{ + int rc; + unsigned bs, actual, left; + struct m0_op *op; + struct m0_bufvec buf; + struct m0_bufvec attr; + struct m0_indexvec ext; + + // make end pointer exclusive: + // it's easier to work with it this way + end++; + ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << + " end=" << end << dendl; + // As `off` may not be parity group size aligned, even using optimal + // buffer block size, simply reading data from offset `off` could come + // across parity group boundary. And Motr only allows page-size aligned + // offset. + // + // The optimal size of each IO should also take into account the data + // transfer size to s3 client. For example, 16MB may be nice to read + // data from motr, but it could be too big for network transfer. + // + // TODO: We leave proper handling of offset in the future. + bs = this->get_optimal_bs(end - off); + ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): bs=" << bs << dendl; + + rc = m0_bufvec_empty_alloc(&buf, 1) ? : + m0_bufvec_alloc(&attr, 1, 1) ? : + m0_indexvec_alloc(&ext, 1); + if (rc < 0) + goto out; + + left = end - off; + for (; left > 0; off += actual) { + if (left < bs) + bs = this->get_optimal_bs(left); + actual = bs; + if (left < bs) + actual = left; + ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << + " actual=" << actual << dendl; + bufferlist bl; + buf.ov_buf[0] = bl.append_hole(bs).c_str(); + buf.ov_vec.v_count[0] = bs; + ext.iv_index[0] = off; + ext.iv_vec.v_count[0] = bs; + attr.ov_vec.v_count[0] = 0; + + left -= actual; + // Read from Motr. + op = nullptr; + rc = m0_obj_op(this->mobj, M0_OC_READ, &ext, &buf, &attr, 0, 0, &op); + ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): init read op rc=" << rc << dendl; + if (rc != 0) { + ldpp_dout(dpp, 0) << __func__ << ": read failed during m0_obj_op, rc=" << rc << dendl; + goto out; + } + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + if (rc != 0) { + ldpp_dout(dpp, 0) << __func__ << ": read failed, m0_op_wait rc=" << rc << dendl; + goto out; + } + // Call `cb` to process returned data. + ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): call cb to process data" << dendl; + cb->handle_data(bl, 0, actual); + } + +out: + m0_indexvec_free(&ext); + m0_bufvec_free(&attr); + m0_bufvec_free2(&buf); + this->close_mobj(); + + return rc; +} + +int MotrObject::get_bucket_dir_ent(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry& ent) +{ + int rc = 0; + string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); + int max = 1000; + vector<string> keys(max); + vector<bufferlist> vals(max); + bufferlist bl; + bufferlist::const_iterator iter; + + if (this->get_bucket()->get_info().versioning_status() == BUCKET_VERSIONED || + this->get_bucket()->get_info().versioning_status() == BUCKET_SUSPENDED) { + + rgw_bucket_dir_entry ent_to_check; + + if (this->store->get_obj_meta_cache()->get(dpp, this->get_name(), bl) == 0) { + iter = bl.cbegin(); + ent_to_check.decode(iter); + if (ent_to_check.is_current()) { + ent = ent_to_check; + rc = 0; + goto out; + } + } + + ldpp_dout(dpp, 20) <<__func__<< ": versioned bucket!" << dendl; + keys[0] = this->get_name(); + rc = store->next_query_by_name(bucket_index_iname, keys, vals); + if (rc < 0) { + ldpp_dout(dpp, 0) << __func__ << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + rc = -ENOENT; + for (const auto& bl: vals) { + if (bl.length() == 0) + break; + + iter = bl.cbegin(); + ent_to_check.decode(iter); + if (ent_to_check.is_current()) { + ldpp_dout(dpp, 20) <<__func__<< ": found current version!" << dendl; + ent = ent_to_check; + rc = 0; + + this->store->get_obj_meta_cache()->put(dpp, this->get_name(), bl); + + break; + } + } + } else { + if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { + ldpp_dout(dpp, 20) <<__func__<< ": non-versioned bucket!" << dendl; + rc = this->store->do_idx_op_by_name(bucket_index_iname, + M0_IC_GET, this->get_key().get_oid(), bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to get object's entry from bucket index: rc=" + << rc << dendl; + return rc; + } + this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); + } + + bufferlist& blr = bl; + iter = blr.cbegin(); + ent.decode(iter); + } + +out: + if (rc == 0) { + sal::Attrs dummy; + decode(dummy, iter); + meta.decode(iter); + ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id << dendl; + char fid_str[M0_FID_STR_LEN]; + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); + ldpp_dout(dpp, 70) << __func__ << ": oid=" << fid_str << dendl; + } else + ldpp_dout(dpp, 0) <<__func__<< ": rc=" << rc << dendl; + + return rc; +} + +int MotrObject::update_version_entries(const DoutPrefixProvider *dpp) +{ + int rc; + int max = 10; + vector<string> keys(max); + vector<bufferlist> vals(max); + + string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); + keys[0] = this->get_name(); + rc = store->next_query_by_name(bucket_index_iname, keys, vals); + ldpp_dout(dpp, 20) << "get all versions, name = " << this->get_name() << "rc = " << rc << dendl; + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + // no entries returned. + if (rc == 0) + return 0; + + for (const auto& bl: vals) { + if (bl.length() == 0) + break; + + rgw_bucket_dir_entry ent; + auto iter = bl.cbegin(); + ent.decode(iter); + + if (0 != ent.key.name.compare(0, this->get_name().size(), this->get_name())) + continue; + + if (!ent.is_current()) + continue; + + // Remove from the cache. + store->get_obj_meta_cache()->remove(dpp, this->get_name()); + + rgw::sal::Attrs attrs; + decode(attrs, iter); + MotrObject::Meta meta; + meta.decode(iter); + + ent.flags = rgw_bucket_dir_entry::FLAG_VER; + string key; + if (ent.key.instance.empty()) + key = ent.key.name; + else { + char buf[ent.key.name.size() + ent.key.instance.size() + 16]; + snprintf(buf, sizeof(buf), "%s[%s]", ent.key.name.c_str(), ent.key.instance.c_str()); + key = buf; + } + ldpp_dout(dpp, 20) << "update one version, key = " << key << dendl; + bufferlist ent_bl; + ent.encode(ent_bl); + encode(attrs, ent_bl); + meta.encode(ent_bl); + + rc = store->do_idx_op_by_name(bucket_index_iname, + M0_IC_PUT, key, ent_bl); + if (rc < 0) + break; + } + return rc; +} + +// Scan object_nnn_part_index to get all parts then open their motr objects. +// TODO: all parts are opened in the POC. But for a large object, for example +// a 5GB object will have about 300 parts (for default 15MB part). A better +// way of managing opened object may be needed. +int MotrObject::get_part_objs(const DoutPrefixProvider* dpp, + std::map<int, std::unique_ptr<MotrObject>>& part_objs) +{ + int rc; + int max_parts = 1000; + int marker = 0; + uint64_t off = 0; + bool truncated = false; + std::unique_ptr<rgw::sal::MultipartUpload> upload; + + upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); + + do { + rc = upload->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); + if (rc == -ENOENT) { + rc = -ERR_NO_SUCH_UPLOAD; + } + if (rc < 0) + return rc; + + std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = upload->get_parts(); + for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { + + MultipartPart *mpart = part_iter->second.get(); + MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart); + uint32_t part_num = mmpart->get_num(); + uint64_t part_size = mmpart->get_size(); + + string part_obj_name = this->get_bucket()->get_name() + "." + + this->get_key().get_oid() + + ".part." + std::to_string(part_num); + std::unique_ptr<rgw::sal::Object> obj; + obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); + std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); + + ldpp_dout(dpp, 20) << "get_part_objs: off = " << off << ", size = " << part_size << dendl; + mobj->part_off = off; + mobj->part_size = part_size; + mobj->part_num = part_num; + mobj->meta = mmpart->meta; + + part_objs.emplace(part_num, std::move(mobj)); + + off += part_size; + } + } while (truncated); + + return 0; +} + +int MotrObject::open_part_objs(const DoutPrefixProvider* dpp, + std::map<int, std::unique_ptr<MotrObject>>& part_objs) +{ + //for (auto& iter: part_objs) { + for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) { + MotrObject* obj = static_cast<MotrObject *>(iter->second.get()); + ldpp_dout(dpp, 20) << "open_part_objs: name = " << obj->get_name() << dendl; + int rc = obj->open_mobj(dpp); + if (rc < 0) + return rc; + } + + return 0; +} + +int MotrObject::delete_part_objs(const DoutPrefixProvider* dpp) +{ + std::unique_ptr<rgw::sal::MultipartUpload> upload; + upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); + std::unique_ptr<rgw::sal::MotrMultipartUpload> mupload(static_cast<rgw::sal::MotrMultipartUpload *>(upload.release())); + return mupload->delete_parts(dpp); +} + +int MotrObject::read_multipart_obj(const DoutPrefixProvider* dpp, + int64_t off, int64_t end, RGWGetDataCB* cb, + std::map<int, std::unique_ptr<MotrObject>>& part_objs) +{ + int64_t cursor = off; + + ldpp_dout(dpp, 20) << "read_multipart_obj: off=" << off << " end=" << end << dendl; + + // Find the parts which are in the (off, end) range and + // read data from it. Note: `end` argument is inclusive. + for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) { + MotrObject* obj = static_cast<MotrObject *>(iter->second.get()); + int64_t part_off = obj->part_off; + int64_t part_size = obj->part_size; + int64_t part_end = obj->part_off + obj->part_size - 1; + ldpp_dout(dpp, 20) << "read_multipart_obj: part_off=" << part_off + << " part_end=" << part_end << dendl; + if (part_end < off) + continue; + + int64_t local_off = cursor - obj->part_off; + int64_t local_end = part_end < end? part_size - 1 : end - part_off; + ldpp_dout(dpp, 20) << "real_multipart_obj: name=" << obj->get_name() + << " local_off=" << local_off + << " local_end=" << local_end << dendl; + int rc = obj->read_mobj(dpp, local_off, local_end, cb); + if (rc < 0) + return rc; + + cursor = part_end + 1; + if (cursor > end) + break; + } + + return 0; +} + +static unsigned roundup(unsigned x, unsigned by) +{ + return ((x - 1) / by + 1) * by; +} + +unsigned MotrObject::get_optimal_bs(unsigned len) +{ + struct m0_pool_version *pver; + + pver = m0_pool_version_find(&store->instance->m0c_pools_common, + &mobj->ob_attr.oa_pver); + M0_ASSERT(pver != nullptr); + struct m0_pdclust_attr *pa = &pver->pv_attr; + uint64_t lid = M0_OBJ_LAYOUT_ID(meta.layout_id); + unsigned unit_sz = m0_obj_layout_id_to_unit_size(lid); + unsigned grp_sz = unit_sz * pa->pa_N; + + // bs should be max 4-times pool-width deep counting by 1MB units, or + // 8-times deep counting by 512K units, 16-times deep by 256K units, + // and so on. Several units to one target will be aggregated to make + // fewer network RPCs, disk i/o operations and BE transactions. + // For unit sizes of 32K or less, the depth is 128, which + // makes it 32K * 128 == 4MB - the maximum amount per target when + // the performance is still good on LNet (which has max 1MB frames). + // TODO: it may be different on libfabric, should be re-measured. + unsigned depth = 128 / ((unit_sz + 0x7fff) / 0x8000); + if (depth == 0) + depth = 1; + // P * N / (N + K + S) - number of data units to span the pool-width + unsigned max_bs = depth * unit_sz * pa->pa_P * pa->pa_N / + (pa->pa_N + pa->pa_K + pa->pa_S); + max_bs = roundup(max_bs, grp_sz); // multiple of group size + if (len >= max_bs) + return max_bs; + else if (len <= grp_sz) + return grp_sz; + else + return roundup(len, grp_sz); +} + +void MotrAtomicWriter::cleanup() +{ + m0_indexvec_free(&ext); + m0_bufvec_free(&attr); + m0_bufvec_free2(&buf); + acc_data.clear(); + obj.close_mobj(); + old_obj.close_mobj(); +} + +unsigned MotrAtomicWriter::populate_bvec(unsigned len, bufferlist::iterator &bi) +{ + unsigned i, l, done = 0; + const char *data; + + for (i = 0; i < MAX_BUFVEC_NR && len > 0; ++i) { + l = bi.get_ptr_and_advance(len, &data); + buf.ov_buf[i] = (char*)data; + buf.ov_vec.v_count[i] = l; + ext.iv_index[i] = acc_off; + ext.iv_vec.v_count[i] = l; + attr.ov_vec.v_count[i] = 0; + acc_off += l; + len -= l; + done += l; + } + buf.ov_vec.v_nr = i; + ext.iv_vec.v_nr = i; + + return done; +} + +int MotrAtomicWriter::write() +{ + int rc; + unsigned bs, left; + struct m0_op *op; + bufferlist::iterator bi; + + left = acc_data.length(); + + if (!obj.is_opened()) { + rc = obj.create_mobj(dpp, left); + if (rc == -EEXIST) + rc = obj.open_mobj(dpp); + if (rc != 0) { + char fid_str[M0_FID_STR_LEN]; + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&obj.meta.oid)); + ldpp_dout(dpp, 0) << "ERROR: failed to create/open motr object " + << fid_str << " (" << obj.get_bucket()->get_name() + << "/" << obj.get_key().get_oid() << "): rc=" << rc + << dendl; + goto err; + } + } + + total_data_size += left; + + bs = obj.get_optimal_bs(left); + ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; + + bi = acc_data.begin(); + while (left > 0) { + if (left < bs) + bs = obj.get_optimal_bs(left); + if (left < bs) { + acc_data.append_zero(bs - left); + auto off = bi.get_off(); + bufferlist tmp; + acc_data.splice(off, bs, &tmp); + acc_data.clear(); + acc_data.append(tmp.c_str(), bs); // make it a single buf + bi = acc_data.begin(); + left = bs; + } + + left -= this->populate_bvec(bs, bi); + + op = nullptr; + rc = m0_obj_op(obj.mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); + if (rc != 0) + goto err; + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + if (rc != 0) + goto err; + } + acc_data.clear(); + + return 0; + +err: + this->cleanup(); + return rc; +} + +static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024; + +// Accumulate enough data first to make a reasonable decision about the +// optimal unit size for a new object, or bs for existing object (32M seems +// enough for 4M units in 8+2 parity groups, a common config on wide pools), +// and then launch the write operations. +int MotrAtomicWriter::process(bufferlist&& data, uint64_t offset) +{ + if (data.length() == 0) { // last call, flush data + int rc = 0; + if (acc_data.length() != 0) + rc = this->write(); + this->cleanup(); + return rc; + } + + if (acc_data.length() == 0) + acc_off = offset; + + acc_data.append(std::move(data)); + if (acc_data.length() < MAX_ACC_SIZE) + return 0; + + return this->write(); +} + +int MotrAtomicWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map<std::string, bufferlist>& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + int rc = 0; + + if (acc_data.length() != 0) { // check again, just in case + rc = this->write(); + this->cleanup(); + if (rc != 0) + return rc; + } + + bufferlist bl; + rgw_bucket_dir_entry ent; + + // Set rgw_bucet_dir_entry. Some of the member of this structure may not + // apply to motr. For example the storage_class. + // + // Checkout AtomicObjectProcessor::complete() in rgw_putobj_processor.cc + // and RGWRados::Object::Write::write_meta() in rgw_rados.cc for what and + // how to set the dir entry. Only set the basic ones for POC, no ACLs and + // other attrs. + obj.get_key().get_index_key(&ent.key); + ent.meta.size = total_data_size; + ent.meta.accounted_size = total_data_size; + ent.meta.mtime = real_clock::is_zero(set_mtime)? ceph::real_clock::now() : set_mtime; + ent.meta.etag = etag; + ent.meta.owner = owner.to_str(); + ent.meta.owner_display_name = obj.get_bucket()->get_owner()->get_display_name(); + bool is_versioned = obj.get_key().have_instance(); + if (is_versioned) + ent.flags = rgw_bucket_dir_entry::FLAG_VER | rgw_bucket_dir_entry::FLAG_CURRENT; + ldpp_dout(dpp, 20) <<__func__<< ": key=" << obj.get_key().get_oid() + << " etag: " << etag << " user_data=" << user_data << dendl; + if (user_data) + ent.meta.user_data = *user_data; + ent.encode(bl); + + RGWBucketInfo &info = obj.get_bucket()->get_info(); + if (info.obj_lock_enabled() && info.obj_lock.has_rule()) { + auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); + if (iter == attrs.end()) { + real_time lock_until_date = info.obj_lock.get_lock_until_date(ent.meta.mtime); + string mode = info.obj_lock.get_mode(); + RGWObjectRetention obj_retention(mode, lock_until_date); + bufferlist retention_bl; + obj_retention.encode(retention_bl); + attrs[RGW_ATTR_OBJECT_RETENTION] = retention_bl; + } + } + encode(attrs, bl); + obj.meta.encode(bl); + ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << obj.meta.layout_id + << dendl; + if (is_versioned) { + // get the list of all versioned objects with the same key and + // unset their FLAG_CURRENT later, if do_idx_op_by_name() is successful. + // Note: without distributed lock on the index - it is possible that 2 + // CURRENT entries would appear in the bucket. For example, consider the + // following scenario when two clients are trying to add the new object + // version concurrently: + // client 1: reads all the CURRENT entries + // client 2: updates the index and sets the new CURRENT + // client 1: updates the index and sets the new CURRENT + // At the step (1) client 1 would not see the new current record from step (2), + // so it won't update it. As a result, two CURRENT version entries will appear + // in the bucket. + // TODO: update the current version (unset the flag) and insert the new current + // version can be launched in one motr op. This requires change at do_idx_op() + // and do_idx_op_by_name(). + rc = obj.update_version_entries(dpp); + if (rc < 0) + return rc; + } + // Insert an entry into bucket index. + string bucket_index_iname = "motr.rgw.bucket.index." + obj.get_bucket()->get_name(); + rc = store->do_idx_op_by_name(bucket_index_iname, + M0_IC_PUT, obj.get_key().get_oid(), bl); + if (rc == 0) + store->get_obj_meta_cache()->put(dpp, obj.get_key().get_oid(), bl); + + if (old_obj.get_bucket()->get_info().versioning_status() != BUCKET_VERSIONED) { + // Delete old object data if exists. + old_obj.delete_mobj(dpp); + } + + // TODO: We need to handle the object leak caused by parallel object upload by + // making use of background gc, which is currently not enabled for motr. + return rc; +} + +int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp) +{ + int rc; + int max_parts = 1000; + int marker = 0; + bool truncated = false; + + // Scan all parts and delete the corresponding motr objects. + do { + rc = this->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); + if (rc == -ENOENT) { + truncated = false; + rc = 0; + } + if (rc < 0) + return rc; + + std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = this->get_parts(); + for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { + + MultipartPart *mpart = part_iter->second.get(); + MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart); + uint32_t part_num = mmpart->get_num(); + + // Delete the part object. Note that the part object is not + // inserted into bucket index, only the corresponding motr object + // needs to be delete. That is why we don't call + // MotrObject::delete_object(). + string part_obj_name = bucket->get_name() + "." + + mp_obj.get_key() + + ".part." + std::to_string(part_num); + std::unique_ptr<rgw::sal::Object> obj; + obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); + std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); + mobj->meta = mmpart->meta; + rc = mobj->delete_mobj(dpp); + if (rc < 0) { + ldpp_dout(dpp, 0) << __func__ << ": Failed to delete object from Motr. rc=" << rc << dendl; + return rc; + } + } + } while (truncated); + + // Delete object part index. + std::string oid = mp_obj.get_key(); + string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; + return store->delete_motr_idx_by_name(obj_part_iname); +} + +int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct) +{ + int rc; + // Check if multipart upload exists + bufferlist bl; + std::unique_ptr<rgw::sal::Object> meta_obj; + meta_obj = get_meta_obj(); + string bucket_multipart_iname = + "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; + rc = store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_GET, meta_obj->get_key().to_str(), bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart upload. rc=" << rc << dendl; + return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; + } + + // Scan all parts and delete the corresponding motr objects. + rc = this->delete_parts(dpp); + if (rc < 0) + return rc; + + bl.clear(); + // Remove the upload from bucket multipart index. + rc = store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_DEL, meta_obj->get_key().get_oid(), bl); + return rc; +} + +std::unique_ptr<rgw::sal::Object> MotrMultipartUpload::get_meta_obj() +{ + std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns)); + std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); + mobj->set_category(RGWObjCategory::MultiMeta); + return mobj; +} + +struct motr_multipart_upload_info +{ + rgw_placement_rule dest_placement; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(dest_placement, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(dest_placement, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(motr_multipart_upload_info) + +int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, + ACLOwner& _owner, + rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) +{ + int rc; + std::string oid = mp_obj.get_key(); + + owner = _owner; + + do { + char buf[33]; + string tmp_obj_name; + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ + upload_id.append(buf); + + mp_obj.init(oid, upload_id); + tmp_obj_name = mp_obj.get_meta(); + + std::unique_ptr<rgw::sal::Object> obj; + obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns)); + // the meta object will be indexed with 0 size, we c + obj->set_in_extra_data(true); + obj->set_hash_source(oid); + + motr_multipart_upload_info upload_info; + upload_info.dest_placement = dest_placement; + bufferlist mpbl; + encode(upload_info, mpbl); + + // Create an initial entry in the bucket. The entry will be + // updated when multipart upload is completed, for example, + // size, etag etc. + bufferlist bl; + rgw_bucket_dir_entry ent; + obj->get_key().get_index_key(&ent.key); + ent.meta.owner = owner.get_id().to_str(); + ent.meta.category = RGWObjCategory::MultiMeta; + ent.meta.mtime = ceph::real_clock::now(); + ent.meta.user_data.assign(mpbl.c_str(), mpbl.c_str() + mpbl.length()); + ent.encode(bl); + + // Insert an entry into bucket multipart index so it is not shown + // when listing a bucket. + string bucket_multipart_iname = + "motr.rgw.bucket." + obj->get_bucket()->get_name() + ".multiparts"; + rc = store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_PUT, obj->get_key().get_oid(), bl); + + } while (rc == -EEXIST); + + if (rc < 0) + return rc; + + // Create object part index. + // TODO: add bucket as part of the name. + string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; + ldpp_dout(dpp, 20) << "MotrMultipartUpload::init(): object part index=" << obj_part_iname << dendl; + rc = store->create_motr_idx_by_name(obj_part_iname); + if (rc == -EEXIST) + rc = 0; + if (rc < 0) + // TODO: clean the bucket index entry + ldpp_dout(dpp, 0) << "Failed to create object multipart index " << obj_part_iname << dendl; + + return rc; +} + +int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, + int num_parts, int marker, + int *next_marker, bool *truncated, + bool assume_unsorted) +{ + int rc; + vector<string> key_vec(num_parts); + vector<bufferlist> val_vec(num_parts); + + std::string oid = mp_obj.get_key(); + string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; + ldpp_dout(dpp, 20) << __func__ << ": object part index = " << obj_part_iname << dendl; + key_vec[0].clear(); + key_vec[0] = "part."; + char buf[32]; + snprintf(buf, sizeof(buf), "%08d", marker + 1); + key_vec[0].append(buf); + rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec); + if (rc < 0) { + ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; + return rc; + } + + int last_num = 0; + int part_cnt = 0; + uint32_t expected_next = 0; + ldpp_dout(dpp, 20) << __func__ << ": marker = " << marker << dendl; + for (const auto& bl: val_vec) { + if (bl.length() == 0) + break; + + RGWUploadPartInfo info; + auto iter = bl.cbegin(); + info.decode(iter); + rgw::sal::Attrs attrs_dummy; + decode(attrs_dummy, iter); + MotrObject::Meta meta; + meta.decode(iter); + + ldpp_dout(dpp, 20) << __func__ << ": part_num=" << info.num + << " part_size=" << info.size << dendl; + ldpp_dout(dpp, 20) << __func__ << ": meta:oid=[" << meta.oid.u_hi << "," << meta.oid.u_lo + << "], meta:pvid=[" << meta.pver.f_container << "," << meta.pver.f_key + << "], meta:layout id=" << meta.layout_id << dendl; + + if (!expected_next) + expected_next = info.num + 1; + else if (expected_next && info.num != expected_next) + return -EINVAL; + else expected_next = info.num + 1; + + if ((int)info.num > marker) { + last_num = info.num; + parts.emplace(info.num, std::make_unique<MotrMultipartPart>(info, meta)); + } + + part_cnt++; + } + + // Does it have more parts? + if (truncated) + *truncated = part_cnt < num_parts? false : true; + ldpp_dout(dpp, 20) << __func__ << ": truncated=" << *truncated << dendl; + + if (next_marker) + *next_marker = last_num; + + return 0; +} + +// Heavily copy from rgw_sal_rados.cc +int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp, + optional_yield y, CephContext* cct, + map<int, string>& part_etags, + list<rgw_obj_index_key>& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& off, + std::string& tag, ACLOwner& owner, + uint64_t olh_epoch, + rgw::sal::Object* target_obj) +{ + char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + std::string etag; + bufferlist etag_bl; + MD5 hash; + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes + hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + bool truncated; + int rc; + + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): enter" << dendl; + int total_parts = 0; + int handled_parts = 0; + int max_parts = 1000; + int marker = 0; + uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; + auto etags_iter = part_etags.begin(); + rgw::sal::Attrs attrs = target_obj->get_attrs(); + + do { + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl; + rc = list_parts(dpp, cct, max_parts, marker, &marker, &truncated); + if (rc == -ENOENT) { + rc = -ERR_NO_SUCH_UPLOAD; + } + if (rc < 0) + return rc; + + total_parts += parts.size(); + if (!truncated && total_parts != (int)part_etags.size()) { + ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts + << " expected: " << part_etags.size() << dendl; + rc = -ERR_INVALID_PART; + return rc; + } + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): parts.size()=" << parts.size() << dendl; + + for (auto obj_iter = parts.begin(); + etags_iter != part_etags.end() && obj_iter != parts.end(); + ++etags_iter, ++obj_iter, ++handled_parts) { + MultipartPart *mpart = obj_iter->second.get(); + MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart); + RGWUploadPartInfo *part = &mmpart->info; + + uint64_t part_size = part->accounted_size; + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part_size=" << part_size << dendl; + if (handled_parts < (int)part_etags.size() - 1 && + part_size < min_part_size) { + rc = -ERR_TOO_SMALL; + return rc; + } + + char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + if (etags_iter->first != (int)obj_iter->first) { + ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: " + << etags_iter->first << " next uploaded: " + << obj_iter->first << dendl; + rc = -ERR_INVALID_PART; + return rc; + } + string part_etag = rgw_string_unquote(etags_iter->second); + if (part_etag.compare(part->etag) != 0) { + ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first + << " etag: " << etags_iter->second << dendl; + rc = -ERR_INVALID_PART; + return rc; + } + + hex_to_buf(part->etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const unsigned char *)petag, sizeof(petag)); + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): calc etag " << dendl; + + string oid = mp_obj.get_part(part->num); + rgw_obj src_obj; + src_obj.init_ns(bucket->get_key(), oid, mp_ns); + +#if 0 // does Motr backend need it? + /* update manifest for part */ + if (part->manifest.empty()) { + ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj=" + << src_obj << dendl; + rc = -ERR_INVALID_PART; + return rc; + } else { + manifest.append(dpp, part->manifest, store->get_zone()); + } + ldpp_dout(dpp, 0) << "MotrMultipartUpload::complete(): manifest " << dendl; +#endif + + bool part_compressed = (part->cs_info.compression_type != "none"); + if ((handled_parts > 0) && + ((part_compressed != compressed) || + (cs_info.compression_type != part->cs_info.compression_type))) { + ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload (" + << cs_info.compression_type << ">>" << part->cs_info.compression_type << ")" << dendl; + rc = -ERR_INVALID_PART; + return rc; + } + + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part compression" << dendl; + if (part_compressed) { + int64_t new_ofs; // offset in compression data for new part + if (cs_info.blocks.size() > 0) + new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; + else + new_ofs = 0; + for (const auto& block : part->cs_info.blocks) { + compression_block cb; + cb.old_ofs = block.old_ofs + cs_info.orig_size; + cb.new_ofs = new_ofs; + cb.len = block.len; + cs_info.blocks.push_back(cb); + new_ofs = cb.new_ofs + cb.len; + } + if (!compressed) + cs_info.compression_type = part->cs_info.compression_type; + cs_info.orig_size += part->cs_info.orig_size; + compressed = true; + } + + // We may not need to do the following as remove_objs are those + // don't show when listing a bucket. As we store in-progress uploaded + // object's metadata in a separate index, they are not shown when + // listing a bucket. + rgw_obj_index_key remove_key; + src_obj.key.get_index_key(&remove_key); + remove_objs.push_back(remove_key); + + off += part_size; + accounted_size += part->accounted_size; + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): off=" << off << ", accounted_size = " << accounted_size << dendl; + } + } while (truncated); + hash.Final((unsigned char *)final_etag); + + buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)part_etags.size()); + etag = final_etag_str; + ldpp_dout(dpp, 20) << "calculated etag: " << etag << dendl; + etag_bl.append(etag); + attrs[RGW_ATTR_ETAG] = etag_bl; + + if (compressed) { + // write compression attribute to full object + bufferlist tmp; + encode(cs_info, tmp); + attrs[RGW_ATTR_COMPRESSION] = tmp; + } + + // Read the object's the multipart_upload_info. + // TODO: all those index name and key constructions should be implemented as + // member functions. + bufferlist bl; + std::unique_ptr<rgw::sal::Object> meta_obj; + meta_obj = get_meta_obj(); + string bucket_multipart_iname = + "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; + rc = this->store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_GET, meta_obj->get_key().get_oid(), bl); + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): read entry from bucket multipart index rc=" << rc << dendl; + if (rc < 0) + return rc; + rgw_bucket_dir_entry ent; + bufferlist& blr = bl; + auto ent_iter = blr.cbegin(); + ent.decode(ent_iter); + + // Update the dir entry and insert it to the bucket index so + // the object will be seen when listing the bucket. + bufferlist update_bl; + target_obj->get_key().get_index_key(&ent.key); // Change to offical name :) + ent.meta.size = off; + ent.meta.accounted_size = accounted_size; + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): obj size=" << ent.meta.size + << " obj accounted size=" << ent.meta.accounted_size << dendl; + ent.meta.mtime = ceph::real_clock::now(); + ent.meta.etag = etag; + ent.encode(update_bl); + encode(attrs, update_bl); + MotrObject::Meta meta_dummy; + meta_dummy.encode(update_bl); + + string bucket_index_iname = "motr.rgw.bucket.index." + meta_obj->get_bucket()->get_name(); + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): target_obj name=" << target_obj->get_name() + << " target_obj oid=" << target_obj->get_oid() << dendl; + rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT, + target_obj->get_name(), update_bl); + if (rc < 0) + return rc; + + // Put into metadata cache. + store->get_obj_meta_cache()->put(dpp, target_obj->get_name(), update_bl); + + // Now we can remove it from bucket multipart index. + ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): remove from bucket multipartindex " << dendl; + return store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_DEL, meta_obj->get_key().get_oid(), bl); +} + +int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs) +{ + if (!rule && !attrs) { + return 0; + } + + if (rule) { + if (!placement.empty()) { + *rule = &placement; + if (!attrs) { + /* Don't need attrs, done */ + return 0; + } + } else { + *rule = nullptr; + } + } + + std::unique_ptr<rgw::sal::Object> meta_obj; + meta_obj = get_meta_obj(); + meta_obj->set_in_extra_data(true); + + // Read the object's the multipart_upload_info. + bufferlist bl; + string bucket_multipart_iname = + "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; + int rc = this->store->do_idx_op_by_name(bucket_multipart_iname, + M0_IC_GET, meta_obj->get_key().get_oid(), bl); + if (rc < 0) { + ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart info. rc=" << rc << dendl; + return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; + } + + rgw_bucket_dir_entry ent; + bufferlist& blr = bl; + auto ent_iter = blr.cbegin(); + ent.decode(ent_iter); + + if (attrs) { + bufferlist etag_bl; + string& etag = ent.meta.etag; + ldpp_dout(dpp, 20) << "object's etag: " << ent.meta.etag << dendl; + etag_bl.append(etag.c_str(), etag.size()); + attrs->emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); + if (!rule || *rule != nullptr) { + /* placement was cached; don't actually read */ + return 0; + } + } + + /* Decode multipart_upload_info */ + motr_multipart_upload_info upload_info; + bufferlist mpbl; + mpbl.append(ent.meta.user_data.c_str(), ent.meta.user_data.size()); + auto mpbl_iter = mpbl.cbegin(); + upload_info.decode(mpbl_iter); + placement = upload_info.dest_placement; + *rule = &placement; + + return 0; +} + +std::unique_ptr<Writer> MotrMultipartUpload::get_writer( + const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) +{ + return std::make_unique<MotrMultipartWriter>(dpp, y, this, + obj, store, owner, + ptail_placement_rule, part_num, part_num_str); +} + +int MotrMultipartWriter::prepare(optional_yield y) +{ + string part_obj_name = head_obj->get_bucket()->get_name() + "." + + head_obj->get_key().get_oid() + + ".part." + std::to_string(part_num); + ldpp_dout(dpp, 20) << "bucket=" << head_obj->get_bucket()->get_name() << "part_obj_name=" << part_obj_name << dendl; + part_obj = std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket()); + if (part_obj == nullptr) + return -ENOMEM; + + // s3 client may retry uploading part, so the part may have already + // been created. + int rc = part_obj->create_mobj(dpp, store->cctx->_conf->rgw_max_chunk_size); + if (rc == -EEXIST) { + rc = part_obj->open_mobj(dpp); + if (rc < 0) + return rc; + } + return rc; +} + +int MotrMultipartWriter::process(bufferlist&& data, uint64_t offset) +{ + int rc = part_obj->write_mobj(dpp, std::move(data), offset); + if (rc == 0) { + actual_part_size += data.length(); + ldpp_dout(dpp, 20) << " write_mobj(): actual_part_size=" << actual_part_size << dendl; + } + return rc; +} + +int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map<std::string, bufferlist>& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + // Should the dir entry(object metadata) be updated? For example + // mtime. + + ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): enter" << dendl; + // Add an entry into object_nnn_part_index. + bufferlist bl; + RGWUploadPartInfo info; + info.num = part_num; + info.etag = etag; + info.size = actual_part_size; + info.accounted_size = accounted_size; + info.modified = real_clock::now(); + + bool compressed; + int rc = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); + ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): compression rc=" << rc << dendl; + if (rc < 0) { + ldpp_dout(dpp, 1) << "cannot get compression info" << dendl; + return rc; + } + encode(info, bl); + encode(attrs, bl); + part_obj->meta.encode(bl); + + string p = "part."; + char buf[32]; + snprintf(buf, sizeof(buf), "%08d", (int)part_num); + p.append(buf); + string obj_part_iname = "motr.rgw.object." + head_obj->get_bucket()->get_name() + "." + + head_obj->get_key().get_oid() + ".parts"; + ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): object part index = " << obj_part_iname << dendl; + rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl); + if (rc < 0) { + return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; + } + + return 0; +} + +std::unique_ptr<RGWRole> MotrStore::get_role(std::string name, + std::string tenant, + std::string path, + std::string trust_policy, + std::string max_session_duration_str, + std::multimap<std::string,std::string> tags) +{ + RGWRole* p = nullptr; + return std::unique_ptr<RGWRole>(p); +} + +std::unique_ptr<RGWRole> MotrStore::get_role(const RGWRoleInfo& info) +{ + RGWRole* p = nullptr; + return std::unique_ptr<RGWRole>(p); +} + +std::unique_ptr<RGWRole> MotrStore::get_role(std::string id) +{ + RGWRole* p = nullptr; + return std::unique_ptr<RGWRole>(p); +} + +int MotrStore::get_roles(const DoutPrefixProvider *dpp, + optional_yield y, + const std::string& path_prefix, + const std::string& tenant, + vector<std::unique_ptr<RGWRole>>& roles) +{ + return 0; +} + +std::unique_ptr<RGWOIDCProvider> MotrStore::get_oidc_provider() +{ + RGWOIDCProvider* p = nullptr; + return std::unique_ptr<RGWOIDCProvider>(p); +} + +int MotrStore::get_oidc_providers(const DoutPrefixProvider *dpp, + const std::string& tenant, + vector<std::unique_ptr<RGWOIDCProvider>>& providers) +{ + return 0; +} + +std::unique_ptr<MultipartUpload> MotrBucket::get_multipart_upload(const std::string& oid, + std::optional<std::string> upload_id, + ACLOwner owner, ceph::real_time mtime) +{ + return std::make_unique<MotrMultipartUpload>(store, this, oid, upload_id, owner, mtime); +} + +std::unique_ptr<Writer> MotrStore::get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) { + return nullptr; +} + +std::unique_ptr<Writer> MotrStore::get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) { + return std::make_unique<MotrAtomicWriter>(dpp, y, + obj, this, owner, + ptail_placement_rule, olh_epoch, unique_tag); +} + +const std::string& MotrStore::get_compression_type(const rgw_placement_rule& rule) +{ + return zone.zone_params->get_compression_type(rule); +} + +bool MotrStore::valid_placement(const rgw_placement_rule& rule) +{ + return zone.zone_params->valid_placement(rule); +} + +std::unique_ptr<User> MotrStore::get_user(const rgw_user &u) +{ + ldout(cctx, 20) << "bucket's user: " << u.to_str() << dendl; + return std::make_unique<MotrUser>(this, u); +} + +int MotrStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string &key, optional_yield y, std::unique_ptr<User> *user) +{ + int rc; + User *u; + bufferlist bl; + RGWUserInfo uinfo; + MotrAccessKey access_key; + + rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, + M0_IC_GET, key, bl); + if (rc < 0){ + ldout(cctx, 0) << "Access key not found: rc = " << rc << dendl; + return rc; + } + + bufferlist& blr = bl; + auto iter = blr.cbegin(); + access_key.decode(iter); + + uinfo.user_id.from_str(access_key.user_id); + ldout(cctx, 0) << "Loading user: " << uinfo.user_id.id << dendl; + rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); + if (rc < 0){ + ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl; + return rc; + } + u = new MotrUser(this, uinfo); + if (!u) + return -ENOMEM; + + user->reset(u); + return 0; +} + +int MotrStore::get_user_by_email(const DoutPrefixProvider *dpp, const std::string& email, optional_yield y, std::unique_ptr<User>* user) +{ + int rc; + User *u; + bufferlist bl; + RGWUserInfo uinfo; + MotrEmailInfo email_info; + rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, + M0_IC_GET, email, bl); + if (rc < 0){ + ldout(cctx, 0) << "Email Id not found: rc = " << rc << dendl; + return rc; + } + auto iter = bl.cbegin(); + email_info.decode(iter); + ldout(cctx, 0) << "Loading user: " << email_info.user_id << dendl; + uinfo.user_id.from_str(email_info.user_id); + rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); + if (rc < 0){ + ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl; + return rc; + } + u = new MotrUser(this, uinfo); + if (!u) + return -ENOMEM; + + user->reset(u); + return 0; +} + +int MotrStore::get_user_by_swift(const DoutPrefixProvider *dpp, const std::string& user_str, optional_yield y, std::unique_ptr<User>* user) +{ + /* Swift keys and subusers are not supported for now */ + return 0; +} + +int MotrStore::store_access_key(const DoutPrefixProvider *dpp, optional_yield y, MotrAccessKey access_key) +{ + int rc; + bufferlist bl; + access_key.encode(bl); + rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, + M0_IC_PUT, access_key.id, bl); + if (rc < 0){ + ldout(cctx, 0) << "Failed to store key: rc = " << rc << dendl; + return rc; + } + return rc; +} + +int MotrStore::delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key) +{ + int rc; + bufferlist bl; + rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, + M0_IC_DEL, access_key, bl); + if (rc < 0){ + ldout(cctx, 0) << "Failed to delete key: rc = " << rc << dendl; + } + return rc; +} + +int MotrStore::store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info ) +{ + int rc; + bufferlist bl; + email_info.encode(bl); + rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, + M0_IC_PUT, email_info.email_id, bl); + if (rc < 0) { + ldout(cctx, 0) << "Failed to store the user by email as key: rc = " << rc << dendl; + } + return rc; +} + +std::unique_ptr<Object> MotrStore::get_object(const rgw_obj_key& k) +{ + return std::make_unique<MotrObject>(this, k); +} + + +int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const rgw_bucket& b, std::unique_ptr<Bucket>* bucket, optional_yield y) +{ + int ret; + Bucket* bp; + + bp = new MotrBucket(this, b, u); + ret = bp->load_bucket(dpp, y); + if (ret < 0) { + delete bp; + return ret; + } + + bucket->reset(bp); + return 0; +} + +int MotrStore::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket) +{ + Bucket* bp; + + bp = new MotrBucket(this, i, u); + /* Don't need to fetch the bucket info, use the provided one */ + + bucket->reset(bp); + return 0; +} + +int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr<Bucket>* bucket, optional_yield y) +{ + rgw_bucket b; + + b.tenant = tenant; + b.name = name; + + return get_bucket(dpp, u, b, bucket, y); +} + +bool MotrStore::is_meta_master() +{ + return true; +} + +int MotrStore::forward_request_to_master(const DoutPrefixProvider *dpp, User* user, obj_version *objv, + bufferlist& in_data, + JSONParser *jp, req_info& info, + optional_yield y) +{ + return 0; +} + +int MotrStore::forward_iam_request_to_master(const DoutPrefixProvider *dpp, const RGWAccessKey& key, obj_version* objv, + bufferlist& in_data, + RGWXMLDecoder::XMLParser* parser, req_info& info, + optional_yield y) +{ + return 0; +} + +std::string MotrStore::zone_unique_id(uint64_t unique_num) +{ + return ""; +} + +std::string MotrStore::zone_unique_trans_id(const uint64_t unique_num) +{ + return ""; +} + +int MotrStore::get_zonegroup(const std::string& id, std::unique_ptr<ZoneGroup>* group) +{ + /* XXX: for now only one zonegroup supported */ + ZoneGroup* zg; + zg = new MotrZoneGroup(this, zone.zonegroup.get_group()); + + group->reset(zg); + return 0; +} + +int MotrStore::list_all_zones(const DoutPrefixProvider* dpp, + std::list<std::string>& zone_ids) +{ + zone_ids.push_back(zone.get_id()); + return 0; +} + +int MotrStore::cluster_stat(RGWClusterStat& stats) +{ + return 0; +} + +std::unique_ptr<Lifecycle> MotrStore::get_lifecycle(void) +{ + return 0; +} + +std::unique_ptr<Completions> MotrStore::get_completions(void) +{ + return 0; +} + +std::unique_ptr<Notification> MotrStore::get_notification(Object* obj, Object* src_obj, req_state* s, + rgw::notify::EventType event_type, optional_yield y, const string* object_name) +{ + return std::make_unique<MotrNotification>(obj, src_obj, event_type); +} + +std::unique_ptr<Notification> MotrStore::get_notification(const DoutPrefixProvider* dpp, Object* obj, + Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, + std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) +{ + return std::make_unique<MotrNotification>(obj, src_obj, event_type); +} + +int MotrStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info) +{ + return 0; +} + +int MotrStore::log_op(const DoutPrefixProvider *dpp, string& oid, bufferlist& bl) +{ + return 0; +} + +int MotrStore::register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type, + const map<string, string>& meta) +{ + return 0; +} + +void MotrStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, + RGWRateLimitInfo& user_ratelimit, + RGWRateLimitInfo& anon_ratelimit) +{ + return; +} + +void MotrStore::get_quota(RGWQuota& quota) +{ + // XXX: Not handled for the first pass + return; +} + +int MotrStore::set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled) +{ + return 0; +} + +int MotrStore::get_sync_policy_handler(const DoutPrefixProvider *dpp, + std::optional<rgw_zone_id> zone, + std::optional<rgw_bucket> bucket, + RGWBucketSyncPolicyHandlerRef *phandler, + optional_yield y) +{ + return 0; +} + +RGWDataSyncStatusManager* MotrStore::get_data_sync_manager(const rgw_zone_id& source_zone) +{ + return 0; +} + +int MotrStore::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, + uint32_t max_entries, bool *is_truncated, + RGWUsageIter& usage_iter, + map<rgw_user_bucket, rgw_usage_log_entry>& usage) +{ + return 0; +} + +int MotrStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) +{ + return 0; +} + +int MotrStore::get_config_key_val(string name, bufferlist *bl) +{ + return 0; +} + +int MotrStore::meta_list_keys_init(const DoutPrefixProvider *dpp, const string& section, const string& marker, void** phandle) +{ + return 0; +} + +int MotrStore::meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, list<string>& keys, bool* truncated) +{ + return 0; +} + +void MotrStore::meta_list_keys_complete(void* handle) +{ + return; +} + +std::string MotrStore::meta_get_marker(void* handle) +{ + return ""; +} + +int MotrStore::meta_remove(const DoutPrefixProvider *dpp, string& metadata_key, optional_yield y) +{ + return 0; +} + +int MotrStore::open_idx(struct m0_uint128 *id, bool create, struct m0_idx *idx) +{ + m0_idx_init(idx, &container.co_realm, id); + + if (!create) + return 0; // nothing to do more + + // create index or make sure it's created + struct m0_op *op = nullptr; + int rc = m0_entity_create(nullptr, &idx->in_entity, &op); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; + goto out; + } + + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc != 0 && rc != -EEXIST) + ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; +out: + return rc; +} + +static void set_m0bufvec(struct m0_bufvec *bv, vector<uint8_t>& vec) +{ + *bv->ov_buf = reinterpret_cast<char*>(vec.data()); + *bv->ov_vec.v_count = vec.size(); +} + +// idx must be opened with open_idx() beforehand +int MotrStore::do_idx_op(struct m0_idx *idx, enum m0_idx_opcode opcode, + vector<uint8_t>& key, vector<uint8_t>& val, bool update) +{ + int rc, rc_i; + struct m0_bufvec k, v, *vp = &v; + uint32_t flags = 0; + struct m0_op *op = nullptr; + + if (m0_bufvec_empty_alloc(&k, 1) != 0) { + ldout(cctx, 0) << "ERROR: failed to allocate key bufvec" << dendl; + return -ENOMEM; + } + + if (opcode == M0_IC_PUT || opcode == M0_IC_GET) { + rc = -ENOMEM; + if (m0_bufvec_empty_alloc(&v, 1) != 0) { + ldout(cctx, 0) << "ERROR: failed to allocate value bufvec" << dendl; + goto out; + } + } + + set_m0bufvec(&k, key); + if (opcode == M0_IC_PUT) + set_m0bufvec(&v, val); + + if (opcode == M0_IC_DEL) + vp = nullptr; + + if (opcode == M0_IC_PUT && update) + flags |= M0_OIF_OVERWRITE; + + rc = m0_idx_op(idx, opcode, &k, vp, &rc_i, flags, &op); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; + goto out; + } + + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc != 0) { + ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; + goto out; + } + + if (rc_i != 0) { + ldout(cctx, 0) << "ERROR: idx op failed: " << rc_i << dendl; + rc = rc_i; + goto out; + } + + if (opcode == M0_IC_GET) { + val.resize(*v.ov_vec.v_count); + memcpy(reinterpret_cast<char*>(val.data()), *v.ov_buf, *v.ov_vec.v_count); + } + +out: + m0_bufvec_free2(&k); + if (opcode == M0_IC_GET) + m0_bufvec_free(&v); // cleanup buffer after GET + else if (opcode == M0_IC_PUT) + m0_bufvec_free2(&v); + + return rc; +} + +// Retrieve a range of key/value pairs starting from keys[0]. +int MotrStore::do_idx_next_op(struct m0_idx *idx, + vector<vector<uint8_t>>& keys, + vector<vector<uint8_t>>& vals) +{ + int rc; + uint32_t i = 0; + int nr_kvp = vals.size(); + int *rcs = new int[nr_kvp]; + struct m0_bufvec k, v; + struct m0_op *op = nullptr; + + rc = m0_bufvec_empty_alloc(&k, nr_kvp)?: + m0_bufvec_empty_alloc(&v, nr_kvp); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: failed to allocate kv bufvecs" << dendl; + return rc; + } + + set_m0bufvec(&k, keys[0]); + + rc = m0_idx_op(idx, M0_IC_NEXT, &k, &v, rcs, 0, &op); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; + goto out; + } + + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc != 0) { + ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; + goto out; + } + + for (i = 0; i < v.ov_vec.v_nr; ++i) { + if (rcs[i] < 0) + break; + + vector<uint8_t>& key = keys[i]; + vector<uint8_t>& val = vals[i]; + key.resize(k.ov_vec.v_count[i]); + val.resize(v.ov_vec.v_count[i]); + memcpy(reinterpret_cast<char*>(key.data()), k.ov_buf[i], k.ov_vec.v_count[i]); + memcpy(reinterpret_cast<char*>(val.data()), v.ov_buf[i], v.ov_vec.v_count[i]); + } + +out: + k.ov_vec.v_nr = i; + v.ov_vec.v_nr = i; + m0_bufvec_free(&k); + m0_bufvec_free(&v); // cleanup buffer after GET + + delete []rcs; + return rc ?: i; +} + +// Retrieve a number of key/value pairs under the prefix starting +// from the marker at key_out[0]. +int MotrStore::next_query_by_name(string idx_name, + vector<string>& key_out, + vector<bufferlist>& val_out, + string prefix, string delim) +{ + unsigned nr_kvp = std::min(val_out.size(), 100UL); + struct m0_idx idx = {}; + vector<vector<uint8_t>> keys(nr_kvp); + vector<vector<uint8_t>> vals(nr_kvp); + struct m0_uint128 idx_id; + int i = 0, j, k = 0; + + index_name_to_motr_fid(idx_name, &idx_id); + int rc = open_motr_idx(&idx_id, &idx); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: next_query_by_name(): failed to open index: rc=" + << rc << dendl; + goto out; + } + + // Only the first element for keys needs to be set for NEXT query. + // The keys will be set will the returned keys from motr index. + ldout(cctx, 20) <<__func__<< ": next_query_by_name(): index=" << idx_name + << " prefix=" << prefix << " delim=" << delim << dendl; + keys[0].assign(key_out[0].begin(), key_out[0].end()); + for (i = 0; i < (int)val_out.size(); i += k, k = 0) { + rc = do_idx_next_op(&idx, keys, vals); + ldout(cctx, 20) << "do_idx_next_op() = " << rc << dendl; + if (rc < 0) { + ldout(cctx, 0) << "ERROR: NEXT query failed. " << rc << dendl; + goto out; + } + + string dir; + for (j = 0, k = 0; j < rc; ++j) { + string key(keys[j].begin(), keys[j].end()); + size_t pos = std::string::npos; + if (!delim.empty()) + pos = key.find(delim, prefix.length()); + if (pos != std::string::npos) { // DIR entry + dir.assign(key, 0, pos + 1); + if (dir.compare(0, prefix.length(), prefix) != 0) + goto out; + if (i + k == 0 || dir != key_out[i + k - 1]) // a new one + key_out[i + k++] = dir; + continue; + } + dir = ""; + if (key.compare(0, prefix.length(), prefix) != 0) + goto out; + key_out[i + k] = key; + bufferlist& vbl = val_out[i + k]; + vbl.append(reinterpret_cast<char*>(vals[j].data()), vals[j].size()); + ++k; + } + + if (rc < (int)nr_kvp) // there are no more keys to fetch + break; + + string next_key; + if (dir != "") + next_key = dir + "\xff"; // skip all dir content in 1 step + else + next_key = key_out[i + k - 1] + " "; + ldout(cctx, 0) << "do_idx_next_op(): next_key=" << next_key << dendl; + keys[0].assign(next_key.begin(), next_key.end()); + } + +out: + m0_idx_fini(&idx); + return rc < 0 ? rc : i + k; +} + +int MotrStore::delete_motr_idx_by_name(string iname) +{ + struct m0_idx idx; + struct m0_uint128 idx_id; + struct m0_op *op = nullptr; + + ldout(cctx, 20) << "delete_motr_idx_by_name=" << iname << dendl; + + index_name_to_motr_fid(iname, &idx_id); + m0_idx_init(&idx, &container.co_realm, &idx_id); + m0_entity_open(&idx.in_entity, &op); + int rc = m0_entity_delete(&idx.in_entity, &op); + if (rc < 0) + goto out; + + m0_op_launch(&op, 1); + + ldout(cctx, 70) << "waiting for op completion" << dendl; + + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc == -ENOENT) // race deletion?? + rc = 0; + else if (rc < 0) + ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; + + ldout(cctx, 20) << "delete_motr_idx_by_name rc=" << rc << dendl; + +out: + m0_idx_fini(&idx); + return rc; +} + +int MotrStore::open_motr_idx(struct m0_uint128 *id, struct m0_idx *idx) +{ + m0_idx_init(idx, &container.co_realm, id); + return 0; +} + +// The following marcos are from dix/fid_convert.h which are not exposed. +enum { + M0_DIX_FID_DEVICE_ID_OFFSET = 32, + M0_DIX_FID_DIX_CONTAINER_MASK = (1ULL << M0_DIX_FID_DEVICE_ID_OFFSET) + - 1, +}; + +// md5 is used here, a more robust way to convert index name to fid is +// needed to avoid collision. +void MotrStore::index_name_to_motr_fid(string iname, struct m0_uint128 *id) +{ + unsigned char md5[16]; // 128/8 = 16 + MD5 hash; + + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes + hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + hash.Update((const unsigned char *)iname.c_str(), iname.length()); + hash.Final(md5); + + memcpy(&id->u_hi, md5, 8); + memcpy(&id->u_lo, md5 + 8, 8); + ldout(cctx, 20) << "id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; + + struct m0_fid *fid = (struct m0_fid*)id; + m0_fid_tset(fid, m0_dix_fid_type.ft_id, + fid->f_container & M0_DIX_FID_DIX_CONTAINER_MASK, fid->f_key); + ldout(cctx, 20) << "converted id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; +} + +int MotrStore::do_idx_op_by_name(string idx_name, enum m0_idx_opcode opcode, + string key_str, bufferlist &bl, bool update) +{ + struct m0_idx idx; + vector<uint8_t> key(key_str.begin(), key_str.end()); + vector<uint8_t> val; + struct m0_uint128 idx_id; + + index_name_to_motr_fid(idx_name, &idx_id); + int rc = open_motr_idx(&idx_id, &idx); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: failed to open index: " << rc << dendl; + goto out; + } + + if (opcode == M0_IC_PUT) + val.assign(bl.c_str(), bl.c_str() + bl.length()); + + ldout(cctx, 20) <<__func__<< ": do_idx_op_by_name(): op=" + << (opcode == M0_IC_PUT ? "PUT" : "GET") + << " idx=" << idx_name << " key=" << key_str << dendl; + rc = do_idx_op(&idx, opcode, key, val, update); + if (rc == 0 && opcode == M0_IC_GET) + // Append the returned value (blob) to the bufferlist. + bl.append(reinterpret_cast<char*>(val.data()), val.size()); + +out: + m0_idx_fini(&idx); + return rc; +} + +int MotrStore::create_motr_idx_by_name(string iname) +{ + struct m0_idx idx = {}; + struct m0_uint128 id; + + index_name_to_motr_fid(iname, &id); + m0_idx_init(&idx, &container.co_realm, &id); + + // create index or make sure it's created + struct m0_op *op = nullptr; + int rc = m0_entity_create(nullptr, &idx.in_entity, &op); + if (rc != 0) { + ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; + goto out; + } + + m0_op_launch(&op, 1); + rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: + m0_rc(op); + m0_op_fini(op); + m0_op_free(op); + + if (rc != 0 && rc != -EEXIST) + ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; +out: + m0_idx_fini(&idx); + return rc; +} + +// If a global index is checked (if it has been create) every time +// before they're queried (put/get), which takes 2 Motr operations to +// complete the query. As the global indices' name and FID are known +// already when MotrStore is created, we move the check and creation +// in newMotrStore(). +// Similar method is used for per bucket/user index. For example, +// bucket instance index is created when creating the bucket. +int MotrStore::check_n_create_global_indices() +{ + int rc = 0; + + for (const auto& iname : motr_global_indices) { + rc = create_motr_idx_by_name(iname); + if (rc < 0 && rc != -EEXIST) + break; + rc = 0; + } + + return rc; +} + +std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_yield y) +{ + char id[M0_FID_STR_LEN]; + struct m0_confc *confc = m0_reqh2confc(&instance->m0c_reqh); + + m0_fid_print(id, ARRAY_SIZE(id), &confc->cc_root->co_id); + return std::string(id); +} + +int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp, + CephContext *cct) +{ + this->obj_meta_cache = new MotrMetaCache(dpp, cct); + this->get_obj_meta_cache()->set_enabled(true); + + this->user_cache = new MotrMetaCache(dpp, cct); + this->get_user_cache()->set_enabled(true); + + this->bucket_inst_cache = new MotrMetaCache(dpp, cct); + this->get_bucket_inst_cache()->set_enabled(true); + + return 0; +} + + int MotrLuaManager::get_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, std::string& script) + { + return -ENOENT; + } + + int MotrLuaManager::put_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, const std::string& script) + { + return -ENOENT; + } + + int MotrLuaManager::del_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key) + { + return -ENOENT; + } + + int MotrLuaManager::add_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) + { + return -ENOENT; + } + + int MotrLuaManager::remove_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) + { + return -ENOENT; + } + + int MotrLuaManager::list_packages(const DoutPrefixProvider* dpp, optional_yield y, rgw::lua::packages_t& packages) + { + return -ENOENT; + } +} // namespace rgw::sal + +extern "C" { + +void *newMotrStore(CephContext *cct) +{ + int rc = -1; + rgw::sal::MotrStore *store = new rgw::sal::MotrStore(cct); + + if (store) { + store->conf.mc_is_oostore = true; + // XXX: these params should be taken from config settings and + // cct somehow? + store->instance = nullptr; + const auto& proc_ep = g_conf().get_val<std::string>("motr_my_endpoint"); + const auto& ha_ep = g_conf().get_val<std::string>("motr_ha_endpoint"); + const auto& proc_fid = g_conf().get_val<std::string>("motr_my_fid"); + const auto& profile = g_conf().get_val<std::string>("motr_profile_fid"); + const auto& admin_proc_ep = g_conf().get_val<std::string>("motr_admin_endpoint"); + const auto& admin_proc_fid = g_conf().get_val<std::string>("motr_admin_fid"); + const int init_flags = cct->get_init_flags(); + ldout(cct, 0) << "INFO: motr my endpoint: " << proc_ep << dendl; + ldout(cct, 0) << "INFO: motr ha endpoint: " << ha_ep << dendl; + ldout(cct, 0) << "INFO: motr my fid: " << proc_fid << dendl; + ldout(cct, 0) << "INFO: motr profile fid: " << profile << dendl; + store->conf.mc_local_addr = proc_ep.c_str(); + store->conf.mc_process_fid = proc_fid.c_str(); + + ldout(cct, 0) << "INFO: init flags: " << init_flags << dendl; + ldout(cct, 0) << "INFO: motr admin endpoint: " << admin_proc_ep << dendl; + ldout(cct, 0) << "INFO: motr admin fid: " << admin_proc_fid << dendl; + + // HACK this is so that radosge-admin uses a different client + if (init_flags == 0) { + store->conf.mc_process_fid = admin_proc_fid.c_str(); + store->conf.mc_local_addr = admin_proc_ep.c_str(); + } else { + store->conf.mc_process_fid = proc_fid.c_str(); + store->conf.mc_local_addr = proc_ep.c_str(); + } + store->conf.mc_ha_addr = ha_ep.c_str(); + store->conf.mc_profile = profile.c_str(); + + ldout(cct, 50) << "INFO: motr profile fid: " << store->conf.mc_profile << dendl; + ldout(cct, 50) << "INFO: ha addr: " << store->conf.mc_ha_addr << dendl; + ldout(cct, 50) << "INFO: process fid: " << store->conf.mc_process_fid << dendl; + ldout(cct, 50) << "INFO: motr endpoint: " << store->conf.mc_local_addr << dendl; + + store->conf.mc_tm_recv_queue_min_len = 64; + store->conf.mc_max_rpc_msg_size = 524288; + store->conf.mc_idx_service_id = M0_IDX_DIX; + store->dix_conf.kc_create_meta = false; + store->conf.mc_idx_service_conf = &store->dix_conf; + + if (!g_conf().get_val<bool>("motr_tracing_enabled")) { + m0_trace_level_allow(M0_WARN); // allow errors and warnings in syslog anyway + m0_trace_set_mmapped_buffer(false); + } + + store->instance = nullptr; + rc = m0_client_init(&store->instance, &store->conf, true); + if (rc != 0) { + ldout(cct, 0) << "ERROR: m0_client_init() failed: " << rc << dendl; + goto out; + } + + m0_container_init(&store->container, nullptr, &M0_UBER_REALM, store->instance); + rc = store->container.co_realm.re_entity.en_sm.sm_rc; + if (rc != 0) { + ldout(cct, 0) << "ERROR: m0_container_init() failed: " << rc << dendl; + goto out; + } + + rc = m0_ufid_init(store->instance, &ufid_gr); + if (rc != 0) { + ldout(cct, 0) << "ERROR: m0_ufid_init() failed: " << rc << dendl; + goto out; + } + + // Create global indices if not yet. + rc = store->check_n_create_global_indices(); + if (rc != 0) { + ldout(cct, 0) << "ERROR: check_n_create_global_indices() failed: " << rc << dendl; + goto out; + } + + } + +out: + if (rc != 0) { + delete store; + return nullptr; + } + return store; +} + +} |