// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #include "common/errno.h" #include "rgw_common.h" #include "rgw_coroutine.h" #include "rgw_sync_module.h" #include "rgw_data_sync.h" #include "rgw_sync_module_aws.h" #include "rgw_cr_rados.h" #include "rgw_rest_conn.h" #include "rgw_cr_rest.h" #include "rgw_acl.h" #include "rgw_zone.h" #include "services/svc_zone.h" #include #define dout_subsys ceph_subsys_rgw #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}"; static string get_key_oid(const rgw_obj_key& key) { string oid = key.name; if (!key.instance.empty() && !key.have_null_instance()) { oid += string(":") + key.instance; } return oid; } static string obj_to_aws_path(rgw::sal::RGWObject* obj) { string path = obj->get_bucket()->get_name() + "/" + get_key_oid(obj->get_key()); return path; } /* json configuration definition: { "connection": { "access_key": , "secret": , "endpoint": , "host_style": , }, "acls": [ { "type": , "source_id": , "dest_id": } ... ], # optional, acl mappings, no mappings if does not exist "target_path": , # override default # anything below here is for non trivial configuration # can be used in conjuction with the above "default": { "connection": { "access_key": , "secret": , "endpoint": , "host_style" , }, "acls": [ # list of source uids and how they map into destination uids in the dest objects acls { "type" : , # optional, default is id "source_id": , "dest_id": } ... ] "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path, # final object name will be target_path + "/" + obj }, "connections": [ { "id": , "access_key": , "secret": , "endpoint": , } ... ], "acl_profiles": [ { "id": , # acl mappings "acls": [ { "type": , "source_id": , "dest_id": } ... ] } ], "profiles": [ { "source_bucket": , # can specify either specific bucket name (foo), or prefix (foo*) "target_path": , # (override default) "connection_id": , # optional, if empty references default connection "acls_id": , # optional, if empty references default mappings } ... ], } target path optional variables: (evaluated at init) sid: sync instance id, randomly generated by sync process on first sync initalization zonegroup: zonegroup name zonegroup_id: zonegroup name zone: zone name zone_id: zone name (evaluated when syncing) bucket: bucket name owner: bucket owner */ struct ACLMapping { ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER}; string source_id; string dest_id; ACLMapping() = default; ACLMapping(ACLGranteeTypeEnum t, const string& s, const string& d) : type(t), source_id(s), dest_id(d) {} void init(const JSONFormattable& config) { const string& t = config["type"]; if (t == "email") { type = ACL_TYPE_EMAIL_USER; } else if (t == "uri") { type = ACL_TYPE_GROUP; } else { type = ACL_TYPE_CANON_USER; } source_id = config["source_id"]; dest_id = config["dest_id"]; } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection os(jf, "acl_mapping"); string s; switch (type) { case ACL_TYPE_EMAIL_USER: s = "email"; break; case ACL_TYPE_GROUP: s = "uri"; break; default: s = "id"; break; } encode_json("type", s, &jf); encode_json("source_id", source_id, &jf); encode_json("dest_id", dest_id, &jf); } }; struct ACLMappings { map acl_mappings; void init(const JSONFormattable& config) { for (auto& c : config.array()) { ACLMapping m; m.init(c); acl_mappings.emplace(std::make_pair(m.source_id, m)); } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ArraySection os(jf, "acls"); for (auto& i : acl_mappings) { i.second.dump_conf(cct, jf); } } }; struct AWSSyncConfig_ACLProfiles { map > acl_profiles; void init(const JSONFormattable& config) { for (auto& c : config.array()) { const string& profile_id = c["id"]; std::shared_ptr ap{new ACLMappings}; ap->init(c["acls"]); acl_profiles[profile_id] = ap; } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ArraySection section(jf, "acl_profiles"); for (auto& p : acl_profiles) { Formatter::ObjectSection section(jf, "profile"); encode_json("id", p.first, &jf); p.second->dump_conf(cct, jf); } } bool find(const string& profile_id, ACLMappings *result) const { auto iter = acl_profiles.find(profile_id); if (iter == acl_profiles.end()) { return false; } *result = *iter->second; return true; } }; struct AWSSyncConfig_Connection { string connection_id; string endpoint; RGWAccessKey key; HostStyle host_style{PathStyle}; bool has_endpoint{false}; bool has_key{false}; bool has_host_style{false}; void init(const JSONFormattable& config) { has_endpoint = config.exists("endpoint"); has_key = config.exists("access_key") || config.exists("secret"); has_host_style = config.exists("host_style"); connection_id = config["id"]; endpoint = config["endpoint"]; key = RGWAccessKey(config["access_key"], config["secret"]); string host_style_str = config["host_style"]; if (host_style_str != "virtual") { host_style = PathStyle; } else { host_style = VirtualStyle; } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection section(jf, "connection"); encode_json("id", connection_id, &jf); encode_json("endpoint", endpoint, &jf); string s = (host_style == PathStyle ? "path" : "virtual"); encode_json("host_style", s, &jf); { Formatter::ObjectSection os(jf, "key"); encode_json("access_key", key.id, &jf); string secret = (key.key.empty() ? "" : "******"); encode_json("secret", secret, &jf); } } }; static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) { string sval; if (config.find(key, &sval)) { string err; uint64_t val = strict_strtoll(sval.c_str(), 10, &err); if (!err.empty()) { ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; return -EINVAL; } *pval = val; } return 0; } struct AWSSyncConfig_S3 { uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE}; int init(CephContext *cct, const JSONFormattable& config) { int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold); if (r < 0) { return r; } r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size); if (r < 0) { return r; } #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; } return 0; } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection section(jf, "s3"); encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf); encode_json("multipart_min_part_size", multipart_min_part_size, &jf); } }; struct AWSSyncConfig_Profile { string source_bucket; bool prefix{false}; string target_path; string connection_id; string acls_id; std::shared_ptr conn_conf; std::shared_ptr acls; std::shared_ptr conn; void init(const JSONFormattable& config) { source_bucket = config["source_bucket"]; prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*'); if (prefix) { source_bucket = source_bucket.substr(0, source_bucket.size() - 1); } target_path = config["target_path"]; connection_id = config["connection_id"]; acls_id = config["acls_id"]; if (config.exists("connection")) { conn_conf = make_shared(); conn_conf->init(config["connection"]); } if (config.exists("acls")) { acls = make_shared(); acls->init(config["acls"]); } } void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const { Formatter::ObjectSection config(jf, section); string sb{source_bucket}; if (prefix) { sb.append("*"); } encode_json("source_bucket", sb, &jf); encode_json("target_path", target_path, &jf); encode_json("connection_id", connection_id, &jf); encode_json("acls_id", acls_id, &jf); if (conn_conf.get()) { conn_conf->dump_conf(cct, jf); } if (acls.get()) { acls->dump_conf(cct, jf); } } }; static void find_and_replace(const string& src, const string& find, const string& replace, string *dest) { string s = src; size_t pos = s.find(find); while (pos != string::npos) { size_t next_ofs = pos + find.size(); s = s.substr(0, pos) + replace + s.substr(next_ofs); pos = s.find(find, next_ofs); } *dest = s; } static void apply_meta_param(const string& src, const string& param, const string& val, string *dest) { string s = string("${") + param + "}"; find_and_replace(src, s, val, dest); } struct AWSSyncConfig { AWSSyncConfig_Profile default_profile; std::shared_ptr root_profile; map > connections; AWSSyncConfig_ACLProfiles acl_profiles; map > explicit_profiles; AWSSyncConfig_S3 s3; int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile, bool connection_must_exist) { if (!profile.connection_id.empty()) { if (profile.conn_conf) { ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl; return -EINVAL; } if (connections.find(profile.connection_id) == connections.end()) { ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl; return -EINVAL; } profile.conn_conf = connections[profile.connection_id]; } else if (!profile.conn_conf) { profile.connection_id = default_profile.connection_id; auto i = connections.find(profile.connection_id); if (i != connections.end()) { profile.conn_conf = i->second; } } if (connection_must_exist && !profile.conn_conf) { ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl; return -EINVAL; } if (profile.conn_conf && default_profile.conn_conf) { if (!profile.conn_conf->has_endpoint) { profile.conn_conf->endpoint = default_profile.conn_conf->endpoint; } if (!profile.conn_conf->has_host_style) { profile.conn_conf->host_style = default_profile.conn_conf->host_style; } if (!profile.conn_conf->has_key) { profile.conn_conf->key = default_profile.conn_conf->key; } } ACLMappings acl_mappings; if (!profile.acls_id.empty()) { if (!acl_profiles.find(profile.acls_id, &acl_mappings)) { ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl; return -EINVAL; } profile.acls = acl_profiles.acl_profiles[profile.acls_id]; } else if (!profile.acls) { if (default_profile.acls) { profile.acls = default_profile.acls; profile.acls_id = default_profile.acls_id; } } if (profile.target_path.empty()) { profile.target_path = default_profile.target_path; } if (profile.target_path.empty()) { profile.target_path = default_target_path; } return 0; } int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr *ptarget) { std::shared_ptr profile; profile.reset(new AWSSyncConfig_Profile); profile->init(profile_conf); int ret = init_profile(cct, profile_conf, *profile, true); if (ret < 0) { return ret; } auto& sb = profile->source_bucket; if (explicit_profiles.find(sb) != explicit_profiles.end()) { ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl; } explicit_profiles[sb] = profile; if (ptarget) { *ptarget = profile; } return 0; } bool do_find_profile(const rgw_bucket bucket, std::shared_ptr *result) { const string& name = bucket.name; auto iter = explicit_profiles.upper_bound(name); if (iter == explicit_profiles.begin()) { return false; } --iter; if (iter->first.size() > name.size()) { return false; } if (name.compare(0, iter->first.size(), iter->first) != 0) { return false; } std::shared_ptr& target = iter->second; if (!target->prefix && name.size() != iter->first.size()) { return false; } *result = target; return true; } void find_profile(const rgw_bucket bucket, std::shared_ptr *result) { if (!do_find_profile(bucket, result)) { *result = root_profile; } } AWSSyncConfig() {} int init(CephContext *cct, const JSONFormattable& config) { auto& default_conf = config["default"]; if (config.exists("default")) { default_profile.init(default_conf); init_profile(cct, default_conf, default_profile, false); } for (auto& conn : config["connections"].array()) { auto new_conn = conn; std::shared_ptr c{new AWSSyncConfig_Connection}; c->init(new_conn); connections[new_conn["id"]] = c; } acl_profiles.init(config["acl_profiles"]); int r = s3.init(cct, config["s3"]); if (r < 0) { return r; } auto new_root_conf = config; r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */ if (r < 0) { return r; } for (auto target_conf : config["profiles"].array()) { int r = init_target(cct, target_conf, nullptr); if (r < 0) { return r; } } JSONFormatter jf(true); dump_conf(cct, jf); stringstream ss; jf.flush(ss); ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl; return 0; } void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) { apply_meta_param(path, "sid", sid, dest); const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup(); apply_meta_param(path, "zonegroup", zg.get_name(), dest); apply_meta_param(path, "zonegroup_id", zg.get_id(), dest); const RGWZone& zone = sc->env->svc->zone->get_zone(); apply_meta_param(path, "zone", zone.name, dest); apply_meta_param(path, "zone_id", zone.id, dest); } void update_config(RGWDataSyncCtx *sc, const string& sid) { expand_target(sc, sid, root_profile->target_path, &root_profile->target_path); ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; for (auto& t : explicit_profiles) { expand_target(sc, sid, t.second->target_path, &t.second->target_path); ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection config(jf, "config"); root_profile->dump_conf(cct, jf); jf.open_array_section("connections"); for (auto c : connections) { c.second->dump_conf(cct, jf); } jf.close_section(); acl_profiles.dump_conf(cct, jf); { // targets Formatter::ArraySection as(jf, "profiles"); for (auto& t : explicit_profiles) { Formatter::ObjectSection target_section(jf, "profile"); encode_json("name", t.first, &jf); t.second->dump_conf(cct, jf); } } } string get_path(std::shared_ptr& profile, const RGWBucketInfo& bucket_info, const rgw_obj_key& obj) { string bucket_str; string owner; if (!bucket_info.owner.tenant.empty()) { bucket_str = owner = bucket_info.owner.tenant + "-"; owner += bucket_info.owner.id; } bucket_str += bucket_info.bucket.name; const string& path = profile->target_path; string new_path; apply_meta_param(path, "bucket", bucket_str, &new_path); apply_meta_param(new_path, "owner", owner, &new_path); new_path += string("/") + get_key_oid(obj); return new_path; } void get_target(std::shared_ptr& profile, const RGWBucketInfo& bucket_info, const rgw_obj_key& obj, string *bucket_name, string *obj_name) { string path = get_path(profile, bucket_info, obj); size_t pos = path.find('/'); *bucket_name = path.substr(0, pos); *obj_name = path.substr(pos + 1); } void init_conns(RGWDataSyncCtx *sc, const string& id) { auto sync_env = sc->env; update_config(sc, id); auto& root_conf = root_profile->conn_conf; root_profile->conn.reset(new S3RESTConn(sc->cct, sync_env->svc->zone, id, { root_conf->endpoint }, root_conf->key, root_conf->host_style)); for (auto i : explicit_profiles) { auto& c = i.second; c->conn.reset(new S3RESTConn(sc->cct, sync_env->svc->zone, id, { c->conn_conf->endpoint }, c->conn_conf->key, c->conn_conf->host_style)); } } }; struct AWSSyncInstanceEnv { AWSSyncConfig conf; string id; explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {} void init(RGWDataSyncCtx *sc, uint64_t instance_id) { char buf[32]; snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id); id = buf; conf.init_conns(sc, id); } void get_profile(const rgw_bucket& bucket, std::shared_ptr *ptarget) { conf.find_profile(bucket, ptarget); ceph_assert(ptarget); } }; static int do_decode_rest_obj(CephContext *cct, map& attrs, map& headers, rgw_rest_obj *info) { for (auto header : headers) { const string& val = header.second; if (header.first == "RGWX_OBJECT_SIZE") { info->content_len = atoi(val.c_str()); } else { info->attrs[header.first] = val; } } info->acls.set_ctx(cct); auto aiter = attrs.find(RGW_ATTR_ACL); if (aiter != attrs.end()) { bufferlist& bl = aiter->second; auto bliter = bl.cbegin(); try { info->acls.decode(bliter); } catch (buffer::error& err) { ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl; return -EIO; } } else { ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl; } return 0; } class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF { RGWDataSyncCtx *sc; RGWRESTConn *conn; rgw::sal::RGWObject* src_obj; RGWRESTConn::get_obj_params req_params; rgw_sync_aws_src_obj_properties src_properties; public: RGWRESTStreamGetCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWDataSyncCtx *_sc, RGWRESTConn *_conn, rgw::sal::RGWObject* _src_obj, const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager, _src_obj->get_key()), sc(_sc), conn(_conn), src_obj(_src_obj), src_properties(_src_properties) { } int init(const DoutPrefixProvider *dpp) override { /* init input connection */ req_params.get_op = true; req_params.prepend_metadata = true; req_params.unmod_ptr = &src_properties.mtime; req_params.etag = src_properties.etag; req_params.mod_zone_id = src_properties.zone_short_id; req_params.mod_pg_ver = src_properties.pg_ver; if (range.is_set) { req_params.range_is_set = true; req_params.range_start = range.ofs; req_params.range_end = range.ofs + range.size - 1; } RGWRESTStreamRWRequest *in_req; int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; return ret; } set_req(in_req); return RGWStreamReadHTTPResourceCRF::init(dpp); } int decode_rest_obj(map& headers, bufferlist& extra_data) override { map src_attrs; ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; if (extra_data.length() > 0) { JSONParser jp; if (!jp.parse(extra_data.c_str(), extra_data.length())) { ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; return -EIO; } JSONDecoder::decode_json("attrs", src_attrs, &jp); } return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj); } bool need_extra_data() override { return true; } }; static std::set keep_headers = { "CONTENT_TYPE", "CONTENT_ENCODING", "CONTENT_DISPOSITION", "CONTENT_LANGUAGE" }; class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF { RGWDataSyncCtx *sc; rgw_sync_aws_src_obj_properties src_properties; std::shared_ptr target; rgw::sal::RGWObject* dest_obj; string etag; public: RGWAWSStreamPutCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWDataSyncCtx *_sc, const rgw_sync_aws_src_obj_properties& _src_properties, std::shared_ptr& _target, rgw::sal::RGWObject* _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager), sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) { } int init() override { /* init output connection */ RGWRESTStreamS3PutObj *out_req{nullptr}; if (multipart.is_multipart) { char buf[32]; snprintf(buf, sizeof(buf), "%d", multipart.part_num); rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, { "partNumber", buf }, { nullptr, nullptr } }; target->conn->put_obj_send_init(dest_obj, params, &out_req); } else { target->conn->put_obj_send_init(dest_obj, nullptr, &out_req); } set_req(out_req); return RGWStreamWriteHTTPResourceCRF::init(); } static bool keep_attr(const string& h) { return (keep_headers.find(h) != keep_headers.end() || boost::algorithm::starts_with(h, "X_AMZ_")); } static void init_send_attrs(CephContext *cct, const rgw_rest_obj& rest_obj, const rgw_sync_aws_src_obj_properties& src_properties, const AWSSyncConfig_Profile *target, map *attrs) { auto& new_attrs = *attrs; new_attrs.clear(); for (auto& hi : rest_obj.attrs) { if (keep_attr(hi.first)) { new_attrs.insert(hi); } } auto acl = rest_obj.acls.get_acl(); map > access_map; if (target->acls) { for (auto& grant : acl.get_grant_map()) { auto& orig_grantee = grant.first; auto& perm = grant.second; string grantee; const auto& am = target->acls->acl_mappings; auto iter = am.find(orig_grantee); if (iter == am.end()) { ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; continue; } grantee = iter->second.dest_id; string type; switch (iter->second.type) { case ACL_TYPE_CANON_USER: type = "id"; break; case ACL_TYPE_EMAIL_USER: type = "emailAddress"; break; case ACL_TYPE_GROUP: type = "uri"; break; default: continue; } string tv = type + "=" + grantee; int flags = perm.get_permission().get_permissions(); if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { access_map[flags].push_back(tv); continue; } for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { if (flags & i) { access_map[i].push_back(tv); } } } } for (auto aiter : access_map) { int grant_type = aiter.first; string header_str("x-amz-grant-"); switch (grant_type) { case RGW_PERM_READ: header_str.append("read"); break; case RGW_PERM_WRITE: header_str.append("write"); break; case RGW_PERM_READ_ACP: header_str.append("read-acp"); break; case RGW_PERM_WRITE_ACP: header_str.append("write-acp"); break; case RGW_PERM_FULL_CONTROL: header_str.append("full-control"); break; } string s; for (auto viter : aiter.second) { if (!s.empty()) { s.append(", "); } s.append(viter); } ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; new_attrs[header_str] = s; } char buf[32]; snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch); new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf; utime_t ut(src_properties.mtime); snprintf(buf, sizeof(buf), "%lld.%09lld", (long long)ut.sec(), (long long)ut.nsec()); new_attrs["x-amz-meta-rgwx-source-mtime"] = buf; new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag; new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; if (!rest_obj.key.instance.empty()) { new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; } } void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override { RGWRESTStreamS3PutObj *r = static_cast(req); map new_attrs; if (!multipart.is_multipart) { init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); } r->set_send_length(rest_obj.content_len); RGWAccessControlPolicy policy; r->send_ready(dpp, target->conn->get_key(), new_attrs, policy, false); } void handle_headers(const map& headers) { for (auto h : headers) { if (h.first == "ETAG") { etag = h.second; } } } bool get_etag(string *petag) { if (etag.empty()) { return false; } *petag = etag; return true; } }; class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *source_conn; std::shared_ptr target; rgw::sal::RGWObject* src_obj; rgw::sal::RGWObject* dest_obj; rgw_sync_aws_src_obj_properties src_properties; std::shared_ptr in_crf; std::shared_ptr out_crf; public: RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc, RGWRESTConn *_source_conn, rgw::sal::RGWObject* _src_obj, const rgw_sync_aws_src_obj_properties& _src_properties, std::shared_ptr _target, rgw::sal::RGWObject* _dest_obj) : RGWCoroutine(_sc->cct), sc(_sc), source_conn(_source_conn), target(_target), src_obj(_src_obj), dest_obj(_dest_obj), src_properties(_src_properties) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { /* init input */ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, source_conn, src_obj, src_properties)); /* init output */ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, src_properties, target, dest_obj)); yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); if (retcode < 0) { return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *source_conn; std::shared_ptr target; rgw::sal::RGWObject* src_obj; rgw::sal::RGWObject* dest_obj; rgw_sync_aws_src_obj_properties src_properties; string upload_id; rgw_sync_aws_multipart_part_info part_info; std::shared_ptr in_crf; std::shared_ptr out_crf; string *petag; public: RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_source_conn, rgw::sal::RGWObject* _src_obj, std::shared_ptr& _target, rgw::sal::RGWObject* _dest_obj, const rgw_sync_aws_src_obj_properties& _src_properties, const string& _upload_id, const rgw_sync_aws_multipart_part_info& _part_info, string *_petag) : RGWCoroutine(_sc->cct), sc(_sc), source_conn(_source_conn), target(_target), src_obj(_src_obj), dest_obj(_dest_obj), src_properties(_src_properties), upload_id(_upload_id), part_info(_part_info), petag(_petag) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { /* init input */ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, source_conn, src_obj, src_properties)); in_crf->set_range(part_info.ofs, part_info.size); /* init output */ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, src_properties, target, dest_obj)); out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); if (retcode < 0) { return set_cr_error(retcode); } if (!(static_cast(out_crf.get()))->get_etag(petag)) { ldout(sc->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; return set_cr_error(-EIO); } return set_cr_done(); } return 0; } }; class RGWAWSAbortMultipartCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw::sal::RGWObject* dest_obj; string upload_id; public: RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, rgw::sal::RGWObject* _dest_obj, const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; bufferlist bl; call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params)); } if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl; return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; class RGWAWSInitMultipartCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw::sal::RGWObject* dest_obj; uint64_t obj_size; map attrs; bufferlist out_bl; string *upload_id; struct InitMultipartResult { string bucket; string key; string upload_id; void decode_xml(XMLObj *obj) { RGWXMLDecoder::decode_xml("Bucket", bucket, obj); RGWXMLDecoder::decode_xml("Key", key, obj); RGWXMLDecoder::decode_xml("UploadId", upload_id, obj); } } result; public: RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, rgw::sal::RGWObject* _dest_obj, uint64_t _obj_size, const map& _attrs, string *_upload_id) : RGWCoroutine(_sc->cct), sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), obj_size(_obj_size), attrs(_attrs), upload_id(_upload_id) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; bufferlist bl; call(new RGWPostRawRESTResourceCR (sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl)); } if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; return set_cr_error(retcode); } { /* * If one of the following fails we cannot abort upload, as we cannot * extract the upload id. If one of these fail it's very likely that that's * the least of our problem. */ RGWXMLDecoder::XMLParser parser; if (!parser.init()) { ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(-EIO); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(-EIO); } try { RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(-EIO); } } ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl; *upload_id = result.upload_id; return set_cr_done(); } return 0; } }; class RGWAWSCompleteMultipartCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw::sal::RGWObject* dest_obj; bufferlist out_bl; string upload_id; struct CompleteMultipartReq { map parts; explicit CompleteMultipartReq(const map& _parts) : parts(_parts) {} void dump_xml(Formatter *f) const { for (auto p : parts) { f->open_object_section("Part"); encode_xml("PartNumber", p.first, f); encode_xml("ETag", p.second.etag, f); f->close_section(); }; } } req_enc; struct CompleteMultipartResult { string location; string bucket; string key; string etag; void decode_xml(XMLObj *obj) { RGWXMLDecoder::decode_xml("Location", bucket, obj); RGWXMLDecoder::decode_xml("Bucket", bucket, obj); RGWXMLDecoder::decode_xml("Key", key, obj); RGWXMLDecoder::decode_xml("ETag", etag, obj); } } result; public: RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, rgw::sal::RGWObject* _dest_obj, string _upload_id, const map& _parts) : RGWCoroutine(_sc->cct), sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id), req_enc(_parts) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; stringstream ss; XMLFormatter formatter; encode_xml("CompleteMultipartUpload", req_enc, &formatter); formatter.flush(ss); bufferlist bl; bl.append(ss.str()); call(new RGWPostRawRESTResourceCR (sc->cct, dest_conn, sc->env->http_manager, obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl)); } if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; return set_cr_error(retcode); } { /* * If one of the following fails we cannot abort upload, as we cannot * extract the upload id. If one of these fail it's very likely that that's * the least of our problem. */ RGWXMLDecoder::XMLParser parser; if (!parser.init()) { ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(-EIO); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(-EIO); } try { RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(-EIO); } } ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl; return set_cr_done(); } return 0; } }; class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWRESTConn *dest_conn; rgw::sal::RGWObject* dest_obj; const rgw_raw_obj status_obj; string upload_id; public: RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc, RGWRESTConn *_dest_conn, rgw::sal::RGWObject* _dest_obj, const rgw_raw_obj& _status_obj, const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc), dest_conn(_dest_conn), dest_obj(_dest_obj), status_obj(_status_obj), upload_id(_upload_id) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; /* ignore error, best effort */ } yield call(new RGWRadosRemoveCR(sc->env->store, status_obj)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; /* ignore error, best effort */ } return set_cr_done(); } return 0; } }; class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; AWSSyncConfig& conf; RGWRESTConn *source_conn; std::shared_ptr target; rgw::sal::RGWObject* src_obj; rgw::sal::RGWObject* dest_obj; uint64_t obj_size; string src_etag; rgw_sync_aws_src_obj_properties src_properties; rgw_rest_obj rest_obj; rgw_sync_aws_multipart_upload_info status; map new_attrs; rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr}; int ret_err{0}; rgw_raw_obj status_obj; public: RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, AWSSyncConfig& _conf, RGWRESTConn *_source_conn, rgw::sal::RGWObject* _src_obj, std::shared_ptr& _target, rgw::sal::RGWObject* _dest_obj, uint64_t _obj_size, const rgw_sync_aws_src_obj_properties& _src_properties, const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), conf(_conf), source_conn(_source_conn), target(_target), src_obj(_src_obj), dest_obj(_dest_obj), obj_size(_obj_size), src_properties(_src_properties), rest_obj(_rest_obj), status_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) { } int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield call(new RGWSimpleRadosReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, &status, false)); if (retcode < 0 && retcode != -ENOENT) { ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; return retcode; } if (retcode >= 0) { /* check here that mtime and size did not change */ if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || status.src_properties.etag != src_properties.etag) { yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); retcode = -ENOENT; } } if (retcode == -ENOENT) { RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id)); if (retcode < 0) { return set_cr_error(retcode); } status.obj_size = obj_size; status.src_properties = src_properties; #define MULTIPART_MAX_PARTS 10000 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS; status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size); status.num_parts = (obj_size + status.part_size - 1) / status.part_size; status.cur_part = 1; } for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) { yield { rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part]; cur_part_info.part_num = status.cur_part; cur_part_info.ofs = status.cur_ofs; cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs); pcur_part_info = &cur_part_info; status.cur_ofs += status.part_size; call(new RGWAWSStreamObjToCloudMultipartPartCR(sc, source_conn, src_obj, target, dest_obj, status.src_properties, status.upload_id, cur_part_info, &cur_part_info.etag)); } if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } yield call(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; /* continue with upload anyway */ } ldout(sc->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; } yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } /* remove status obj */ yield call(new RGWRadosRemoveCR(sync_env->store, status_obj)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; /* ignore error, best effort */ } return set_cr_done(); } return 0; } }; template int decode_attr(map& attrs, const char *attr_name, T *result, T def_val) { map::iterator iter = attrs.find(attr_name); if (iter == attrs.end()) { *result = def_val; return 0; } bufferlist& bl = iter->second; if (bl.length() == 0) { *result = def_val; return 0; } auto bliter = bl.cbegin(); try { decode(*result, bliter); } catch (buffer::error& err) { return -EIO; } return 0; } // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { rgw_bucket_sync_pipe sync_pipe; AWSSyncInstanceEnv& instance; uint64_t versioned_epoch{0}; RGWRESTConn *source_conn{nullptr}; std::shared_ptr target; bufferlist res; unordered_map bucket_created; string target_bucket_name; string target_obj_name; rgw_rest_obj rest_obj; int ret{0}; uint32_t src_zone_short_id{0}; uint64_t src_pg_ver{0}; bufferlist out_bl; struct CreateBucketResult { string code; void decode_xml(XMLObj *obj) { RGWXMLDecoder::decode_xml("Code", code, obj); } } result; public: RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) {} ~RGWAWSHandleRemoteObjCBCR(){ } int operate(const DoutPrefixProvider *dpp) override { reenter(this) { ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0); if (ret < 0) { ldout(sc->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl; } else { ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0); if (ret < 0) { ldout(sc->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl; src_pg_ver = 0; /* all or nothing */ } } ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " etag=" << etag << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver << dendl; source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone); if (!source_conn) { ldout(sc->cct, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl; return set_cr_error(-EINVAL); } instance.get_profile(sync_pipe.info.source_bs.bucket, &target); instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name); if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { ldout(sc->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl; bufferlist bl; call(new RGWPutRawRESTResourceCR (sc->cct, target->conn.get(), sync_env->http_manager, target_bucket_name, nullptr, bl, &out_bl)); } if (retcode < 0 ) { RGWXMLDecoder::XMLParser parser; if (!parser.init()) { ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl; return set_cr_error(retcode); } if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl; return set_cr_error(retcode); } try { RGWXMLDecoder::decode_xml("Error", result, &parser, true); } catch (RGWXMLDecoder::err& err) { string str(out_bl.c_str(), out_bl.length()); ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl; return set_cr_error(retcode); } if (result.code != "BucketAlreadyOwnedByYou") { return set_cr_error(retcode); } } bucket_created[target_bucket_name] = true; } yield { rgw::sal::RGWRadosBucket bucket(sync_env->store, src_bucket); rgw::sal::RGWRadosObject src_obj(sync_env->store, key, &bucket); /* init output */ rgw_bucket target_bucket; target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for uri resolution */ rgw::sal::RGWRadosBucket dest_bucket(sync_env->store, target_bucket); rgw::sal::RGWRadosObject dest_obj(sync_env->store, rgw_obj_key(target_obj_name), &dest_bucket); rgw_sync_aws_src_obj_properties src_properties; src_properties.mtime = mtime; src_properties.etag = etag; src_properties.zone_short_id = src_zone_short_id; src_properties.pg_ver = src_pg_ver; src_properties.versioned_epoch = versioned_epoch; if (size < instance.conf.s3.multipart_sync_threshold) { call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, &src_obj, src_properties, target, &dest_obj)); } else { rgw_rest_obj rest_obj; rest_obj.init(key); if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) { ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl; return set_cr_error(-EINVAL); } call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, &src_obj, target, &dest_obj, size, src_properties, rest_obj)); } } if (retcode < 0) { return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { rgw_bucket_sync_pipe sync_pipe; AWSSyncInstanceEnv& instance; uint64_t versioned_epoch; public: RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) { } ~RGWAWSHandleRemoteObjCR() {} RGWStatRemoteObjCBCR *allocate_callback() override { return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch); } }; class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { RGWDataSyncCtx *sc; std::shared_ptr target; rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; ceph::real_time mtime; AWSSyncInstanceEnv& instance; int ret{0}; public: RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc), sync_pipe(_sync_pipe), key(_key), mtime(_mtime), instance(_instance) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone << " b=" <cct, target->conn.get(), sc->env->http_manager, path, nullptr /* params */)); } if (retcode < 0) { return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; class RGWAWSDataSyncModule: public RGWDataSyncModule { CephContext *cct; AWSSyncInstanceEnv instance; public: RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) : cct(_cct), instance(_conf) { } void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { instance.init(sc, instance_id); } ~RGWAWSDataSyncModule() {} RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0)); } RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance); } RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } }; class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance { RGWAWSDataSyncModule data_handler; public: RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {} RGWDataSyncModule *get_data_handler() override { return &data_handler; } }; int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){ AWSSyncConfig conf; int r = conf.init(cct, config); if (r < 0) { return r; } instance->reset(new RGWAWSSyncModuleInstance(cct, conf)); return 0; }