// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * SAL implementation for the CORTX Motr backend
 *
 * Copyright (C) 2021 Seagate Technology LLC and/or its Affiliates
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include
#include
#include

extern "C" {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wextern-c-compat"
#pragma clang diagnostic ignored "-Wdeprecated-anon-enum-enum-conversion"
#include "motr/config.h"
#include "lib/types.h"
#include "lib/trace.h"   // m0_trace_set_mmapped_buffer
#include "motr/layout.h" // M0_OBJ_LAYOUT_ID
#include "helpers/helpers.h" // m0_ufid_next
#pragma clang diagnostic pop
}

#include "common/Clock.h"
#include "common/errno.h"

#include "rgw_compression.h"
#include "rgw_sal.h"
#include "rgw_sal_motr.h"
#include "rgw_bucket.h"

#define dout_subsys ceph_subsys_rgw

using std::string;
using std::map;
using std::vector;
using std::set;
using std::list;

static string mp_ns = RGW_OBJ_NS_MULTIPART;
static struct m0_ufid_generator ufid_gr;

namespace rgw::sal {

using ::ceph::encode;
using ::ceph::decode;

static std::string motr_global_indices[] = {
  RGW_MOTR_USERS_IDX_NAME,
  RGW_MOTR_BUCKET_INST_IDX_NAME,
  RGW_MOTR_BUCKET_HD_IDX_NAME,
  RGW_IAM_MOTR_ACCESS_KEY,
  RGW_IAM_MOTR_EMAIL_KEY
};

void MotrMetaCache::invalid(const DoutPrefixProvider *dpp,
                            const string& name)
{
  cache.invalidate_remove(dpp, name);
}

int MotrMetaCache::put(const DoutPrefixProvider *dpp,
                       const string& name,
                       const bufferlist& data)
{
  ldpp_dout(dpp, 0) << "Put into cache: name = " << name << dendl;

  ObjectCacheInfo info;
  info.status = 0;
  info.data = data;
  info.flags = CACHE_FLAG_DATA;
  info.meta.mtime = ceph::real_clock::now();
  info.meta.size = data.length();
  cache.put(dpp, name, info, NULL);

  // Inform other rgw instances. Do nothing if it gets some error?
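  // Note: distribute_cache() is currently a stub for the Motr backend (it
  // simply returns 0 below), so cross-instance cache invalidation is not yet
  // effective; the local cache update above is what actually takes effect.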
int rc = distribute_cache(dpp, name, info, UPDATE_OBJ); if (rc < 0) ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << name << dendl; return 0; } int MotrMetaCache::get(const DoutPrefixProvider *dpp, const string& name, bufferlist& data) { ObjectCacheInfo info; uint32_t flags = CACHE_FLAG_DATA; int rc = cache.get(dpp, name, info, flags, NULL); if (rc == 0) { if (info.status < 0) return info.status; bufferlist& bl = info.data; bufferlist::iterator it = bl.begin(); data.clear(); it.copy_all(data); ldpp_dout(dpp, 0) << "Cache hit: name = " << name << dendl; return 0; } ldpp_dout(dpp, 0) << "Cache miss: name = " << name << ", rc = "<< rc << dendl; if(rc == -ENODATA) return -ENOENT; return rc; } int MotrMetaCache::remove(const DoutPrefixProvider *dpp, const string& name) { cache.invalidate_remove(dpp, name); ObjectCacheInfo info; int rc = distribute_cache(dpp, name, info, INVALIDATE_OBJ); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: " <<__func__<< "(): failed to distribute cache: rc =" << rc << dendl; } ldpp_dout(dpp, 0) << "Remove from cache: name = " << name << dendl; return 0; } int MotrMetaCache::distribute_cache(const DoutPrefixProvider *dpp, const string& normal_name, ObjectCacheInfo& obj_info, int op) { return 0; } int MotrMetaCache::watch_cb(const DoutPrefixProvider *dpp, uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, bufferlist& bl) { return 0; } void MotrMetaCache::set_enabled(bool status) { cache.set_enabled(status); } // TODO: properly handle the number of key/value pairs to get in // one query. Now the POC simply tries to retrieve all `max` number of pairs // with starting key `marker`. int MotrUser::list_buckets(const DoutPrefixProvider *dpp, const string& marker, const string& end_marker, uint64_t max, bool need_stats, BucketList &buckets, optional_yield y) { int rc; vector keys(max); vector vals(max); bool is_truncated = false; ldpp_dout(dpp, 20) <<__func__<< ": list_user_buckets: marker=" << marker << " end_marker=" << end_marker << " max=" << max << dendl; // Retrieve all `max` number of pairs. buckets.clear(); string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); keys[0] = marker; rc = store->next_query_by_name(user_info_iname, keys, vals); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; return rc; } // Process the returned pairs to add into BucketList. uint64_t bcount = 0; for (const auto& bl: vals) { if (bl.length() == 0) break; RGWBucketEnt ent; auto iter = bl.cbegin(); ent.decode(iter); std::time_t ctime = ceph::real_clock::to_time_t(ent.creation_time); ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; if (!end_marker.empty() && end_marker.compare(ent.bucket.marker) <= 0) break; buckets.add(std::make_unique(this->store, ent, this)); bcount++; } if (bcount == max) is_truncated = true; buckets.set_truncated(is_truncated); return 0; } int MotrUser::create_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, const std::string& zonegroup_id, rgw_placement_rule& placement_rule, std::string& swift_ver_location, const RGWQuotaInfo* pquota_info, const RGWAccessControlPolicy& policy, Attrs& attrs, RGWBucketInfo& info, obj_version& ep_objv, bool exclusive, bool obj_lock_enabled, bool* existed, req_info& req_info, std::unique_ptr* bucket_out, optional_yield y) { int ret; std::unique_ptr bucket; // Look up the bucket. Create it if it doesn't exist. 
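  // If get_bucket() succeeds, the bucket already exists: *existed is set and
  // the call ends up returning -EEXIST further below. Only on -ENOENT do we
  // build a fresh MotrBucket, store its info, create its bucket/multipart
  // indices and link it to this user.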
ret = this->store->get_bucket(dpp, this, b, &bucket, y); if (ret < 0 && ret != -ENOENT) return ret; if (ret != -ENOENT) { *existed = true; // if (swift_ver_location.empty()) { // swift_ver_location = bucket->get_info().swift_ver_location; // } // placement_rule.inherit_from(bucket->get_info().placement_rule); // TODO: ACL policy // // don't allow changes to the acl policy //RGWAccessControlPolicy old_policy(ctx()); //int rc = rgw_op_get_bucket_policy_from_attr( // dpp, this, u, bucket->get_attrs(), &old_policy, y); //if (rc >= 0 && old_policy != policy) { // bucket_out->swap(bucket); // return -EEXIST; //} } else { placement_rule.name = "default"; placement_rule.storage_class = "STANDARD"; bucket = std::make_unique(store, b, this); bucket->set_attrs(attrs); *existed = false; } if (!*existed){ // TODO: how to handle zone and multi-site. info.placement_rule = placement_rule; info.bucket = b; info.owner = this->get_info().user_id; info.zonegroup = zonegroup_id; if (obj_lock_enabled) info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED; bucket->set_version(ep_objv); bucket->get_info() = info; // Create a new bucket: (1) Add a key/value pair in the // bucket instance index. (2) Create a new bucket index. MotrBucket* mbucket = static_cast(bucket.get()); ret = mbucket->put_info(dpp, y, ceph::real_time())? : mbucket->create_bucket_index() ? : mbucket->create_multipart_indices(); if (ret < 0) ldpp_dout(dpp, 0) << "ERROR: failed to create bucket indices! " << ret << dendl; // Insert the bucket entry into the user info index. ret = mbucket->link_user(dpp, this, y); if (ret < 0) ldpp_dout(dpp, 0) << "ERROR: failed to add bucket entry! " << ret << dendl; } else { return -EEXIST; // bucket->set_version(ep_objv); // bucket->get_info() = info; } bucket_out->swap(bucket); return ret; } int MotrUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y) { return 0; } int MotrUser::read_stats(const DoutPrefixProvider *dpp, optional_yield y, RGWStorageStats* stats, ceph::real_time *last_stats_sync, ceph::real_time *last_stats_update) { return 0; } /* stats - Not for first pass */ int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb) { return 0; } int MotrUser::complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) { return 0; } int MotrUser::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, bool *is_truncated, RGWUsageIter& usage_iter, map& usage) { return 0; } int MotrUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) { return 0; } int MotrUser::load_user_from_idx(const DoutPrefixProvider *dpp, MotrStore *store, RGWUserInfo& info, map *attrs, RGWObjVersionTracker *objv_tr) { struct MotrUserInfo muinfo; bufferlist bl; ldpp_dout(dpp, 20) << "info.user_id.id = " << info.user_id.id << dendl; if (store->get_user_cache()->get(dpp, info.user_id.id, bl)) { // Cache misses int rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, M0_IC_GET, info.user_id.to_str(), bl); ldpp_dout(dpp, 20) << "do_idx_op_by_name() = " << rc << dendl; if (rc < 0) return rc; // Put into cache. 
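      // The cached value is the same encoded MotrUserInfo blob fetched from
      // RGW_MOTR_USERS_IDX_NAME, so cache hits and misses decode identically
      // below.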
store->get_user_cache()->put(dpp, info.user_id.id, bl); } bufferlist& blr = bl; auto iter = blr.cbegin(); muinfo.decode(iter); info = muinfo.info; if (attrs) *attrs = muinfo.attrs; if (objv_tr) { objv_tr->read_version = muinfo.user_version; objv_tracker.read_version = objv_tr->read_version; } if (!info.access_keys.empty()) { for(auto key : info.access_keys) { access_key_tracker.insert(key.first); } } return 0; } int MotrUser::load_user(const DoutPrefixProvider *dpp, optional_yield y) { ldpp_dout(dpp, 20) << "load user: user id = " << info.user_id.to_str() << dendl; return load_user_from_idx(dpp, store, info, &attrs, &objv_tracker); } int MotrUser::create_user_info_idx() { string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); return store->create_motr_idx_by_name(user_info_iname); } int MotrUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) { for (auto& it : new_attrs) attrs[it.first] = it.second; return store_user(dpp, y, false); } int MotrUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info) { bufferlist bl; struct MotrUserInfo muinfo; RGWUserInfo orig_info; RGWObjVersionTracker objv_tr = {}; obj_version& obj_ver = objv_tr.read_version; ldpp_dout(dpp, 20) << "Store_user(): User = " << info.user_id.id << dendl; orig_info.user_id = info.user_id; // XXX: we open and close motr idx 2 times in this method: // 1) on load_user_from_idx() here and 2) on do_idx_op_by_name(PUT) below. // Maybe this can be optimised later somewhow. int rc = load_user_from_idx(dpp, store, orig_info, nullptr, &objv_tr); ldpp_dout(dpp, 10) << "Get user: rc = " << rc << dendl; // Check if the user already exists if (rc == 0 && obj_ver.ver > 0) { if (old_info) *old_info = orig_info; if (obj_ver.ver != objv_tracker.read_version.ver) { rc = -ECANCELED; ldpp_dout(dpp, 0) << "ERROR: User Read version mismatch" << dendl; goto out; } if (exclusive) return rc; obj_ver.ver++; } else { obj_ver.ver = 1; obj_ver.tag = "UserTAG"; } // Insert the user to user info index. 
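  // The value stored under the user-id key is an encoded MotrUserInfo,
  // i.e. RGWUserInfo + user attrs + the bumped object version, which is
  // exactly what load_user_from_idx() decodes back on the read path.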
  muinfo.info = info;
  muinfo.attrs = attrs;
  muinfo.user_version = obj_ver;
  muinfo.encode(bl);
  rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME,
                                M0_IC_PUT, info.user_id.to_str(), bl);
  ldpp_dout(dpp, 10) << "Store user to motr index: rc = " << rc << dendl;
  if (rc == 0) {
    objv_tracker.read_version = obj_ver;
    objv_tracker.write_version = obj_ver;
  }

  // Store access key in access key index
  if (!info.access_keys.empty()) {
    std::string access_key;
    std::string secret_key;
    std::map<std::string, RGWAccessKey>::const_iterator iter =
      info.access_keys.begin();
    const RGWAccessKey& k = iter->second;
    access_key = k.id;
    secret_key = k.key;
    MotrAccessKey MGWUserKeys(access_key, secret_key, info.user_id.to_str());
    store->store_access_key(dpp, y, MGWUserKeys);
    access_key_tracker.insert(access_key);
  }

  // Check if any key needs to be deleted
  if (access_key_tracker.size() != info.access_keys.size()) {
    std::string key_for_deletion;
    for (auto key : access_key_tracker) {
      if (!info.get_key(key)) {
        key_for_deletion = key;
        ldpp_dout(dpp, 0) << "Deleting access key: " << key_for_deletion << dendl;
        rc = store->delete_access_key(dpp, y, key_for_deletion);
        if (rc < 0) {
          ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl;
        }
      }
    }
    if (rc >= 0) {
      access_key_tracker.erase(key_for_deletion);
    }
  }

  if (!info.user_email.empty()) {
    MotrEmailInfo MGWEmailInfo(info.user_id.to_str(), info.user_email);
    store->store_email_info(dpp, y, MGWEmailInfo);
  }

  // Create the user info index which stores all buckets that belong
  // to this user.
  rc = create_user_info_idx();
  if (rc < 0 && rc != -EEXIST) {
    ldpp_dout(dpp, 0) << "Failed to create user info index: rc = " << rc << dendl;
    goto out;
  }

  // Put the user info into cache.
  rc = store->get_user_cache()->put(dpp, info.user_id.id, bl);

out:
  return rc;
}

int MotrUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y)
{
  // Remove user info from cache
  // Delete access keys for user
  // Delete user info
  // Delete user from user index
  // Delete email for user - TODO
  bufferlist bl;
  int rc;

  // Remove the user info from cache.
  store->get_user_cache()->remove(dpp, info.user_id.id);

  // Delete all access keys of the user
  if (!info.access_keys.empty()) {
    for (auto acc_key = info.access_keys.begin();
         acc_key != info.access_keys.end(); acc_key++) {
      auto access_key = acc_key->first;
      rc = store->delete_access_key(dpp, y, access_key);
      // TODO
      // Check error code for access_key does not exist
      // Continue to next step only if delete failed because the key doesn't exist
      if (rc < 0) {
        ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl;
      }
    }
  }

  // Delete email id
  if (!info.user_email.empty()) {
    rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
                                  M0_IC_DEL, info.user_email, bl);
    if (rc < 0 && rc != -ENOENT) {
      ldpp_dout(dpp, 0) << "Unable to delete email id " << rc << dendl;
    }
  }

  // Delete user info index
  string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str();
  store->delete_motr_idx_by_name(user_info_iname);
  ldpp_dout(dpp, 10) << "Deleted user info index - " << user_info_iname << dendl;

  // Delete user from user index
  rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME,
                                M0_IC_DEL, info.user_id.to_str(), bl);
  if (rc < 0) {
    ldpp_dout(dpp, 0) << "Unable to delete user from user index " << rc << dendl;
    return rc;
  }

  // TODO
  // Delete email for user
  // rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
  //                               M0_IC_DEL, info.user_email, bl);
  // if (rc < 0) {
  //   ldpp_dout(dpp, 0) << "Unable to delete email for user" << rc << dendl;
  //   return rc;
  // }
  return 0;
}

int MotrUser::verify_mfa(const std::string& mfa_str, bool* verified,
                         const DoutPrefixProvider *dpp, optional_yield y)
{
  *verified = false;
  return 0;
}

int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp,
                              bool delete_children,
                              bool forward_to_master,
                              req_info* req_info,
                              optional_yield y)
{
  int ret;

  ldpp_dout(dpp, 20) << "remove_bucket Entry=" << info.bucket.name << dendl;

  // Refresh info
  ret = load_bucket(dpp, y);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: remove_bucket load_bucket failed rc=" << ret << dendl;
    return ret;
  }

  ListParams params;
  params.list_versions = true;
  params.allow_unordered = true;

  ListResults results;

  // 1. Check if the bucket has objects.
  // If the bucket contains objects and delete_children is true, delete all objects.
  // Else return an error that the bucket is not empty.
  do {
    results.objs.clear();

    // Check if bucket has objects.
    ret = list(dpp, params, 1000, results, y);
    if (ret < 0) {
      return ret;
    }

    // If the result contains entries, the bucket is not empty.
    if (!results.objs.empty() && !delete_children) {
      ldpp_dout(dpp, 0) << "ERROR: could not remove non-empty bucket "
                        << info.bucket.name << dendl;
      return -ENOTEMPTY;
    }

    for (const auto& obj : results.objs) {
      rgw_obj_key key(obj.key);
      /* xxx dang */
      ret = rgw_remove_object(dpp, store, this, key);
      if (ret < 0 && ret != -ENOENT) {
        ldpp_dout(dpp, 0) << "ERROR: remove_bucket rgw_remove_object failed rc="
                          << ret << dendl;
        return ret;
      }
    }
  } while (results.is_truncated);

  // 2. Abort multipart uploads on the bucket.
  ret = abort_multiparts(dpp, store->ctx());
  if (ret < 0) {
    return ret;
  }

  // 3. Remove the bucket's multipart index.
  string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts";
  ret = store->delete_motr_idx_by_name(bucket_multipart_iname);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove multipart index rc="
                      << ret << dendl;
    return ret;
  }

  // 4. Sync user stats.
  ret = this->sync_user_stats(dpp, y);
  if (ret < 0) {
    ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret="
                           << ret << dendl;
  }

  // 5. Remove the bucket from the user info index. (unlink user)
  ret = this->unlink_user(dpp, owner, y);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl;
    return ret;
  }

  // 6. Remove bucket index.
  string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name;
  ret = store->delete_motr_idx_by_name(bucket_index_iname);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket index rc="
                      << ret << dendl;
    return ret;
  }

  // 7. Remove bucket instance info.
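  // Both the in-memory cache entry and the key/value record in
  // RGW_MOTR_BUCKET_INST_IDX_NAME are keyed by the bucket name, so both
  // must be dropped here to fully forget the bucket instance.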
bufferlist bl; ret = store->get_bucket_inst_cache()->remove(dpp, info.bucket.name); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance from cache rc=" << ret << dendl; return ret; } ret = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, M0_IC_DEL, info.bucket.name, bl); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance rc=" << ret << dendl; return ret; } // TODO : // 8. Remove Notifications // if bucket has notification definitions associated with it // they should be removed (note that any pending notifications on the bucket are still going to be sent) // 9. Forward request to master. if (forward_to_master) { bufferlist in_data; ret = store->forward_request_to_master(dpp, owner, &bucket_version, in_data, nullptr, *req_info, y); if (ret < 0) { if (ret == -ENOENT) { /* adjust error, we want to return with NoSuchBucket and not * NoSuchKey */ ret = -ERR_NO_SUCH_BUCKET; } ldpp_dout(dpp, 0) << "ERROR: Forward to master failed. ret=" << ret << dendl; return ret; } } ldpp_dout(dpp, 20) << "remove_bucket Exit=" << info.bucket.name << dendl; return ret; } int MotrBucket::remove_bucket_bypass_gc(int concurrent_max, bool keep_index_consistent, optional_yield y, const DoutPrefixProvider *dpp) { return 0; } int MotrBucket::put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time _mtime) { bufferlist bl; struct MotrBucketInfo mbinfo; ldpp_dout(dpp, 20) << "put_info(): bucket_id=" << info.bucket.bucket_id << dendl; mbinfo.info = info; mbinfo.bucket_attrs = attrs; mbinfo.mtime = _mtime; mbinfo.bucket_version = bucket_version; mbinfo.encode(bl); // Insert bucket instance using bucket's marker (string). int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, M0_IC_PUT, info.bucket.name, bl, !exclusive); if (rc == 0) store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); return rc; } int MotrBucket::load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats) { // Get bucket instance using bucket's name (string). or bucket id? bufferlist bl; if (store->get_bucket_inst_cache()->get(dpp, info.bucket.name, bl)) { // Cache misses. ldpp_dout(dpp, 20) << "load_bucket(): name=" << info.bucket.name << dendl; int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, M0_IC_GET, info.bucket.name, bl); ldpp_dout(dpp, 20) << "load_bucket(): rc=" << rc << dendl; if (rc < 0) return rc; store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); } struct MotrBucketInfo mbinfo; bufferlist& blr = bl; auto iter =blr.cbegin(); mbinfo.decode(iter); //Decode into MotrBucketInfo. info = mbinfo.info; ldpp_dout(dpp, 20) << "load_bucket(): bucket_id=" << info.bucket.bucket_id << dendl; rgw_placement_rule placement_rule; placement_rule.name = "default"; placement_rule.storage_class = "STANDARD"; info.placement_rule = placement_rule; attrs = mbinfo.bucket_attrs; mtime = mbinfo.mtime; bucket_version = mbinfo.bucket_version; return 0; } int MotrBucket::link_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) { bufferlist bl; RGWBucketEnt new_bucket; ceph::real_time creation_time = get_creation_time(); // RGWBucketEnt or cls_user_bucket_entry is the structure that is stored. 
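  // The entry lands in "motr.rgw.user.info.<user-id>" keyed by the bucket
  // name; MotrUser::list_buckets() above decodes these RGWBucketEnt values
  // when listing the user's buckets.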
new_bucket.bucket = info.bucket; new_bucket.size = 0; if (real_clock::is_zero(creation_time)) creation_time = ceph::real_clock::now(); new_bucket.creation_time = creation_time; new_bucket.encode(bl); std::time_t ctime = ceph::real_clock::to_time_t(new_bucket.creation_time); ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; // Insert the user into the user info index. string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); return store->do_idx_op_by_name(user_info_idx_name, M0_IC_PUT, info.bucket.name, bl); } int MotrBucket::unlink_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) { // Remove the user into the user info index. bufferlist bl; string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); return store->do_idx_op_by_name(user_info_idx_name, M0_IC_DEL, info.bucket.name, bl); } /* stats - Not for first pass */ int MotrBucket::read_stats(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, std::string *bucket_ver, std::string *master_ver, std::map& stats, std::string *max_marker, bool *syncstopped) { return 0; } int MotrBucket::create_bucket_index() { string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; return store->create_motr_idx_by_name(bucket_index_iname); } int MotrBucket::create_multipart_indices() { int rc; // Bucket multipart index stores in-progress multipart uploads. // Key is the object name + upload_id, value is a rgw_bucket_dir_entry. // An entry is inserted when a multipart upload is initialised ( // MotrMultipartUpload::init()) and will be removed when the upload // is completed (MotrMultipartUpload::complete()). // MotrBucket::list_multiparts() will scan this index to return all // in-progress multipart uploads in the bucket. string bucket_multipart_iname = "motr.rgw.bucket." 
+ info.bucket.name + ".multiparts"; rc = store->create_motr_idx_by_name(bucket_multipart_iname); if (rc < 0) { ldout(store->cctx, 0) << "Failed to create bucket multipart index " << bucket_multipart_iname << dendl; return rc; } return 0; } int MotrBucket::read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *ctx) { return 0; } int MotrBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) { return 0; } int MotrBucket::update_container_stats(const DoutPrefixProvider *dpp) { return 0; } int MotrBucket::check_bucket_shards(const DoutPrefixProvider *dpp) { return 0; } int MotrBucket::chown(const DoutPrefixProvider *dpp, User& new_user, optional_yield y) { // TODO: update bucket with new owner return 0; } /* Make sure to call load_bucket() if you need it first */ bool MotrBucket::is_owner(User* user) { return (info.owner.compare(user->get_id()) == 0); } int MotrBucket::check_empty(const DoutPrefixProvider *dpp, optional_yield y) { /* XXX: Check if bucket contains any objects */ return 0; } int MotrBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, optional_yield y, bool check_size_only) { /* Not Handled in the first pass as stats are also needed */ return 0; } int MotrBucket::merge_and_store_attrs(const DoutPrefixProvider *dpp, Attrs& new_attrs, optional_yield y) { for (auto& it : new_attrs) attrs[it.first] = it.second; return put_info(dpp, y, ceph::real_time()); } int MotrBucket::try_refresh_info(const DoutPrefixProvider *dpp, ceph::real_time *pmtime) { return 0; } /* XXX: usage and stats not supported in the first pass */ int MotrBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, bool *is_truncated, RGWUsageIter& usage_iter, map& usage) { return 0; } int MotrBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) { return 0; } int MotrBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list& objs_to_unlink) { /* XXX: CHECK: Unlike RadosStore, there is no seperate bucket index table. * Delete all the object in the list from the object table of this * bucket */ return 0; } int MotrBucket::check_index(const DoutPrefixProvider *dpp, std::map& existing_stats, std::map& calculated_stats) { /* XXX: stats not supported yet */ return 0; } int MotrBucket::rebuild_index(const DoutPrefixProvider *dpp) { /* there is no index table in dbstore. Not applicable */ return 0; } int MotrBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout) { /* XXX: CHECK: set tag timeout for all the bucket objects? */ return 0; } int MotrBucket::purge_instance(const DoutPrefixProvider *dpp) { /* XXX: CHECK: for dbstore only single instance supported. * Remove all the objects for that instance? Anything extra needed? 
*/ return 0; } int MotrBucket::set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy &acl, optional_yield y) { int ret = 0; bufferlist aclbl; acls = acl; acl.encode(aclbl); Attrs attrs = get_attrs(); attrs[RGW_ATTR_ACL] = aclbl; // TODO: update bucket entry with the new attrs return ret; } std::unique_ptr MotrBucket::get_object(const rgw_obj_key& k) { return std::make_unique(this->store, k, this); } int MotrBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y) { int rc; vector keys(max); vector vals(max); ldpp_dout(dpp, 20) << "bucket=" << info.bucket.name << " prefix=" << params.prefix << " marker=" << params.marker << " max=" << max << dendl; // Retrieve all `max` number of pairs. string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; keys[0] = params.marker.empty() ? params.prefix : params.marker.get_oid(); rc = store->next_query_by_name(bucket_index_iname, keys, vals, params.prefix, params.delim); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; return rc; } // Process the returned pairs to add into ListResults. int i = 0; for (; i < rc; ++i) { if (vals[i].length() == 0) { results.common_prefixes[keys[i]] = true; } else { rgw_bucket_dir_entry ent; auto iter = vals[i].cbegin(); ent.decode(iter); if (params.list_versions || ent.is_visible()) results.objs.emplace_back(std::move(ent)); } } if (i == max) { results.is_truncated = true; results.next_marker = keys[max - 1] + " "; } else { results.is_truncated = false; } return 0; } int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp, const string& prefix, string& marker, const string& delim, const int& max_uploads, vector>& uploads, map *common_prefixes, bool *is_truncated) { int rc; vector key_vec(max_uploads); vector val_vec(max_uploads); string bucket_multipart_iname = "motr.rgw.bucket." + this->get_name() + ".multiparts"; key_vec[0].clear(); key_vec[0].assign(marker.begin(), marker.end()); rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; return rc; } // Process the returned pairs to add into ListResults. // The POC can only support listing all objects or selecting // with prefix. int ocount = 0; rgw_obj_key last_obj_key; *is_truncated = false; for (const auto& bl: val_vec) { if (bl.length() == 0) break; rgw_bucket_dir_entry ent; auto iter = bl.cbegin(); ent.decode(iter); if (prefix.size() && (0 != ent.key.name.compare(0, prefix.size(), prefix))) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ": skippping \"" << ent.key << "\" because doesn't match prefix" << dendl; continue; } rgw_obj_key key(ent.key); uploads.push_back(this->get_multipart_upload(key.name)); last_obj_key = key; ocount++; if (ocount == max_uploads) { *is_truncated = true; break; } } marker = last_obj_key.name; // What is common prefix? We don't handle it for now. 
return 0; } int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct) { return 0; } void MotrStore::finalize(void) { // close connection with motr m0_client_fini(this->instance, true); } const std::string& MotrZoneGroup::get_endpoint() const { if (!group.endpoints.empty()) { return group.endpoints.front(); } else { // use zonegroup's master zone endpoints auto z = group.zones.find(group.master_zone); if (z != group.zones.end() && !z->second.endpoints.empty()) { return z->second.endpoints.front(); } } return empty; } bool MotrZoneGroup::placement_target_exists(std::string& target) const { return !!group.placement_targets.count(target); } void MotrZoneGroup::get_placement_target_names(std::set& names) const { for (const auto& target : group.placement_targets) { names.emplace(target.second.name); } } int MotrZoneGroup::get_placement_tier(const rgw_placement_rule& rule, std::unique_ptr* tier) { std::map::const_iterator titer; titer = group.placement_targets.find(rule.name); if (titer == group.placement_targets.end()) { return -ENOENT; } const auto& target_rule = titer->second; std::map::const_iterator ttier; ttier = target_rule.tier_targets.find(rule.storage_class); if (ttier == target_rule.tier_targets.end()) { // not found return -ENOENT; } PlacementTier* t; t = new MotrPlacementTier(store, ttier->second); if (!t) return -ENOMEM; tier->reset(t); return 0; } ZoneGroup& MotrZone::get_zonegroup() { return zonegroup; } const std::string& MotrZone::get_id() { return zone_params->get_id(); } const std::string& MotrZone::get_name() const { return zone_params->get_name(); } bool MotrZone::is_writeable() { return true; } bool MotrZone::get_redirect_endpoint(std::string* endpoint) { return false; } bool MotrZone::has_zonegroup_api(const std::string& api) const { return (zonegroup->api_name == api); } const std::string& MotrZone::get_current_period_id() { return current_period->get_id(); } std::unique_ptr MotrStore::get_lua_manager() { return std::make_unique(this); } int MotrObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **_state, optional_yield y, bool follow_olh) { // Get object's metadata (those stored in rgw_bucket_dir_entry). bufferlist bl; if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { // Cache misses. string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); int rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, this->get_key().get_oid(), bl); if (rc < 0) { ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; return rc; } // Put into cache. this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); } rgw_bucket_dir_entry ent; bufferlist& blr = bl; auto iter = blr.cbegin(); ent.decode(iter); // Set object's type. this->category = ent.meta.category; // Set object state. 
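  // Only the fields recoverable from the decoded rgw_bucket_dir_entry are
  // filled in here: existence, size/accounted_size, mtime and the ETag attr.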
state.exists = true; state.size = ent.meta.size; state.accounted_size = ent.meta.size; state.mtime = ent.meta.mtime; state.has_attrs = true; bufferlist etag_bl; string& etag = ent.meta.etag; ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; etag_bl.append(etag); state.attrset[RGW_ATTR_ETAG] = etag_bl; return 0; } MotrObject::~MotrObject() { this->close_mobj(); } // int MotrObject::read_attrs(const DoutPrefixProvider* dpp, Motr::Object::Read &read_op, optional_yield y, rgw_obj* target_obj) // { // read_op.params.attrs = &attrs; // read_op.params.target_obj = target_obj; // read_op.params.obj_size = &obj_size; // read_op.params.lastmod = &mtime; // // return read_op.prepare(dpp); // } int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) { // TODO: implement ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl; return 0; } int MotrObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) { if (this->category == RGWObjCategory::MultiMeta) return 0; string bname, key; if (target_obj) { bname = target_obj->bucket.name; key = target_obj->key.get_oid(); } else { bname = this->get_bucket()->get_name(); key = this->get_key().get_oid(); } ldpp_dout(dpp, 20) << "MotrObject::get_obj_attrs(): " << bname << "/" << key << dendl; // Get object's metadata (those stored in rgw_bucket_dir_entry). bufferlist bl; if (this->store->get_obj_meta_cache()->get(dpp, key, bl)) { // Cache misses. string bucket_index_iname = "motr.rgw.bucket.index." + bname; int rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, key, bl); if (rc < 0) { ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; return rc; } // Put into cache. 
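      // The cached value is the raw encoded bucket-index entry; the dir
      // entry and the object attrs are decoded from it just below.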
this->store->get_obj_meta_cache()->put(dpp, key, bl); } rgw_bucket_dir_entry ent; bufferlist& blr = bl; auto iter = blr.cbegin(); ent.decode(iter); decode(attrs, iter); return 0; } int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) { rgw_obj target = get_obj(); int r = get_obj_attrs(y, dpp, &target); if (r < 0) { return r; } set_atomic(); attrs[attr_name] = attr_val; return set_obj_attrs(dpp, &attrs, nullptr, y); } int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) { rgw_obj target = get_obj(); Attrs rmattr; bufferlist bl; set_atomic(); rmattr[attr_name] = bl; return set_obj_attrs(dpp, nullptr, &rmattr, y); } bool MotrObject::is_expired() { return false; } // Taken from rgw_rados.cc void MotrObject::gen_rand_obj_instance_name() { enum {OBJ_INSTANCE_LEN = 32}; char buf[OBJ_INSTANCE_LEN + 1]; gen_rand_alphanumeric_no_underscore(store->ctx(), buf, OBJ_INSTANCE_LEN); state.obj.key.set_instance(buf); } int MotrObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count, std::map *m, bool* pmore, optional_yield y) { return 0; } int MotrObject::omap_get_all(const DoutPrefixProvider *dpp, std::map *m, optional_yield y) { return 0; } int MotrObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid, const std::set& keys, Attrs* vals) { return 0; } int MotrObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, bool must_exist, optional_yield y) { return 0; } int MotrObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y) { return 0; } std::unique_ptr MotrObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name) { return std::make_unique(dpp, store, this, lock_name); } int MotrObject::transition(Bucket* bucket, const rgw_placement_rule& placement_rule, const real_time& mtime, uint64_t olh_epoch, const DoutPrefixProvider* dpp, optional_yield y) { return 0; } bool MotrObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) { /* XXX: support single default zone and zonegroup for now */ return true; } int MotrObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) { return 0; } std::unique_ptr MotrObject::get_read_op() { return std::make_unique(this); } MotrObject::MotrReadOp::MotrReadOp(MotrObject *_source) : source(_source) { } int MotrObject::MotrReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) { int rc; ldpp_dout(dpp, 20) <<__func__<< ": bucket=" << source->get_bucket()->get_name() << dendl; rgw_bucket_dir_entry ent; rc = source->get_bucket_dir_ent(dpp, ent); if (rc < 0) return rc; // Set source object's attrs. The attrs is key/value map and is used // in send_response_data() to set attributes, including etag. 
bufferlist etag_bl; string& etag = ent.meta.etag; ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; etag_bl.append(etag.c_str(), etag.size()); source->get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); source->set_key(ent.key); source->set_obj_size(ent.meta.size); source->category = ent.meta.category; *params.lastmod = ent.meta.mtime; if (params.mod_ptr || params.unmod_ptr) { // Convert all times go GMT to make them compatible obj_time_weight src_weight; src_weight.init(*params.lastmod, params.mod_zone_id, params.mod_pg_ver); src_weight.high_precision = params.high_precision_time; obj_time_weight dest_weight; dest_weight.high_precision = params.high_precision_time; // Check if-modified-since condition if (params.mod_ptr && !params.if_nomatch) { dest_weight.init(*params.mod_ptr, params.mod_zone_id, params.mod_pg_ver); ldpp_dout(dpp, 10) << "If-Modified-Since: " << dest_weight << " & " << "Last-Modified: " << src_weight << dendl; if (!(dest_weight < src_weight)) { return -ERR_NOT_MODIFIED; } } // Check if-unmodified-since condition if (params.unmod_ptr && !params.if_match) { dest_weight.init(*params.unmod_ptr, params.mod_zone_id, params.mod_pg_ver); ldpp_dout(dpp, 10) << "If-UnModified-Since: " << dest_weight << " & " << "Last-Modified: " << src_weight << dendl; if (dest_weight < src_weight) { return -ERR_PRECONDITION_FAILED; } } } // Check if-match condition if (params.if_match) { string if_match_str = rgw_string_unquote(params.if_match); ldpp_dout(dpp, 10) << "ETag: " << etag << " & " << "If-Match: " << if_match_str << dendl; if (if_match_str.compare(etag) != 0) { return -ERR_PRECONDITION_FAILED; } } // Check if-none-match condition if (params.if_nomatch) { string if_nomatch_str = rgw_string_unquote(params.if_nomatch); ldpp_dout(dpp, 10) << "ETag: " << etag << " & " << "If-NoMatch: " << if_nomatch_str << dendl; if (if_nomatch_str.compare(etag) == 0) { return -ERR_NOT_MODIFIED; } } // Skip opening an empty object. if(source->get_obj_size() == 0) return 0; // Open the object here. if (source->category == RGWObjCategory::MultiMeta) { ldpp_dout(dpp, 20) <<__func__<< ": open obj parts..." << dendl; rc = source->get_part_objs(dpp, this->part_objs)? : source->open_part_objs(dpp, this->part_objs); return rc; } else { ldpp_dout(dpp, 20) <<__func__<< ": open object..." << dendl; return source->open_mobj(dpp); } } int MotrObject::MotrReadOp::read(int64_t off, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp) { ldpp_dout(dpp, 20) << "MotrReadOp::read(): sync read." << dendl; return 0; } // RGWGetObj::execute() calls ReadOp::iterate() to read object from 'off' to 'end'. // The returned data is processed in 'cb' which is a chain of post-processing // filters such as decompression, de-encryption and sending back data to client // (RGWGetObj_CB::handle_dta which in turn calls RGWGetObj::get_data_cb() to // send data back.). // // POC implements a simple sync version of iterate() function in which it reads // a block of data each time and call 'cb' for post-processing. 
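// Multipart objects (RGWObjCategory::MultiMeta) are read part by part via
// read_multipart_obj(), while simple objects go straight to read_mobj().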
int MotrObject::MotrReadOp::iterate(const DoutPrefixProvider* dpp, int64_t off,
                                    int64_t end, RGWGetDataCB* cb, optional_yield y)
{
  int rc;

  if (source->category == RGWObjCategory::MultiMeta)
    rc = source->read_multipart_obj(dpp, off, end, cb, part_objs);
  else
    rc = source->read_mobj(dpp, off, end, cb);

  return rc;
}

int MotrObject::MotrReadOp::get_attr(const DoutPrefixProvider* dpp,
                                     const char* name, bufferlist& dest,
                                     optional_yield y)
{
  //return 0;
  return -ENODATA;
}

std::unique_ptr<Object::DeleteOp> MotrObject::get_delete_op()
{
  return std::make_unique<MotrObject::MotrDeleteOp>(this);
}

MotrObject::MotrDeleteOp::MotrDeleteOp(MotrObject *_source) :
  source(_source)
{ }

// Implementation of DELETE OBJ also requires MotrObject::get_obj_state()
// to retrieve and set the object's state from the object's metadata.
//
// TODO:
// 1. The POC only removes the object's entry from the bucket index and deletes
//    the corresponding Motr objects. It doesn't handle the DeleteOp::params.
//    Delete::delete_obj() in rgw_rados.cc shows how the rados backend processes
//    the params.
// 2. Delete an object when its versioning is turned on.
int MotrObject::MotrDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y)
{
  ldpp_dout(dpp, 20) << "delete " << source->get_key().get_oid()
                     << " from " << source->get_bucket()->get_name() << dendl;

  rgw_bucket_dir_entry ent;
  int rc = source->get_bucket_dir_ent(dpp, ent);
  if (rc < 0) {
    return rc;
  }

  // TODO: When integrating with background GC for object deletion,
  // we should consider adding the object entry to GC before deleting the metadata.

  // Delete from the cache first.
  source->store->get_obj_meta_cache()->remove(dpp, source->get_key().get_oid());

  // Delete the object's entry from the bucket index.
  bufferlist bl;
  string bucket_index_iname = "motr.rgw.bucket.index." + source->get_bucket()->get_name();
  rc = source->store->do_idx_op_by_name(bucket_index_iname,
                                        M0_IC_DEL, source->get_key().get_oid(), bl);
  if (rc < 0) {
    ldpp_dout(dpp, 0) << "Failed to del object's entry from bucket index. " << dendl;
    return rc;
  }

  if (ent.meta.size == 0) {
    ldpp_dout(dpp, 0) << __func__ << ": Object size is 0, not deleting motr object." << dendl;
    return 0;
  }

  // Remove the motr objects.
  if (source->category == RGWObjCategory::MultiMeta)
    rc = source->delete_part_objs(dpp);
  else
    rc = source->delete_mobj(dpp);
  if (rc < 0) {
    ldpp_dout(dpp, 0) << "Failed to delete the object from Motr. " << dendl;
    return rc;
  }

  //result.delete_marker = parent_op.result.delete_marker;
  //result.version_id = parent_op.result.version_id;

  return 0;
}

int MotrObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y,
                              bool prevent_versioning)
{
  MotrObject::MotrDeleteOp del_op(this);
  del_op.params.bucket_owner = bucket->get_info().owner;
  del_op.params.versioning_status = bucket->get_info().versioning_status();

  return del_op.delete_obj(dpp, y);
}

int MotrObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate,
                               Completions* aio, bool keep_index_consistent,
                               optional_yield y)
{
  /* XXX: Make it async */
  return 0;
}

int MotrObject::copy_object(User* user,
                            req_info* info,
                            const rgw_zone_id& source_zone,
                            rgw::sal::Object* dest_object,
                            rgw::sal::Bucket* dest_bucket,
                            rgw::sal::Bucket* src_bucket,
                            const rgw_placement_rule& dest_placement,
                            ceph::real_time* src_mtime,
                            ceph::real_time* mtime,
                            const ceph::real_time* mod_ptr,
                            const ceph::real_time* unmod_ptr,
                            bool high_precision_time,
                            const char* if_match,
                            const char* if_nomatch,
                            AttrsMod attrs_mod,
                            bool copy_if_newer,
                            Attrs& attrs,
                            RGWObjCategory category,
                            uint64_t olh_epoch,
                            boost::optional<ceph::real_time> delete_at,
                            std::string* version_id,
                            std::string* tag,
                            std::string* etag,
                            void (*progress_cb)(off_t, void *),
                            void* progress_data,
                            const DoutPrefixProvider* dpp,
                            optional_yield y)
{
  return 0;
}

int MotrObject::swift_versioning_restore(bool& restored,
                                         const DoutPrefixProvider* dpp)
{
  return 0;
}

int MotrObject::swift_versioning_copy(const DoutPrefixProvider* dpp, optional_yield y)
{
  return 0;
}

MotrAtomicWriter::MotrAtomicWriter(const DoutPrefixProvider *dpp,
                                   optional_yield y,
                                   rgw::sal::Object* obj,
                                   MotrStore* _store,
                                   const rgw_user& _owner,
                                   const rgw_placement_rule *_ptail_placement_rule,
                                   uint64_t _olh_epoch,
                                   const std::string& _unique_tag) :
        StoreWriter(dpp, y),
        store(_store),
        owner(_owner),
        ptail_placement_rule(_ptail_placement_rule),
        olh_epoch(_olh_epoch),
        unique_tag(_unique_tag),
        obj(_store, obj->get_key(), obj->get_bucket()),
        old_obj(_store, obj->get_key(), obj->get_bucket()) {}

static const unsigned MAX_BUFVEC_NR = 256;

int MotrAtomicWriter::prepare(optional_yield y)
{
  total_data_size = 0;

  if (obj.is_opened())
    return 0;

  rgw_bucket_dir_entry ent;
  int rc = old_obj.get_bucket_dir_ent(dpp, ent);
  if (rc == 0) {
    ldpp_dout(dpp, 20) << __func__ << ": object exists."
<< dendl; } rc = m0_bufvec_empty_alloc(&buf, MAX_BUFVEC_NR) ?: m0_bufvec_alloc(&attr, MAX_BUFVEC_NR, 1) ?: m0_indexvec_alloc(&ext, MAX_BUFVEC_NR); if (rc != 0) this->cleanup(); return rc; } int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz) { if (mobj != nullptr) { ldpp_dout(dpp, 0) <<__func__<< "ERROR: object is already opened" << dendl; return -EINVAL; } int rc = m0_ufid_next(&ufid_gr, 1, &meta.oid); if (rc != 0) { ldpp_dout(dpp, 0) <<__func__<< "ERROR: m0_ufid_next() failed: " << rc << dendl; return rc; } char fid_str[M0_FID_STR_LEN]; snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); ldpp_dout(dpp, 20) <<__func__<< ": sz=" << sz << " oid=" << fid_str << dendl; int64_t lid = m0_layout_find_by_objsz(store->instance, nullptr, sz); M0_ASSERT(lid > 0); M0_ASSERT(mobj == nullptr); mobj = new m0_obj(); m0_obj_init(mobj, &store->container.co_realm, &meta.oid, lid); struct m0_op *op = nullptr; mobj->ob_entity.en_flags |= M0_ENF_META; rc = m0_entity_create(nullptr, &mobj->ob_entity, &op); if (rc != 0) { this->close_mobj(); ldpp_dout(dpp, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; return rc; } ldpp_dout(dpp, 20) <<__func__<< ": call m0_op_launch()..." << dendl; m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) { this->close_mobj(); ldpp_dout(dpp, 0) << "ERROR: failed to create motr object: " << rc << dendl; return rc; } meta.layout_id = mobj->ob_attr.oa_layout_id; meta.pver = mobj->ob_attr.oa_pver; ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id << std::dec << " rc=" << rc << dendl; // TODO: add key:user+bucket+key+obj.meta.oid value:timestamp to // gc.queue.index. See more at github.com/Seagate/cortx-rgw/issues/7. return rc; } int MotrObject::open_mobj(const DoutPrefixProvider *dpp) { char fid_str[M0_FID_STR_LEN]; snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); ldpp_dout(dpp, 20) <<__func__<< ": oid=" << fid_str << dendl; int rc; if (meta.layout_id == 0) { rgw_bucket_dir_entry ent; rc = this->get_bucket_dir_ent(dpp, ent); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: open_mobj() failed: rc=" << rc << dendl; return rc; } } if (meta.layout_id == 0) return -ENOENT; M0_ASSERT(mobj == nullptr); mobj = new m0_obj(); memset(mobj, 0, sizeof *mobj); m0_obj_init(mobj, &store->container.co_realm, &meta.oid, store->conf.mc_layout_id); struct m0_op *op = nullptr; mobj->ob_attr.oa_layout_id = meta.layout_id; mobj->ob_attr.oa_pver = meta.pver; mobj->ob_entity.en_flags |= M0_ENF_META; rc = m0_entity_open(&mobj->ob_entity, &op); if (rc != 0) { ldpp_dout(dpp, 0) << "ERROR: m0_entity_open() failed: rc=" << rc << dendl; this->close_mobj(); return rc; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc < 0) { ldpp_dout(dpp, 10) << "ERROR: failed to open motr object: rc=" << rc << dendl; this->close_mobj(); return rc; } ldpp_dout(dpp, 20) <<__func__<< ": rc=" << rc << dendl; return 0; } int MotrObject::delete_mobj(const DoutPrefixProvider *dpp) { int rc; char fid_str[M0_FID_STR_LEN]; snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); if (!meta.oid.u_hi || !meta.oid.u_lo) { ldpp_dout(dpp, 20) << __func__ << ": invalid motr object oid=" << fid_str << dendl; return -EINVAL; } ldpp_dout(dpp, 20) << __func__ << ": deleting motr object oid=" << fid_str << dendl; // Open the object. 
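  // delete_mobj() may be called without an open handle (e.g. straight from
  // the delete path), so open the Motr object lazily before issuing the
  // delete op below.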
if (mobj == nullptr) { rc = this->open_mobj(dpp); if (rc < 0) return rc; } // Create an DELETE op and execute it (sync version). struct m0_op *op = nullptr; mobj->ob_entity.en_flags |= M0_ENF_META; rc = m0_entity_delete(&mobj->ob_entity, &op); if (rc != 0) { ldpp_dout(dpp, 0) << "ERROR: m0_entity_delete() failed: " << rc << dendl; return rc; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to open motr object: " << rc << dendl; return rc; } this->close_mobj(); return 0; } void MotrObject::close_mobj() { if (mobj == nullptr) return; m0_obj_fini(mobj); delete mobj; mobj = nullptr; } int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& data, uint64_t offset) { int rc; unsigned bs, left; struct m0_op *op; char *start, *p; struct m0_bufvec buf; struct m0_bufvec attr; struct m0_indexvec ext; left = data.length(); if (left == 0) return 0; rc = m0_bufvec_empty_alloc(&buf, 1) ?: m0_bufvec_alloc(&attr, 1, 1) ?: m0_indexvec_alloc(&ext, 1); if (rc != 0) goto out; bs = this->get_optimal_bs(left); ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; start = data.c_str(); for (p = start; left > 0; left -= bs, p += bs, offset += bs) { if (left < bs) bs = this->get_optimal_bs(left); if (left < bs) { data.append_zero(bs - left); left = bs; p = data.c_str(); } buf.ov_buf[0] = p; buf.ov_vec.v_count[0] = bs; ext.iv_index[0] = offset; ext.iv_vec.v_count[0] = bs; attr.ov_vec.v_count[0] = 0; op = nullptr; rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); if (rc != 0) goto out; m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) goto out; } out: m0_indexvec_free(&ext); m0_bufvec_free(&attr); m0_bufvec_free2(&buf); return rc; } int MotrObject::read_mobj(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb) { int rc; unsigned bs, actual, left; struct m0_op *op; struct m0_bufvec buf; struct m0_bufvec attr; struct m0_indexvec ext; // make end pointer exclusive: // it's easier to work with it this way end++; ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << " end=" << end << dendl; // As `off` may not be parity group size aligned, even using optimal // buffer block size, simply reading data from offset `off` could come // across parity group boundary. And Motr only allows page-size aligned // offset. // // The optimal size of each IO should also take into account the data // transfer size to s3 client. For example, 16MB may be nice to read // data from motr, but it could be too big for network transfer. // // TODO: We leave proper handling of offset in the future. bs = this->get_optimal_bs(end - off); ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): bs=" << bs << dendl; rc = m0_bufvec_empty_alloc(&buf, 1) ? : m0_bufvec_alloc(&attr, 1, 1) ? : m0_indexvec_alloc(&ext, 1); if (rc < 0) goto out; left = end - off; for (; left > 0; off += actual) { if (left < bs) bs = this->get_optimal_bs(left); actual = bs; if (left < bs) actual = left; ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << " actual=" << actual << dendl; bufferlist bl; buf.ov_buf[0] = bl.append_hole(bs).c_str(); buf.ov_vec.v_count[0] = bs; ext.iv_index[0] = off; ext.iv_vec.v_count[0] = bs; attr.ov_vec.v_count[0] = 0; left -= actual; // Read from Motr. 
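      // Each iteration issues one synchronous M0_OC_READ of `bs` bytes;
      // only the first `actual` bytes are real data and get handed to `cb`.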
op = nullptr; rc = m0_obj_op(this->mobj, M0_OC_READ, &ext, &buf, &attr, 0, 0, &op); ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): init read op rc=" << rc << dendl; if (rc != 0) { ldpp_dout(dpp, 0) << __func__ << ": read failed during m0_obj_op, rc=" << rc << dendl; goto out; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) { ldpp_dout(dpp, 0) << __func__ << ": read failed, m0_op_wait rc=" << rc << dendl; goto out; } // Call `cb` to process returned data. ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): call cb to process data" << dendl; cb->handle_data(bl, 0, actual); } out: m0_indexvec_free(&ext); m0_bufvec_free(&attr); m0_bufvec_free2(&buf); this->close_mobj(); return rc; } int MotrObject::get_bucket_dir_ent(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry& ent) { int rc = 0; string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); int max = 1000; vector keys(max); vector vals(max); bufferlist bl; bufferlist::const_iterator iter; if (this->get_bucket()->get_info().versioning_status() == BUCKET_VERSIONED || this->get_bucket()->get_info().versioning_status() == BUCKET_SUSPENDED) { rgw_bucket_dir_entry ent_to_check; if (this->store->get_obj_meta_cache()->get(dpp, this->get_name(), bl) == 0) { iter = bl.cbegin(); ent_to_check.decode(iter); if (ent_to_check.is_current()) { ent = ent_to_check; rc = 0; goto out; } } ldpp_dout(dpp, 20) <<__func__<< ": versioned bucket!" << dendl; keys[0] = this->get_name(); rc = store->next_query_by_name(bucket_index_iname, keys, vals); if (rc < 0) { ldpp_dout(dpp, 0) << __func__ << "ERROR: NEXT query failed. " << rc << dendl; return rc; } rc = -ENOENT; for (const auto& bl: vals) { if (bl.length() == 0) break; iter = bl.cbegin(); ent_to_check.decode(iter); if (ent_to_check.is_current()) { ldpp_dout(dpp, 20) <<__func__<< ": found current version!" << dendl; ent = ent_to_check; rc = 0; this->store->get_obj_meta_cache()->put(dpp, this->get_name(), bl); break; } } } else { if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { ldpp_dout(dpp, 20) <<__func__<< ": non-versioned bucket!" << dendl; rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, this->get_key().get_oid(), bl); if (rc < 0) { ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to get object's entry from bucket index: rc=" << rc << dendl; return rc; } this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); } bufferlist& blr = bl; iter = blr.cbegin(); ent.decode(iter); } out: if (rc == 0) { sal::Attrs dummy; decode(dummy, iter); meta.decode(iter); ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id << dendl; char fid_str[M0_FID_STR_LEN]; snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); ldpp_dout(dpp, 70) << __func__ << ": oid=" << fid_str << dendl; } else ldpp_dout(dpp, 0) <<__func__<< ": rc=" << rc << dendl; return rc; } int MotrObject::update_version_entries(const DoutPrefixProvider *dpp) { int rc; int max = 10; vector keys(max); vector vals(max); string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); keys[0] = this->get_name(); rc = store->next_query_by_name(bucket_index_iname, keys, vals); ldpp_dout(dpp, 20) << "get all versions, name = " << this->get_name() << "rc = " << rc << dendl; if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; return rc; } // no entries returned. 
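  // For every entry of this object that is still marked "current", demote it
  // to a plain version (FLAG_VER) and write it back under its
  // "<name>[<instance>]" key, so the newly written version becomes the only
  // current one.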
if (rc == 0) return 0; for (const auto& bl: vals) { if (bl.length() == 0) break; rgw_bucket_dir_entry ent; auto iter = bl.cbegin(); ent.decode(iter); if (0 != ent.key.name.compare(0, this->get_name().size(), this->get_name())) continue; if (!ent.is_current()) continue; // Remove from the cache. store->get_obj_meta_cache()->remove(dpp, this->get_name()); rgw::sal::Attrs attrs; decode(attrs, iter); MotrObject::Meta meta; meta.decode(iter); ent.flags = rgw_bucket_dir_entry::FLAG_VER; string key; if (ent.key.instance.empty()) key = ent.key.name; else { char buf[ent.key.name.size() + ent.key.instance.size() + 16]; snprintf(buf, sizeof(buf), "%s[%s]", ent.key.name.c_str(), ent.key.instance.c_str()); key = buf; } ldpp_dout(dpp, 20) << "update one version, key = " << key << dendl; bufferlist ent_bl; ent.encode(ent_bl); encode(attrs, ent_bl); meta.encode(ent_bl); rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT, key, ent_bl); if (rc < 0) break; } return rc; } // Scan object_nnn_part_index to get all parts then open their motr objects. // TODO: all parts are opened in the POC. But for a large object, for example // a 5GB object will have about 300 parts (for default 15MB part). A better // way of managing opened object may be needed. int MotrObject::get_part_objs(const DoutPrefixProvider* dpp, std::map>& part_objs) { int rc; int max_parts = 1000; int marker = 0; uint64_t off = 0; bool truncated = false; std::unique_ptr upload; upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); do { rc = upload->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); if (rc == -ENOENT) { rc = -ERR_NO_SUCH_UPLOAD; } if (rc < 0) return rc; std::map>& parts = upload->get_parts(); for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { MultipartPart *mpart = part_iter->second.get(); MotrMultipartPart *mmpart = static_cast(mpart); uint32_t part_num = mmpart->get_num(); uint64_t part_size = mmpart->get_size(); string part_obj_name = this->get_bucket()->get_name() + "." + this->get_key().get_oid() + ".part." + std::to_string(part_num); std::unique_ptr obj; obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); std::unique_ptr mobj(static_cast(obj.release())); ldpp_dout(dpp, 20) << "get_part_objs: off = " << off << ", size = " << part_size << dendl; mobj->part_off = off; mobj->part_size = part_size; mobj->part_num = part_num; mobj->meta = mmpart->meta; part_objs.emplace(part_num, std::move(mobj)); off += part_size; } } while (truncated); return 0; } int MotrObject::open_part_objs(const DoutPrefixProvider* dpp, std::map>& part_objs) { //for (auto& iter: part_objs) { for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) { MotrObject* obj = static_cast(iter->second.get()); ldpp_dout(dpp, 20) << "open_part_objs: name = " << obj->get_name() << dendl; int rc = obj->open_mobj(dpp); if (rc < 0) return rc; } return 0; } int MotrObject::delete_part_objs(const DoutPrefixProvider* dpp) { std::unique_ptr upload; upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); std::unique_ptr mupload(static_cast(upload.release())); return mupload->delete_parts(dpp); } int MotrObject::read_multipart_obj(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb, std::map>& part_objs) { int64_t cursor = off; ldpp_dout(dpp, 20) << "read_multipart_obj: off=" << off << " end=" << end << dendl; // Find the parts which are in the (off, end) range and // read data from it. Note: `end` argument is inclusive. 
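  // For example, with 15MB parts a request for off=20MB starts in the second
  // part (part_off=15MB): local_off = 20MB - 15MB = 5MB, and reading continues
  // part by part until `cursor` passes `end`.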
  for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) {
    MotrObject* obj = static_cast<MotrObject*>(iter->second.get());
    int64_t part_off = obj->part_off;
    int64_t part_size = obj->part_size;
    int64_t part_end = obj->part_off + obj->part_size - 1;
    ldpp_dout(dpp, 20) << "read_multipart_obj: part_off=" << part_off
                       << " part_end=" << part_end << dendl;
    if (part_end < off)
      continue;

    int64_t local_off = cursor - obj->part_off;
    int64_t local_end = part_end < end ? part_size - 1 : end - part_off;
    ldpp_dout(dpp, 20) << "read_multipart_obj: name=" << obj->get_name()
                       << " local_off=" << local_off
                       << " local_end=" << local_end << dendl;
    int rc = obj->read_mobj(dpp, local_off, local_end, cb);
    if (rc < 0)
      return rc;

    cursor = part_end + 1;
    if (cursor > end)
      break;
  }

  return 0;
}

static unsigned roundup(unsigned x, unsigned by)
{
  return ((x - 1) / by + 1) * by;
}

unsigned MotrObject::get_optimal_bs(unsigned len)
{
  struct m0_pool_version *pver;

  pver = m0_pool_version_find(&store->instance->m0c_pools_common,
                              &mobj->ob_attr.oa_pver);
  M0_ASSERT(pver != nullptr);
  struct m0_pdclust_attr *pa = &pver->pv_attr;
  uint64_t lid = M0_OBJ_LAYOUT_ID(meta.layout_id);
  unsigned unit_sz = m0_obj_layout_id_to_unit_size(lid);
  unsigned grp_sz = unit_sz * pa->pa_N;

  // bs should be max 4-times pool-width deep counting by 1MB units, or
  // 8-times deep counting by 512K units, 16-times deep by 256K units,
  // and so on. Several units to one target will be aggregated to make
  // fewer network RPCs, disk i/o operations and BE transactions.
  // For unit sizes of 32K or less, the depth is 128, which
  // makes it 32K * 128 == 4MB - the maximum amount per target when
  // the performance is still good on LNet (which has max 1MB frames).
  // TODO: it may be different on libfabric, should be re-measured.
  unsigned depth = 128 / ((unit_sz + 0x7fff) / 0x8000);
  if (depth == 0)
    depth = 1;

  // P * N / (N + K + S) - number of data units to span the pool-width
  unsigned max_bs = depth * unit_sz * pa->pa_P * pa->pa_N /
                                     (pa->pa_N + pa->pa_K + pa->pa_S);
  max_bs = roundup(max_bs, grp_sz); // multiple of group size
  if (len >= max_bs)
    return max_bs;
  else if (len <= grp_sz)
    return grp_sz;
  else
    return roundup(len, grp_sz);
}

void MotrAtomicWriter::cleanup()
{
  m0_indexvec_free(&ext);
  m0_bufvec_free(&attr);
  m0_bufvec_free2(&buf);
  acc_data.clear();
  obj.close_mobj();
  old_obj.close_mobj();
}

unsigned MotrAtomicWriter::populate_bvec(unsigned len, bufferlist::iterator &bi)
{
  unsigned i, l, done = 0;
  const char *data;

  for (i = 0; i < MAX_BUFVEC_NR && len > 0; ++i) {
    l = bi.get_ptr_and_advance(len, &data);
    buf.ov_buf[i] = (char*)data;
    buf.ov_vec.v_count[i] = l;
    ext.iv_index[i] = acc_off;
    ext.iv_vec.v_count[i] = l;
    attr.ov_vec.v_count[i] = 0;
    acc_off += l;
    len -= l;
    done += l;
  }
  buf.ov_vec.v_nr = i;
  ext.iv_vec.v_nr = i;

  return done;
}

int MotrAtomicWriter::write()
{
  int rc;
  unsigned bs, left;
  struct m0_op *op;
  bufferlist::iterator bi;

  left = acc_data.length();

  if (!obj.is_opened()) {
    rc = obj.create_mobj(dpp, left);
    if (rc == -EEXIST)
      rc = obj.open_mobj(dpp);
    if (rc != 0) {
      char fid_str[M0_FID_STR_LEN];
      snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&obj.meta.oid));
      ldpp_dout(dpp, 0) << "ERROR: failed to create/open motr object "
                        << fid_str << " (" << obj.get_bucket()->get_name()
                        << "/" << obj.get_key().get_oid() << "): rc=" << rc
                        << dendl;
      goto err;
    }
  }

  total_data_size += left;

  bs = obj.get_optimal_bs(left);
  ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl;

  bi = acc_data.begin();
  while (left > 0) {
    if (left < bs)
      bs =
obj.get_optimal_bs(left); if (left < bs) { acc_data.append_zero(bs - left); auto off = bi.get_off(); bufferlist tmp; acc_data.splice(off, bs, &tmp); acc_data.clear(); acc_data.append(tmp.c_str(), bs); // make it a single buf bi = acc_data.begin(); left = bs; } left -= this->populate_bvec(bs, bi); op = nullptr; rc = m0_obj_op(obj.mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); if (rc != 0) goto err; m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) goto err; } acc_data.clear(); return 0; err: this->cleanup(); return rc; } static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024; // Accumulate enough data first to make a reasonable decision about the // optimal unit size for a new object, or bs for existing object (32M seems // enough for 4M units in 8+2 parity groups, a common config on wide pools), // and then launch the write operations. int MotrAtomicWriter::process(bufferlist&& data, uint64_t offset) { if (data.length() == 0) { // last call, flush data int rc = 0; if (acc_data.length() != 0) rc = this->write(); this->cleanup(); return rc; } if (acc_data.length() == 0) acc_off = offset; acc_data.append(std::move(data)); if (acc_data.length() < MAX_ACC_SIZE) return 0; return this->write(); } int MotrAtomicWriter::complete(size_t accounted_size, const std::string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, std::map& attrs, ceph::real_time delete_at, const char *if_match, const char *if_nomatch, const std::string *user_data, rgw_zone_set *zones_trace, bool *canceled, optional_yield y) { int rc = 0; if (acc_data.length() != 0) { // check again, just in case rc = this->write(); this->cleanup(); if (rc != 0) return rc; } bufferlist bl; rgw_bucket_dir_entry ent; // Set rgw_bucet_dir_entry. Some of the member of this structure may not // apply to motr. For example the storage_class. // // Checkout AtomicObjectProcessor::complete() in rgw_putobj_processor.cc // and RGWRados::Object::Write::write_meta() in rgw_rados.cc for what and // how to set the dir entry. Only set the basic ones for POC, no ACLs and // other attrs. obj.get_key().get_index_key(&ent.key); ent.meta.size = total_data_size; ent.meta.accounted_size = total_data_size; ent.meta.mtime = real_clock::is_zero(set_mtime)? 
ceph::real_clock::now() : set_mtime; ent.meta.etag = etag; ent.meta.owner = owner.to_str(); ent.meta.owner_display_name = obj.get_bucket()->get_owner()->get_display_name(); bool is_versioned = obj.get_key().have_instance(); if (is_versioned) ent.flags = rgw_bucket_dir_entry::FLAG_VER | rgw_bucket_dir_entry::FLAG_CURRENT; ldpp_dout(dpp, 20) <<__func__<< ": key=" << obj.get_key().get_oid() << " etag: " << etag << " user_data=" << user_data << dendl; if (user_data) ent.meta.user_data = *user_data; ent.encode(bl); RGWBucketInfo &info = obj.get_bucket()->get_info(); if (info.obj_lock_enabled() && info.obj_lock.has_rule()) { auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); if (iter == attrs.end()) { real_time lock_until_date = info.obj_lock.get_lock_until_date(ent.meta.mtime); string mode = info.obj_lock.get_mode(); RGWObjectRetention obj_retention(mode, lock_until_date); bufferlist retention_bl; obj_retention.encode(retention_bl); attrs[RGW_ATTR_OBJECT_RETENTION] = retention_bl; } } encode(attrs, bl); obj.meta.encode(bl); ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << obj.meta.layout_id << dendl; if (is_versioned) { // get the list of all versioned objects with the same key and // unset their FLAG_CURRENT later, if do_idx_op_by_name() is successful. // Note: without distributed lock on the index - it is possible that 2 // CURRENT entries would appear in the bucket. For example, consider the // following scenario when two clients are trying to add the new object // version concurrently: // client 1: reads all the CURRENT entries // client 2: updates the index and sets the new CURRENT // client 1: updates the index and sets the new CURRENT // At the step (1) client 1 would not see the new current record from step (2), // so it won't update it. As a result, two CURRENT version entries will appear // in the bucket. // TODO: update the current version (unset the flag) and insert the new current // version can be launched in one motr op. This requires change at do_idx_op() // and do_idx_op_by_name(). rc = obj.update_version_entries(dpp); if (rc < 0) return rc; } // Insert an entry into bucket index. string bucket_index_iname = "motr.rgw.bucket.index." + obj.get_bucket()->get_name(); rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT, obj.get_key().get_oid(), bl); if (rc == 0) store->get_obj_meta_cache()->put(dpp, obj.get_key().get_oid(), bl); if (old_obj.get_bucket()->get_info().versioning_status() != BUCKET_VERSIONED) { // Delete old object data if exists. old_obj.delete_mobj(dpp); } // TODO: We need to handle the object leak caused by parallel object upload by // making use of background gc, which is currently not enabled for motr. return rc; } int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp) { int rc; int max_parts = 1000; int marker = 0; bool truncated = false; // Scan all parts and delete the corresponding motr objects. do { rc = this->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); if (rc == -ENOENT) { truncated = false; rc = 0; } if (rc < 0) return rc; std::map>& parts = this->get_parts(); for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { MultipartPart *mpart = part_iter->second.get(); MotrMultipartPart *mmpart = static_cast(mpart); uint32_t part_num = mmpart->get_num(); // Delete the part object. Note that the part object is not // inserted into bucket index, only the corresponding motr object // needs to be delete. That is why we don't call // MotrObject::delete_object(). 
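
      // For reference, the multipart code in this file derives all of its
      // names from the bucket name, the object key and the part number.
      // With a hypothetical bucket "b", object "o" and part 3:
      //   part motr object:   "b.o.part.3"                 (built just below)
      //   object parts index: "motr.rgw.object.b.o.parts"
      //   part key in it:     "part.00000003"  (zero-padded so lexicographic
      //                       NEXT-query order matches the part numbering)
      //   bucket multiparts:  "motr.rgw.bucket.b.multiparts"
      //   bucket index:       "motr.rgw.bucket.index.b"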
string part_obj_name = bucket->get_name() + "." + mp_obj.get_key() + ".part." + std::to_string(part_num); std::unique_ptr obj; obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); std::unique_ptr mobj(static_cast(obj.release())); mobj->meta = mmpart->meta; rc = mobj->delete_mobj(dpp); if (rc < 0) { ldpp_dout(dpp, 0) << __func__ << ": Failed to delete object from Motr. rc=" << rc << dendl; return rc; } } } while (truncated); // Delete object part index. std::string oid = mp_obj.get_key(); string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; return store->delete_motr_idx_by_name(obj_part_iname); } int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct) { int rc; // Check if multipart upload exists bufferlist bl; std::unique_ptr meta_obj; meta_obj = get_meta_obj(); string bucket_multipart_iname = "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; rc = store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_GET, meta_obj->get_key().to_str(), bl); if (rc < 0) { ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart upload. rc=" << rc << dendl; return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; } // Scan all parts and delete the corresponding motr objects. rc = this->delete_parts(dpp); if (rc < 0) return rc; bl.clear(); // Remove the upload from bucket multipart index. rc = store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_DEL, meta_obj->get_key().get_oid(), bl); return rc; } std::unique_ptr MotrMultipartUpload::get_meta_obj() { std::unique_ptr obj = bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns)); std::unique_ptr mobj(static_cast(obj.release())); mobj->set_category(RGWObjCategory::MultiMeta); return mobj; } struct motr_multipart_upload_info { rgw_placement_rule dest_placement; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(dest_placement, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(dest_placement, bl); DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(motr_multipart_upload_info) int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, ACLOwner& _owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) { int rc; std::string oid = mp_obj.get_key(); owner = _owner; do { char buf[33]; string tmp_obj_name; gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ upload_id.append(buf); mp_obj.init(oid, upload_id); tmp_obj_name = mp_obj.get_meta(); std::unique_ptr obj; obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns)); // the meta object will be indexed with 0 size, we c obj->set_in_extra_data(true); obj->set_hash_source(oid); motr_multipart_upload_info upload_info; upload_info.dest_placement = dest_placement; bufferlist mpbl; encode(upload_info, mpbl); // Create an initial entry in the bucket. The entry will be // updated when multipart upload is completed, for example, // size, etag etc. bufferlist bl; rgw_bucket_dir_entry ent; obj->get_key().get_index_key(&ent.key); ent.meta.owner = owner.get_id().to_str(); ent.meta.category = RGWObjCategory::MultiMeta; ent.meta.mtime = ceph::real_clock::now(); ent.meta.user_data.assign(mpbl.c_str(), mpbl.c_str() + mpbl.length()); ent.encode(bl); // Insert an entry into bucket multipart index so it is not shown // when listing a bucket. string bucket_multipart_iname = "motr.rgw.bucket." 
+ obj->get_bucket()->get_name() + ".multiparts"; rc = store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_PUT, obj->get_key().get_oid(), bl); } while (rc == -EEXIST); if (rc < 0) return rc; // Create object part index. // TODO: add bucket as part of the name. string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; ldpp_dout(dpp, 20) << "MotrMultipartUpload::init(): object part index=" << obj_part_iname << dendl; rc = store->create_motr_idx_by_name(obj_part_iname); if (rc == -EEXIST) rc = 0; if (rc < 0) // TODO: clean the bucket index entry ldpp_dout(dpp, 0) << "Failed to create object multipart index " << obj_part_iname << dendl; return rc; } int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, int num_parts, int marker, int *next_marker, bool *truncated, bool assume_unsorted) { int rc; vector key_vec(num_parts); vector val_vec(num_parts); std::string oid = mp_obj.get_key(); string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; ldpp_dout(dpp, 20) << __func__ << ": object part index = " << obj_part_iname << dendl; key_vec[0].clear(); key_vec[0] = "part."; char buf[32]; snprintf(buf, sizeof(buf), "%08d", marker + 1); key_vec[0].append(buf); rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec); if (rc < 0) { ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; return rc; } int last_num = 0; int part_cnt = 0; uint32_t expected_next = 0; ldpp_dout(dpp, 20) << __func__ << ": marker = " << marker << dendl; for (const auto& bl: val_vec) { if (bl.length() == 0) break; RGWUploadPartInfo info; auto iter = bl.cbegin(); info.decode(iter); rgw::sal::Attrs attrs_dummy; decode(attrs_dummy, iter); MotrObject::Meta meta; meta.decode(iter); ldpp_dout(dpp, 20) << __func__ << ": part_num=" << info.num << " part_size=" << info.size << dendl; ldpp_dout(dpp, 20) << __func__ << ": meta:oid=[" << meta.oid.u_hi << "," << meta.oid.u_lo << "], meta:pvid=[" << meta.pver.f_container << "," << meta.pver.f_key << "], meta:layout id=" << meta.layout_id << dendl; if (!expected_next) expected_next = info.num + 1; else if (expected_next && info.num != expected_next) return -EINVAL; else expected_next = info.num + 1; if ((int)info.num > marker) { last_num = info.num; parts.emplace(info.num, std::make_unique(info, meta)); } part_cnt++; } // Does it have more parts? if (truncated) *truncated = part_cnt < num_parts? 
false : true; ldpp_dout(dpp, 20) << __func__ << ": truncated=" << *truncated << dendl; if (next_marker) *next_marker = last_num; return 0; } // Heavily copy from rgw_sal_rados.cc int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp, optional_yield y, CephContext* cct, map& part_etags, list& remove_objs, uint64_t& accounted_size, bool& compressed, RGWCompressionInfo& cs_info, off_t& off, std::string& tag, ACLOwner& owner, uint64_t olh_epoch, rgw::sal::Object* target_obj) { char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; std::string etag; bufferlist etag_bl; MD5 hash; // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); bool truncated; int rc; ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): enter" << dendl; int total_parts = 0; int handled_parts = 0; int max_parts = 1000; int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); rgw::sal::Attrs attrs = target_obj->get_attrs(); do { ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl; rc = list_parts(dpp, cct, max_parts, marker, &marker, &truncated); if (rc == -ENOENT) { rc = -ERR_NO_SUCH_UPLOAD; } if (rc < 0) return rc; total_parts += parts.size(); if (!truncated && total_parts != (int)part_etags.size()) { ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts << " expected: " << part_etags.size() << dendl; rc = -ERR_INVALID_PART; return rc; } ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): parts.size()=" << parts.size() << dendl; for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) { MultipartPart *mpart = obj_iter->second.get(); MotrMultipartPart *mmpart = static_cast(mpart); RGWUploadPartInfo *part = &mmpart->info; uint64_t part_size = part->accounted_size; ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part_size=" << part_size << dendl; if (handled_parts < (int)part_etags.size() - 1 && part_size < min_part_size) { rc = -ERR_TOO_SMALL; return rc; } char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; if (etags_iter->first != (int)obj_iter->first) { ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: " << etags_iter->first << " next uploaded: " << obj_iter->first << dendl; rc = -ERR_INVALID_PART; return rc; } string part_etag = rgw_string_unquote(etags_iter->second); if (part_etag.compare(part->etag) != 0) { ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first << " etag: " << etags_iter->second << dendl; rc = -ERR_INVALID_PART; return rc; } hex_to_buf(part->etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE); hash.Update((const unsigned char *)petag, sizeof(petag)); ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): calc etag " << dendl; string oid = mp_obj.get_part(part->num); rgw_obj src_obj; src_obj.init_ns(bucket->get_key(), oid, mp_ns); #if 0 // does Motr backend need it? 
/* update manifest for part */ if (part->manifest.empty()) { ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj=" << src_obj << dendl; rc = -ERR_INVALID_PART; return rc; } else { manifest.append(dpp, part->manifest, store->get_zone()); } ldpp_dout(dpp, 0) << "MotrMultipartUpload::complete(): manifest " << dendl; #endif bool part_compressed = (part->cs_info.compression_type != "none"); if ((handled_parts > 0) && ((part_compressed != compressed) || (cs_info.compression_type != part->cs_info.compression_type))) { ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload (" << cs_info.compression_type << ">>" << part->cs_info.compression_type << ")" << dendl; rc = -ERR_INVALID_PART; return rc; } ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part compression" << dendl; if (part_compressed) { int64_t new_ofs; // offset in compression data for new part if (cs_info.blocks.size() > 0) new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; else new_ofs = 0; for (const auto& block : part->cs_info.blocks) { compression_block cb; cb.old_ofs = block.old_ofs + cs_info.orig_size; cb.new_ofs = new_ofs; cb.len = block.len; cs_info.blocks.push_back(cb); new_ofs = cb.new_ofs + cb.len; } if (!compressed) cs_info.compression_type = part->cs_info.compression_type; cs_info.orig_size += part->cs_info.orig_size; compressed = true; } // We may not need to do the following as remove_objs are those // don't show when listing a bucket. As we store in-progress uploaded // object's metadata in a separate index, they are not shown when // listing a bucket. rgw_obj_index_key remove_key; src_obj.key.get_index_key(&remove_key); remove_objs.push_back(remove_key); off += part_size; accounted_size += part->accounted_size; ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): off=" << off << ", accounted_size = " << accounted_size << dendl; } } while (truncated); hash.Final((unsigned char *)final_etag); buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, "-%lld", (long long)part_etags.size()); etag = final_etag_str; ldpp_dout(dpp, 20) << "calculated etag: " << etag << dendl; etag_bl.append(etag); attrs[RGW_ATTR_ETAG] = etag_bl; if (compressed) { // write compression attribute to full object bufferlist tmp; encode(cs_info, tmp); attrs[RGW_ATTR_COMPRESSION] = tmp; } // Read the object's the multipart_upload_info. // TODO: all those index name and key constructions should be implemented as // member functions. bufferlist bl; std::unique_ptr meta_obj; meta_obj = get_meta_obj(); string bucket_multipart_iname = "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; rc = this->store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_GET, meta_obj->get_key().get_oid(), bl); ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): read entry from bucket multipart index rc=" << rc << dendl; if (rc < 0) return rc; rgw_bucket_dir_entry ent; bufferlist& blr = bl; auto ent_iter = blr.cbegin(); ent.decode(ent_iter); // Update the dir entry and insert it to the bucket index so // the object will be seen when listing the bucket. 
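
  // For reference, the ETag assembled above follows the usual S3 multipart
  // convention: MD5 over the concatenation of the *binary* part digests,
  // hex-encoded and suffixed with "-<number of parts>". For a hypothetical
  // 3-part upload it looks like "9b2cf535f27731c974343645a3985328-3"
  // (digest value illustrative). The entry updated below carries it in
  // ent.meta.etag so bucket listings report the final ETag.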
bufferlist update_bl; target_obj->get_key().get_index_key(&ent.key); // Change to offical name :) ent.meta.size = off; ent.meta.accounted_size = accounted_size; ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): obj size=" << ent.meta.size << " obj accounted size=" << ent.meta.accounted_size << dendl; ent.meta.mtime = ceph::real_clock::now(); ent.meta.etag = etag; ent.encode(update_bl); encode(attrs, update_bl); MotrObject::Meta meta_dummy; meta_dummy.encode(update_bl); string bucket_index_iname = "motr.rgw.bucket.index." + meta_obj->get_bucket()->get_name(); ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): target_obj name=" << target_obj->get_name() << " target_obj oid=" << target_obj->get_oid() << dendl; rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT, target_obj->get_name(), update_bl); if (rc < 0) return rc; // Put into metadata cache. store->get_obj_meta_cache()->put(dpp, target_obj->get_name(), update_bl); // Now we can remove it from bucket multipart index. ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): remove from bucket multipartindex " << dendl; return store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_DEL, meta_obj->get_key().get_oid(), bl); } int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs) { if (!rule && !attrs) { return 0; } if (rule) { if (!placement.empty()) { *rule = &placement; if (!attrs) { /* Don't need attrs, done */ return 0; } } else { *rule = nullptr; } } std::unique_ptr meta_obj; meta_obj = get_meta_obj(); meta_obj->set_in_extra_data(true); // Read the object's the multipart_upload_info. bufferlist bl; string bucket_multipart_iname = "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; int rc = this->store->do_idx_op_by_name(bucket_multipart_iname, M0_IC_GET, meta_obj->get_key().get_oid(), bl); if (rc < 0) { ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart info. rc=" << rc << dendl; return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; } rgw_bucket_dir_entry ent; bufferlist& blr = bl; auto ent_iter = blr.cbegin(); ent.decode(ent_iter); if (attrs) { bufferlist etag_bl; string& etag = ent.meta.etag; ldpp_dout(dpp, 20) << "object's etag: " << ent.meta.etag << dendl; etag_bl.append(etag.c_str(), etag.size()); attrs->emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); if (!rule || *rule != nullptr) { /* placement was cached; don't actually read */ return 0; } } /* Decode multipart_upload_info */ motr_multipart_upload_info upload_info; bufferlist mpbl; mpbl.append(ent.meta.user_data.c_str(), ent.meta.user_data.size()); auto mpbl_iter = mpbl.cbegin(); upload_info.decode(mpbl_iter); placement = upload_info.dest_placement; *rule = &placement; return 0; } std::unique_ptr MotrMultipartUpload::get_writer( const DoutPrefixProvider *dpp, optional_yield y, rgw::sal::Object* obj, const rgw_user& owner, const rgw_placement_rule *ptail_placement_rule, uint64_t part_num, const std::string& part_num_str) { return std::make_unique(dpp, y, this, obj, store, owner, ptail_placement_rule, part_num, part_num_str); } int MotrMultipartWriter::prepare(optional_yield y) { string part_obj_name = head_obj->get_bucket()->get_name() + "." + head_obj->get_key().get_oid() + ".part." 
+ std::to_string(part_num); ldpp_dout(dpp, 20) << "bucket=" << head_obj->get_bucket()->get_name() << "part_obj_name=" << part_obj_name << dendl; part_obj = std::make_unique(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket()); if (part_obj == nullptr) return -ENOMEM; // s3 client may retry uploading part, so the part may have already // been created. int rc = part_obj->create_mobj(dpp, store->cctx->_conf->rgw_max_chunk_size); if (rc == -EEXIST) { rc = part_obj->open_mobj(dpp); if (rc < 0) return rc; } return rc; } int MotrMultipartWriter::process(bufferlist&& data, uint64_t offset) { int rc = part_obj->write_mobj(dpp, std::move(data), offset); if (rc == 0) { actual_part_size += data.length(); ldpp_dout(dpp, 20) << " write_mobj(): actual_part_size=" << actual_part_size << dendl; } return rc; } int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, std::map& attrs, ceph::real_time delete_at, const char *if_match, const char *if_nomatch, const std::string *user_data, rgw_zone_set *zones_trace, bool *canceled, optional_yield y) { // Should the dir entry(object metadata) be updated? For example // mtime. ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): enter" << dendl; // Add an entry into object_nnn_part_index. bufferlist bl; RGWUploadPartInfo info; info.num = part_num; info.etag = etag; info.size = actual_part_size; info.accounted_size = accounted_size; info.modified = real_clock::now(); bool compressed; int rc = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): compression rc=" << rc << dendl; if (rc < 0) { ldpp_dout(dpp, 1) << "cannot get compression info" << dendl; return rc; } encode(info, bl); encode(attrs, bl); part_obj->meta.encode(bl); string p = "part."; char buf[32]; snprintf(buf, sizeof(buf), "%08d", (int)part_num); p.append(buf); string obj_part_iname = "motr.rgw.object." + head_obj->get_bucket()->get_name() + "." + head_obj->get_key().get_oid() + ".parts"; ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): object part index = " << obj_part_iname << dendl; rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl); if (rc < 0) { return rc == -ENOENT ? 
-ERR_NO_SUCH_UPLOAD : rc; } return 0; } std::unique_ptr MotrStore::get_role(std::string name, std::string tenant, std::string path, std::string trust_policy, std::string max_session_duration_str, std::multimap tags) { RGWRole* p = nullptr; return std::unique_ptr(p); } std::unique_ptr MotrStore::get_role(const RGWRoleInfo& info) { RGWRole* p = nullptr; return std::unique_ptr(p); } std::unique_ptr MotrStore::get_role(std::string id) { RGWRole* p = nullptr; return std::unique_ptr(p); } int MotrStore::get_roles(const DoutPrefixProvider *dpp, optional_yield y, const std::string& path_prefix, const std::string& tenant, vector>& roles) { return 0; } std::unique_ptr MotrStore::get_oidc_provider() { RGWOIDCProvider* p = nullptr; return std::unique_ptr(p); } int MotrStore::get_oidc_providers(const DoutPrefixProvider *dpp, const std::string& tenant, vector>& providers) { return 0; } std::unique_ptr MotrBucket::get_multipart_upload(const std::string& oid, std::optional upload_id, ACLOwner owner, ceph::real_time mtime) { return std::make_unique(store, this, oid, upload_id, owner, mtime); } std::unique_ptr MotrStore::get_append_writer(const DoutPrefixProvider *dpp, optional_yield y, rgw::sal::Object* obj, const rgw_user& owner, const rgw_placement_rule *ptail_placement_rule, const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size) { return nullptr; } std::unique_ptr MotrStore::get_atomic_writer(const DoutPrefixProvider *dpp, optional_yield y, rgw::sal::Object* obj, const rgw_user& owner, const rgw_placement_rule *ptail_placement_rule, uint64_t olh_epoch, const std::string& unique_tag) { return std::make_unique(dpp, y, obj, this, owner, ptail_placement_rule, olh_epoch, unique_tag); } const std::string& MotrStore::get_compression_type(const rgw_placement_rule& rule) { return zone.zone_params->get_compression_type(rule); } bool MotrStore::valid_placement(const rgw_placement_rule& rule) { return zone.zone_params->valid_placement(rule); } std::unique_ptr MotrStore::get_user(const rgw_user &u) { ldout(cctx, 20) << "bucket's user: " << u.to_str() << dendl; return std::make_unique(this, u); } int MotrStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string &key, optional_yield y, std::unique_ptr *user) { int rc; User *u; bufferlist bl; RGWUserInfo uinfo; MotrAccessKey access_key; rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, M0_IC_GET, key, bl); if (rc < 0){ ldout(cctx, 0) << "Access key not found: rc = " << rc << dendl; return rc; } bufferlist& blr = bl; auto iter = blr.cbegin(); access_key.decode(iter); uinfo.user_id.from_str(access_key.user_id); ldout(cctx, 0) << "Loading user: " << uinfo.user_id.id << dendl; rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); if (rc < 0){ ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl; return rc; } u = new MotrUser(this, uinfo); if (!u) return -ENOMEM; user->reset(u); return 0; } int MotrStore::get_user_by_email(const DoutPrefixProvider *dpp, const std::string& email, optional_yield y, std::unique_ptr* user) { int rc; User *u; bufferlist bl; RGWUserInfo uinfo; MotrEmailInfo email_info; rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, M0_IC_GET, email, bl); if (rc < 0){ ldout(cctx, 0) << "Email Id not found: rc = " << rc << dendl; return rc; } auto iter = bl.cbegin(); email_info.decode(iter); ldout(cctx, 0) << "Loading user: " << email_info.user_id << dendl; uinfo.user_id.from_str(email_info.user_id); rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); if (rc < 0){ ldout(cctx, 
0) << "Failed to load user: rc = " << rc << dendl; return rc; } u = new MotrUser(this, uinfo); if (!u) return -ENOMEM; user->reset(u); return 0; } int MotrStore::get_user_by_swift(const DoutPrefixProvider *dpp, const std::string& user_str, optional_yield y, std::unique_ptr* user) { /* Swift keys and subusers are not supported for now */ return 0; } int MotrStore::store_access_key(const DoutPrefixProvider *dpp, optional_yield y, MotrAccessKey access_key) { int rc; bufferlist bl; access_key.encode(bl); rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, M0_IC_PUT, access_key.id, bl); if (rc < 0){ ldout(cctx, 0) << "Failed to store key: rc = " << rc << dendl; return rc; } return rc; } int MotrStore::delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key) { int rc; bufferlist bl; rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, M0_IC_DEL, access_key, bl); if (rc < 0){ ldout(cctx, 0) << "Failed to delete key: rc = " << rc << dendl; } return rc; } int MotrStore::store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info ) { int rc; bufferlist bl; email_info.encode(bl); rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, M0_IC_PUT, email_info.email_id, bl); if (rc < 0) { ldout(cctx, 0) << "Failed to store the user by email as key: rc = " << rc << dendl; } return rc; } std::unique_ptr MotrStore::get_object(const rgw_obj_key& k) { return std::make_unique(this, k); } int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const rgw_bucket& b, std::unique_ptr* bucket, optional_yield y) { int ret; Bucket* bp; bp = new MotrBucket(this, b, u); ret = bp->load_bucket(dpp, y); if (ret < 0) { delete bp; return ret; } bucket->reset(bp); return 0; } int MotrStore::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr* bucket) { Bucket* bp; bp = new MotrBucket(this, i, u); /* Don't need to fetch the bucket info, use the provided one */ bucket->reset(bp); return 0; } int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr* bucket, optional_yield y) { rgw_bucket b; b.tenant = tenant; b.name = name; return get_bucket(dpp, u, b, bucket, y); } bool MotrStore::is_meta_master() { return true; } int MotrStore::forward_request_to_master(const DoutPrefixProvider *dpp, User* user, obj_version *objv, bufferlist& in_data, JSONParser *jp, req_info& info, optional_yield y) { return 0; } int MotrStore::forward_iam_request_to_master(const DoutPrefixProvider *dpp, const RGWAccessKey& key, obj_version* objv, bufferlist& in_data, RGWXMLDecoder::XMLParser* parser, req_info& info, optional_yield y) { return 0; } std::string MotrStore::zone_unique_id(uint64_t unique_num) { return ""; } std::string MotrStore::zone_unique_trans_id(const uint64_t unique_num) { return ""; } int MotrStore::get_zonegroup(const std::string& id, std::unique_ptr* group) { /* XXX: for now only one zonegroup supported */ ZoneGroup* zg; zg = new MotrZoneGroup(this, zone.zonegroup.get_group()); group->reset(zg); return 0; } int MotrStore::list_all_zones(const DoutPrefixProvider* dpp, std::list& zone_ids) { zone_ids.push_back(zone.get_id()); return 0; } int MotrStore::cluster_stat(RGWClusterStat& stats) { return 0; } std::unique_ptr MotrStore::get_lifecycle(void) { return 0; } std::unique_ptr MotrStore::get_completions(void) { return 0; } std::unique_ptr MotrStore::get_notification(Object* obj, Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const string* object_name) { return 
std::make_unique(obj, src_obj, event_type); } std::unique_ptr MotrStore::get_notification(const DoutPrefixProvider* dpp, Object* obj, Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) { return std::make_unique(obj, src_obj, event_type); } int MotrStore::log_usage(const DoutPrefixProvider *dpp, map& usage_info) { return 0; } int MotrStore::log_op(const DoutPrefixProvider *dpp, string& oid, bufferlist& bl) { return 0; } int MotrStore::register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type, const map& meta) { return 0; } void MotrStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit) { return; } void MotrStore::get_quota(RGWQuota& quota) { // XXX: Not handled for the first pass return; } int MotrStore::set_buckets_enabled(const DoutPrefixProvider *dpp, vector& buckets, bool enabled) { return 0; } int MotrStore::get_sync_policy_handler(const DoutPrefixProvider *dpp, std::optional zone, std::optional bucket, RGWBucketSyncPolicyHandlerRef *phandler, optional_yield y) { return 0; } RGWDataSyncStatusManager* MotrStore::get_data_sync_manager(const rgw_zone_id& source_zone) { return 0; } int MotrStore::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, bool *is_truncated, RGWUsageIter& usage_iter, map& usage) { return 0; } int MotrStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) { return 0; } int MotrStore::get_config_key_val(string name, bufferlist *bl) { return 0; } int MotrStore::meta_list_keys_init(const DoutPrefixProvider *dpp, const string& section, const string& marker, void** phandle) { return 0; } int MotrStore::meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, list& keys, bool* truncated) { return 0; } void MotrStore::meta_list_keys_complete(void* handle) { return; } std::string MotrStore::meta_get_marker(void* handle) { return ""; } int MotrStore::meta_remove(const DoutPrefixProvider *dpp, string& metadata_key, optional_yield y) { return 0; } int MotrStore::open_idx(struct m0_uint128 *id, bool create, struct m0_idx *idx) { m0_idx_init(idx, &container.co_realm, id); if (!create) return 0; // nothing to do more // create index or make sure it's created struct m0_op *op = nullptr; int rc = m0_entity_create(nullptr, &idx->in_entity, &op); if (rc != 0) { ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; goto out; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0 && rc != -EEXIST) ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; out: return rc; } static void set_m0bufvec(struct m0_bufvec *bv, vector& vec) { *bv->ov_buf = reinterpret_cast(vec.data()); *bv->ov_vec.v_count = vec.size(); } // idx must be opened with open_idx() beforehand int MotrStore::do_idx_op(struct m0_idx *idx, enum m0_idx_opcode opcode, vector& key, vector& val, bool update) { int rc, rc_i; struct m0_bufvec k, v, *vp = &v; uint32_t flags = 0; struct m0_op *op = nullptr; if (m0_bufvec_empty_alloc(&k, 1) != 0) { ldout(cctx, 0) << "ERROR: failed to allocate key bufvec" << dendl; return -ENOMEM; } if (opcode == M0_IC_PUT || opcode == M0_IC_GET) { rc = -ENOMEM; if (m0_bufvec_empty_alloc(&v, 1) != 0) { ldout(cctx, 0) << "ERROR: failed to allocate value 
bufvec" << dendl; goto out; } } set_m0bufvec(&k, key); if (opcode == M0_IC_PUT) set_m0bufvec(&v, val); if (opcode == M0_IC_DEL) vp = nullptr; if (opcode == M0_IC_PUT && update) flags |= M0_OIF_OVERWRITE; rc = m0_idx_op(idx, opcode, &k, vp, &rc_i, flags, &op); if (rc != 0) { ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; goto out; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) { ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; goto out; } if (rc_i != 0) { ldout(cctx, 0) << "ERROR: idx op failed: " << rc_i << dendl; rc = rc_i; goto out; } if (opcode == M0_IC_GET) { val.resize(*v.ov_vec.v_count); memcpy(reinterpret_cast(val.data()), *v.ov_buf, *v.ov_vec.v_count); } out: m0_bufvec_free2(&k); if (opcode == M0_IC_GET) m0_bufvec_free(&v); // cleanup buffer after GET else if (opcode == M0_IC_PUT) m0_bufvec_free2(&v); return rc; } // Retrieve a range of key/value pairs starting from keys[0]. int MotrStore::do_idx_next_op(struct m0_idx *idx, vector>& keys, vector>& vals) { int rc; uint32_t i = 0; int nr_kvp = vals.size(); int *rcs = new int[nr_kvp]; struct m0_bufvec k, v; struct m0_op *op = nullptr; rc = m0_bufvec_empty_alloc(&k, nr_kvp)?: m0_bufvec_empty_alloc(&v, nr_kvp); if (rc != 0) { ldout(cctx, 0) << "ERROR: failed to allocate kv bufvecs" << dendl; return rc; } set_m0bufvec(&k, keys[0]); rc = m0_idx_op(idx, M0_IC_NEXT, &k, &v, rcs, 0, &op); if (rc != 0) { ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; goto out; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0) { ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; goto out; } for (i = 0; i < v.ov_vec.v_nr; ++i) { if (rcs[i] < 0) break; vector& key = keys[i]; vector& val = vals[i]; key.resize(k.ov_vec.v_count[i]); val.resize(v.ov_vec.v_count[i]); memcpy(reinterpret_cast(key.data()), k.ov_buf[i], k.ov_vec.v_count[i]); memcpy(reinterpret_cast(val.data()), v.ov_buf[i], v.ov_vec.v_count[i]); } out: k.ov_vec.v_nr = i; v.ov_vec.v_nr = i; m0_bufvec_free(&k); m0_bufvec_free(&v); // cleanup buffer after GET delete []rcs; return rc ?: i; } // Retrieve a number of key/value pairs under the prefix starting // from the marker at key_out[0]. int MotrStore::next_query_by_name(string idx_name, vector& key_out, vector& val_out, string prefix, string delim) { unsigned nr_kvp = std::min(val_out.size(), 100UL); struct m0_idx idx = {}; vector> keys(nr_kvp); vector> vals(nr_kvp); struct m0_uint128 idx_id; int i = 0, j, k = 0; index_name_to_motr_fid(idx_name, &idx_id); int rc = open_motr_idx(&idx_id, &idx); if (rc != 0) { ldout(cctx, 0) << "ERROR: next_query_by_name(): failed to open index: rc=" << rc << dendl; goto out; } // Only the first element for keys needs to be set for NEXT query. // The keys will be set will the returned keys from motr index. ldout(cctx, 20) <<__func__<< ": next_query_by_name(): index=" << idx_name << " prefix=" << prefix << " delim=" << delim << dendl; keys[0].assign(key_out[0].begin(), key_out[0].end()); for (i = 0; i < (int)val_out.size(); i += k, k = 0) { rc = do_idx_next_op(&idx, keys, vals); ldout(cctx, 20) << "do_idx_next_op() = " << rc << dendl; if (rc < 0) { ldout(cctx, 0) << "ERROR: NEXT query failed. 
" << rc << dendl; goto out; } string dir; for (j = 0, k = 0; j < rc; ++j) { string key(keys[j].begin(), keys[j].end()); size_t pos = std::string::npos; if (!delim.empty()) pos = key.find(delim, prefix.length()); if (pos != std::string::npos) { // DIR entry dir.assign(key, 0, pos + 1); if (dir.compare(0, prefix.length(), prefix) != 0) goto out; if (i + k == 0 || dir != key_out[i + k - 1]) // a new one key_out[i + k++] = dir; continue; } dir = ""; if (key.compare(0, prefix.length(), prefix) != 0) goto out; key_out[i + k] = key; bufferlist& vbl = val_out[i + k]; vbl.append(reinterpret_cast(vals[j].data()), vals[j].size()); ++k; } if (rc < (int)nr_kvp) // there are no more keys to fetch break; string next_key; if (dir != "") next_key = dir + "\xff"; // skip all dir content in 1 step else next_key = key_out[i + k - 1] + " "; ldout(cctx, 0) << "do_idx_next_op(): next_key=" << next_key << dendl; keys[0].assign(next_key.begin(), next_key.end()); } out: m0_idx_fini(&idx); return rc < 0 ? rc : i + k; } int MotrStore::delete_motr_idx_by_name(string iname) { struct m0_idx idx; struct m0_uint128 idx_id; struct m0_op *op = nullptr; ldout(cctx, 20) << "delete_motr_idx_by_name=" << iname << dendl; index_name_to_motr_fid(iname, &idx_id); m0_idx_init(&idx, &container.co_realm, &idx_id); m0_entity_open(&idx.in_entity, &op); int rc = m0_entity_delete(&idx.in_entity, &op); if (rc < 0) goto out; m0_op_launch(&op, 1); ldout(cctx, 70) << "waiting for op completion" << dendl; rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc == -ENOENT) // race deletion?? rc = 0; else if (rc < 0) ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; ldout(cctx, 20) << "delete_motr_idx_by_name rc=" << rc << dendl; out: m0_idx_fini(&idx); return rc; } int MotrStore::open_motr_idx(struct m0_uint128 *id, struct m0_idx *idx) { m0_idx_init(idx, &container.co_realm, id); return 0; } // The following marcos are from dix/fid_convert.h which are not exposed. enum { M0_DIX_FID_DEVICE_ID_OFFSET = 32, M0_DIX_FID_DIX_CONTAINER_MASK = (1ULL << M0_DIX_FID_DEVICE_ID_OFFSET) - 1, }; // md5 is used here, a more robust way to convert index name to fid is // needed to avoid collision. void MotrStore::index_name_to_motr_fid(string iname, struct m0_uint128 *id) { unsigned char md5[16]; // 128/8 = 16 MD5 hash; // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); hash.Update((const unsigned char *)iname.c_str(), iname.length()); hash.Final(md5); memcpy(&id->u_hi, md5, 8); memcpy(&id->u_lo, md5 + 8, 8); ldout(cctx, 20) << "id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; struct m0_fid *fid = (struct m0_fid*)id; m0_fid_tset(fid, m0_dix_fid_type.ft_id, fid->f_container & M0_DIX_FID_DIX_CONTAINER_MASK, fid->f_key); ldout(cctx, 20) << "converted id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; } int MotrStore::do_idx_op_by_name(string idx_name, enum m0_idx_opcode opcode, string key_str, bufferlist &bl, bool update) { struct m0_idx idx; vector key(key_str.begin(), key_str.end()); vector val; struct m0_uint128 idx_id; index_name_to_motr_fid(idx_name, &idx_id); int rc = open_motr_idx(&idx_id, &idx); if (rc != 0) { ldout(cctx, 0) << "ERROR: failed to open index: " << rc << dendl; goto out; } if (opcode == M0_IC_PUT) val.assign(bl.c_str(), bl.c_str() + bl.length()); ldout(cctx, 20) <<__func__<< ": do_idx_op_by_name(): op=" << (opcode == M0_IC_PUT ? 
"PUT" : "GET") << " idx=" << idx_name << " key=" << key_str << dendl; rc = do_idx_op(&idx, opcode, key, val, update); if (rc == 0 && opcode == M0_IC_GET) // Append the returned value (blob) to the bufferlist. bl.append(reinterpret_cast(val.data()), val.size()); out: m0_idx_fini(&idx); return rc; } int MotrStore::create_motr_idx_by_name(string iname) { struct m0_idx idx = {}; struct m0_uint128 id; index_name_to_motr_fid(iname, &id); m0_idx_init(&idx, &container.co_realm, &id); // create index or make sure it's created struct m0_op *op = nullptr; int rc = m0_entity_create(nullptr, &idx.in_entity, &op); if (rc != 0) { ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; goto out; } m0_op_launch(&op, 1); rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: m0_rc(op); m0_op_fini(op); m0_op_free(op); if (rc != 0 && rc != -EEXIST) ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; out: m0_idx_fini(&idx); return rc; } // If a global index is checked (if it has been create) every time // before they're queried (put/get), which takes 2 Motr operations to // complete the query. As the global indices' name and FID are known // already when MotrStore is created, we move the check and creation // in newMotrStore(). // Similar method is used for per bucket/user index. For example, // bucket instance index is created when creating the bucket. int MotrStore::check_n_create_global_indices() { int rc = 0; for (const auto& iname : motr_global_indices) { rc = create_motr_idx_by_name(iname); if (rc < 0 && rc != -EEXIST) break; rc = 0; } return rc; } std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_yield y) { char id[M0_FID_STR_LEN]; struct m0_confc *confc = m0_reqh2confc(&instance->m0c_reqh); m0_fid_print(id, ARRAY_SIZE(id), &confc->cc_root->co_id); return std::string(id); } int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct) { this->obj_meta_cache = new MotrMetaCache(dpp, cct); this->get_obj_meta_cache()->set_enabled(true); this->user_cache = new MotrMetaCache(dpp, cct); this->get_user_cache()->set_enabled(true); this->bucket_inst_cache = new MotrMetaCache(dpp, cct); this->get_bucket_inst_cache()->set_enabled(true); return 0; } int MotrLuaManager::get_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, std::string& script) { return -ENOENT; } int MotrLuaManager::put_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, const std::string& script) { return -ENOENT; } int MotrLuaManager::del_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key) { return -ENOENT; } int MotrLuaManager::add_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) { return -ENOENT; } int MotrLuaManager::remove_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) { return -ENOENT; } int MotrLuaManager::list_packages(const DoutPrefixProvider* dpp, optional_yield y, rgw::lua::packages_t& packages) { return -ENOENT; } } // namespace rgw::sal extern "C" { void *newMotrStore(CephContext *cct) { int rc = -1; rgw::sal::MotrStore *store = new rgw::sal::MotrStore(cct); if (store) { store->conf.mc_is_oostore = true; // XXX: these params should be taken from config settings and // cct somehow? 
store->instance = nullptr; const auto& proc_ep = g_conf().get_val("motr_my_endpoint"); const auto& ha_ep = g_conf().get_val("motr_ha_endpoint"); const auto& proc_fid = g_conf().get_val("motr_my_fid"); const auto& profile = g_conf().get_val("motr_profile_fid"); const auto& admin_proc_ep = g_conf().get_val("motr_admin_endpoint"); const auto& admin_proc_fid = g_conf().get_val("motr_admin_fid"); const int init_flags = cct->get_init_flags(); ldout(cct, 0) << "INFO: motr my endpoint: " << proc_ep << dendl; ldout(cct, 0) << "INFO: motr ha endpoint: " << ha_ep << dendl; ldout(cct, 0) << "INFO: motr my fid: " << proc_fid << dendl; ldout(cct, 0) << "INFO: motr profile fid: " << profile << dendl; store->conf.mc_local_addr = proc_ep.c_str(); store->conf.mc_process_fid = proc_fid.c_str(); ldout(cct, 0) << "INFO: init flags: " << init_flags << dendl; ldout(cct, 0) << "INFO: motr admin endpoint: " << admin_proc_ep << dendl; ldout(cct, 0) << "INFO: motr admin fid: " << admin_proc_fid << dendl; // HACK this is so that radosge-admin uses a different client if (init_flags == 0) { store->conf.mc_process_fid = admin_proc_fid.c_str(); store->conf.mc_local_addr = admin_proc_ep.c_str(); } else { store->conf.mc_process_fid = proc_fid.c_str(); store->conf.mc_local_addr = proc_ep.c_str(); } store->conf.mc_ha_addr = ha_ep.c_str(); store->conf.mc_profile = profile.c_str(); ldout(cct, 50) << "INFO: motr profile fid: " << store->conf.mc_profile << dendl; ldout(cct, 50) << "INFO: ha addr: " << store->conf.mc_ha_addr << dendl; ldout(cct, 50) << "INFO: process fid: " << store->conf.mc_process_fid << dendl; ldout(cct, 50) << "INFO: motr endpoint: " << store->conf.mc_local_addr << dendl; store->conf.mc_tm_recv_queue_min_len = 64; store->conf.mc_max_rpc_msg_size = 524288; store->conf.mc_idx_service_id = M0_IDX_DIX; store->dix_conf.kc_create_meta = false; store->conf.mc_idx_service_conf = &store->dix_conf; if (!g_conf().get_val("motr_tracing_enabled")) { m0_trace_level_allow(M0_WARN); // allow errors and warnings in syslog anyway m0_trace_set_mmapped_buffer(false); } store->instance = nullptr; rc = m0_client_init(&store->instance, &store->conf, true); if (rc != 0) { ldout(cct, 0) << "ERROR: m0_client_init() failed: " << rc << dendl; goto out; } m0_container_init(&store->container, nullptr, &M0_UBER_REALM, store->instance); rc = store->container.co_realm.re_entity.en_sm.sm_rc; if (rc != 0) { ldout(cct, 0) << "ERROR: m0_container_init() failed: " << rc << dendl; goto out; } rc = m0_ufid_init(store->instance, &ufid_gr); if (rc != 0) { ldout(cct, 0) << "ERROR: m0_ufid_init() failed: " << rc << dendl; goto out; } // Create global indices if not yet. rc = store->check_n_create_global_indices(); if (rc != 0) { ldout(cct, 0) << "ERROR: check_n_create_global_indices() failed: " << rc << dendl; goto out; } } out: if (rc != 0) { delete store; return nullptr; } return store; } }
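
// Illustrative only (not compiled): how the MotrStore key/value helpers
// defined above fit together. The index name, key and value are
// hypothetical; error handling is reduced to early returns.
#if 0
static int kv_roundtrip_sketch(rgw::sal::MotrStore *store)
{
  bufferlist in, out;
  in.append("value-blob");

  // PUT: when the update flag is true, an existing key is overwritten
  // (M0_OIF_OVERWRITE).
  int rc = store->do_idx_op_by_name("motr.rgw.example.index", M0_IC_PUT,
                                    "example-key", in);
  if (rc < 0)
    return rc;

  // GET: the stored blob is appended to the caller's bufferlist.
  rc = store->do_idx_op_by_name("motr.rgw.example.index", M0_IC_GET,
                                "example-key", out);
  if (rc < 0)
    return rc;

  // NEXT: key_out[0] seeds the query (the marker); on success the vectors
  // hold the fetched keys/values and the return value is the number of
  // pairs fetched.
  std::vector<std::string> keys(10);
  std::vector<bufferlist> vals(10);
  keys[0] = "example-key";
  rc = store->next_query_by_name("motr.rgw.example.index", keys, vals);

  return rc < 0 ? rc : 0;
}
#endif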