diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rgw/rgw_sync_module_aws.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds. (tags: upstream/16.2.11+ds, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_sync_module_aws.cc')
-rw-r--r-- | src/rgw/rgw_sync_module_aws.cc | 1817 |
1 file changed, 1817 insertions, 0 deletions
diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc new file mode 100644 index 000000000..e57327b19 --- /dev/null +++ b/src/rgw/rgw_sync_module_aws.cc @@ -0,0 +1,1817 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "common/errno.h" + +#include "rgw_common.h" +#include "rgw_coroutine.h" +#include "rgw_sync_module.h" +#include "rgw_data_sync.h" +#include "rgw_sync_module_aws.h" +#include "rgw_cr_rados.h" +#include "rgw_rest_conn.h" +#include "rgw_cr_rest.h" +#include "rgw_acl.h" +#include "rgw_zone.h" + +#include "services/svc_zone.h" + +#include <boost/asio/yield.hpp> + +#define dout_subsys ceph_subsys_rgw + + +#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024) + +static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}"; + +static string get_key_oid(const rgw_obj_key& key) +{ + string oid = key.name; + if (!key.instance.empty() && + !key.have_null_instance()) { + oid += string(":") + key.instance; + } + return oid; +} + +static string obj_to_aws_path(rgw::sal::RGWObject* obj) +{ + string path = obj->get_bucket()->get_name() + "/" + get_key_oid(obj->get_key()); + + + return path; +} + +/* + + json configuration definition: + + { + "connection": { + "access_key": <access>, + "secret": <secret>, + "endpoint": <endpoint>, + "host_style": <path | virtual>, + }, + "acls": [ { "type": <id | email | uri>, + "source_id": <source_id>, + "dest_id": <dest_id> } ... 
], # optional, acl mappings, no mappings if does not exist + "target_path": <target_path>, # override default + + + # anything below here is for non trivial configuration + # can be used in conjuction with the above + + "default": { + "connection": { + "access_key": <access>, + "secret": <secret>, + "endpoint": <endpoint>, + "host_style" <path | virtual>, + }, + "acls": [ # list of source uids and how they map into destination uids in the dest objects acls + { + "type" : <id | email | uri>, # optional, default is id + "source_id": <id>, + "dest_id": <id> + } ... ] + "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path, + # final object name will be target_path + "/" + obj + }, + "connections": [ + { + "id": <id>, + "access_key": <access>, + "secret": <secret>, + "endpoint": <endpoint>, + } ... ], + "acl_profiles": [ + { + "id": <id>, # acl mappings + "acls": [ { + "type": <id | email | uri>, + "source_id": <id>, + "dest_id": <id> + } ... ] + } + ], + "profiles": [ + { + "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*) + "target_path": <dest>, # (override default) + "connection_id": <connection_id>, # optional, if empty references default connection + "acls_id": <mappings_id>, # optional, if empty references default mappings + } ... 
], + } + +target path optional variables: + +(evaluated at init) +sid: sync instance id, randomly generated by sync process on first sync initalization +zonegroup: zonegroup name +zonegroup_id: zonegroup name +zone: zone name +zone_id: zone name + +(evaluated when syncing) +bucket: bucket name +owner: bucket owner + +*/ + +struct ACLMapping { + ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER}; + string source_id; + string dest_id; + + ACLMapping() = default; + + ACLMapping(ACLGranteeTypeEnum t, + const string& s, + const string& d) : type(t), + source_id(s), + dest_id(d) {} + + void init(const JSONFormattable& config) { + const string& t = config["type"]; + + if (t == "email") { + type = ACL_TYPE_EMAIL_USER; + } else if (t == "uri") { + type = ACL_TYPE_GROUP; + } else { + type = ACL_TYPE_CANON_USER; + } + + source_id = config["source_id"]; + dest_id = config["dest_id"]; + } + + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ObjectSection os(jf, "acl_mapping"); + string s; + switch (type) { + case ACL_TYPE_EMAIL_USER: + s = "email"; + break; + case ACL_TYPE_GROUP: + s = "uri"; + break; + default: + s = "id"; + break; + } + encode_json("type", s, &jf); + encode_json("source_id", source_id, &jf); + encode_json("dest_id", dest_id, &jf); + } +}; + +struct ACLMappings { + map<string, ACLMapping> acl_mappings; + + void init(const JSONFormattable& config) { + for (auto& c : config.array()) { + ACLMapping m; + m.init(c); + + acl_mappings.emplace(std::make_pair(m.source_id, m)); + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ArraySection os(jf, "acls"); + + for (auto& i : acl_mappings) { + i.second.dump_conf(cct, jf); + } + } +}; + +struct AWSSyncConfig_ACLProfiles { + map<string, std::shared_ptr<ACLMappings> > acl_profiles; + + void init(const JSONFormattable& config) { + for (auto& c : config.array()) { + const string& profile_id = c["id"]; + + std::shared_ptr<ACLMappings> ap{new ACLMappings}; + ap->init(c["acls"]); 
+ + acl_profiles[profile_id] = ap; + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ArraySection section(jf, "acl_profiles"); + + for (auto& p : acl_profiles) { + Formatter::ObjectSection section(jf, "profile"); + encode_json("id", p.first, &jf); + p.second->dump_conf(cct, jf); + } + } + + bool find(const string& profile_id, ACLMappings *result) const { + auto iter = acl_profiles.find(profile_id); + if (iter == acl_profiles.end()) { + return false; + } + *result = *iter->second; + return true; + } +}; + +struct AWSSyncConfig_Connection { + string connection_id; + string endpoint; + RGWAccessKey key; + HostStyle host_style{PathStyle}; + + bool has_endpoint{false}; + bool has_key{false}; + bool has_host_style{false}; + + void init(const JSONFormattable& config) { + has_endpoint = config.exists("endpoint"); + has_key = config.exists("access_key") || config.exists("secret"); + has_host_style = config.exists("host_style"); + + connection_id = config["id"]; + endpoint = config["endpoint"]; + + key = RGWAccessKey(config["access_key"], config["secret"]); + string host_style_str = config["host_style"]; + if (host_style_str != "virtual") { + host_style = PathStyle; + } else { + host_style = VirtualStyle; + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ObjectSection section(jf, "connection"); + encode_json("id", connection_id, &jf); + encode_json("endpoint", endpoint, &jf); + string s = (host_style == PathStyle ? "path" : "virtual"); + encode_json("host_style", s, &jf); + + { + Formatter::ObjectSection os(jf, "key"); + encode_json("access_key", key.id, &jf); + string secret = (key.key.empty() ? 
"" : "******"); + encode_json("secret", secret, &jf); + } + } +}; + +static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) +{ + string sval; + if (config.find(key, &sval)) { + string err; + uint64_t val = strict_strtoll(sval.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; + return -EINVAL; + } + *pval = val; + } + return 0; +} + +struct AWSSyncConfig_S3 { + uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; + uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE}; + + int init(CephContext *cct, const JSONFormattable& config) { + int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold); + if (r < 0) { + return r; + } + + r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size); + if (r < 0) { + return r; + } +#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024) + if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) { + multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE; + } + return 0; + } + + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ObjectSection section(jf, "s3"); + encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf); + encode_json("multipart_min_part_size", multipart_min_part_size, &jf); + } +}; + +struct AWSSyncConfig_Profile { + string source_bucket; + bool prefix{false}; + string target_path; + string connection_id; + string acls_id; + + std::shared_ptr<AWSSyncConfig_Connection> conn_conf; + std::shared_ptr<ACLMappings> acls; + + std::shared_ptr<RGWRESTConn> conn; + + void init(const JSONFormattable& config) { + source_bucket = config["source_bucket"]; + + prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*'); + + if (prefix) { + source_bucket = source_bucket.substr(0, source_bucket.size() - 1); + } 
+ + target_path = config["target_path"]; + connection_id = config["connection_id"]; + acls_id = config["acls_id"]; + + if (config.exists("connection")) { + conn_conf = make_shared<AWSSyncConfig_Connection>(); + conn_conf->init(config["connection"]); + } + + if (config.exists("acls")) { + acls = make_shared<ACLMappings>(); + acls->init(config["acls"]); + } + } + + void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const { + Formatter::ObjectSection config(jf, section); + string sb{source_bucket}; + if (prefix) { + sb.append("*"); + } + encode_json("source_bucket", sb, &jf); + encode_json("target_path", target_path, &jf); + encode_json("connection_id", connection_id, &jf); + encode_json("acls_id", acls_id, &jf); + if (conn_conf.get()) { + conn_conf->dump_conf(cct, jf); + } + if (acls.get()) { + acls->dump_conf(cct, jf); + } + } +}; + +static void find_and_replace(const string& src, const string& find, const string& replace, string *dest) +{ + string s = src; + + size_t pos = s.find(find); + while (pos != string::npos) { + size_t next_ofs = pos + find.size(); + s = s.substr(0, pos) + replace + s.substr(next_ofs); + pos = s.find(find, next_ofs); + } + + *dest = s; +} + +static void apply_meta_param(const string& src, const string& param, const string& val, string *dest) +{ + string s = string("${") + param + "}"; + find_and_replace(src, s, val, dest); +} + + +struct AWSSyncConfig { + AWSSyncConfig_Profile default_profile; + std::shared_ptr<AWSSyncConfig_Profile> root_profile; + + map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections; + AWSSyncConfig_ACLProfiles acl_profiles; + + map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles; + + AWSSyncConfig_S3 s3; + + int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile, + bool connection_must_exist) { + if (!profile.connection_id.empty()) { + if (profile.conn_conf) { + ldout(cct, 0) << "ERROR: ambiguous profile 
connection configuration, connection_id=" << profile.connection_id << dendl; + return -EINVAL; + } + if (connections.find(profile.connection_id) == connections.end()) { + ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl; + return -EINVAL; + } + profile.conn_conf = connections[profile.connection_id]; + } else if (!profile.conn_conf) { + profile.connection_id = default_profile.connection_id; + auto i = connections.find(profile.connection_id); + if (i != connections.end()) { + profile.conn_conf = i->second; + } + } + + if (connection_must_exist && !profile.conn_conf) { + ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl; + return -EINVAL; + } + + if (profile.conn_conf && default_profile.conn_conf) { + if (!profile.conn_conf->has_endpoint) { + profile.conn_conf->endpoint = default_profile.conn_conf->endpoint; + } + if (!profile.conn_conf->has_host_style) { + profile.conn_conf->host_style = default_profile.conn_conf->host_style; + } + if (!profile.conn_conf->has_key) { + profile.conn_conf->key = default_profile.conn_conf->key; + } + } + + ACLMappings acl_mappings; + + if (!profile.acls_id.empty()) { + if (!acl_profiles.find(profile.acls_id, &acl_mappings)) { + ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl; + return -EINVAL; + } + profile.acls = acl_profiles.acl_profiles[profile.acls_id]; + } else if (!profile.acls) { + if (default_profile.acls) { + profile.acls = default_profile.acls; + profile.acls_id = default_profile.acls_id; + } + } + + if (profile.target_path.empty()) { + profile.target_path = default_profile.target_path; + } + if (profile.target_path.empty()) { + profile.target_path = default_target_path; + } + + return 0; + } + + int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) { + std::shared_ptr<AWSSyncConfig_Profile> profile; + 
profile.reset(new AWSSyncConfig_Profile); + profile->init(profile_conf); + + int ret = init_profile(cct, profile_conf, *profile, true); + if (ret < 0) { + return ret; + } + + auto& sb = profile->source_bucket; + + if (explicit_profiles.find(sb) != explicit_profiles.end()) { + ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl; + } + + explicit_profiles[sb] = profile; + if (ptarget) { + *ptarget = profile; + } + return 0; + } + + bool do_find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) { + const string& name = bucket.name; + auto iter = explicit_profiles.upper_bound(name); + if (iter == explicit_profiles.begin()) { + return false; + } + + --iter; + if (iter->first.size() > name.size()) { + return false; + } + if (name.compare(0, iter->first.size(), iter->first) != 0) { + return false; + } + + std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second; + + if (!target->prefix && + name.size() != iter->first.size()) { + return false; + } + + *result = target; + return true; + } + + void find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) { + if (!do_find_profile(bucket, result)) { + *result = root_profile; + } + } + + AWSSyncConfig() {} + + int init(CephContext *cct, const JSONFormattable& config) { + auto& default_conf = config["default"]; + + if (config.exists("default")) { + default_profile.init(default_conf); + init_profile(cct, default_conf, default_profile, false); + } + + for (auto& conn : config["connections"].array()) { + auto new_conn = conn; + + std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection}; + c->init(new_conn); + + connections[new_conn["id"]] = c; + } + + acl_profiles.init(config["acl_profiles"]); + + int r = s3.init(cct, config["s3"]); + if (r < 0) { + return r; + } + + auto new_root_conf = config; + + r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */ + if (r < 0) { + return r; + } + + for 
(auto target_conf : config["profiles"].array()) { + int r = init_target(cct, target_conf, nullptr); + if (r < 0) { + return r; + } + } + + JSONFormatter jf(true); + dump_conf(cct, jf); + stringstream ss; + jf.flush(ss); + + ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl; + + return 0; + } + + void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) { + apply_meta_param(path, "sid", sid, dest); + + const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup(); + apply_meta_param(path, "zonegroup", zg.get_name(), dest); + apply_meta_param(path, "zonegroup_id", zg.get_id(), dest); + + const RGWZone& zone = sc->env->svc->zone->get_zone(); + apply_meta_param(path, "zone", zone.name, dest); + apply_meta_param(path, "zone_id", zone.id, dest); + } + + void update_config(RGWDataSyncCtx *sc, const string& sid) { + expand_target(sc, sid, root_profile->target_path, &root_profile->target_path); + ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; + for (auto& t : explicit_profiles) { + expand_target(sc, sid, t.second->target_path, &t.second->target_path); + ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; + } + } + + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ObjectSection config(jf, "config"); + root_profile->dump_conf(cct, jf); + jf.open_array_section("connections"); + for (auto c : connections) { + c.second->dump_conf(cct, jf); + } + jf.close_section(); + + acl_profiles.dump_conf(cct, jf); + + { // targets + Formatter::ArraySection as(jf, "profiles"); + for (auto& t : explicit_profiles) { + Formatter::ObjectSection target_section(jf, "profile"); + encode_json("name", t.first, &jf); + t.second->dump_conf(cct, jf); + } + } + } + + string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile, + const RGWBucketInfo& bucket_info, + const rgw_obj_key& obj) { + string bucket_str; + string 
owner; + if (!bucket_info.owner.tenant.empty()) { + bucket_str = owner = bucket_info.owner.tenant + "-"; + owner += bucket_info.owner.id; + } + bucket_str += bucket_info.bucket.name; + + const string& path = profile->target_path; + + string new_path; + apply_meta_param(path, "bucket", bucket_str, &new_path); + apply_meta_param(new_path, "owner", owner, &new_path); + + new_path += string("/") + get_key_oid(obj); + + return new_path; + } + + void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile, + const RGWBucketInfo& bucket_info, + const rgw_obj_key& obj, + string *bucket_name, + string *obj_name) { + string path = get_path(profile, bucket_info, obj); + size_t pos = path.find('/'); + + *bucket_name = path.substr(0, pos); + *obj_name = path.substr(pos + 1); + } + + void init_conns(RGWDataSyncCtx *sc, const string& id) { + auto sync_env = sc->env; + + update_config(sc, id); + + auto& root_conf = root_profile->conn_conf; + + root_profile->conn.reset(new S3RESTConn(sc->cct, + sync_env->svc->zone, + id, + { root_conf->endpoint }, + root_conf->key, + root_conf->host_style)); + + for (auto i : explicit_profiles) { + auto& c = i.second; + + c->conn.reset(new S3RESTConn(sc->cct, + sync_env->svc->zone, + id, + { c->conn_conf->endpoint }, + c->conn_conf->key, + c->conn_conf->host_style)); + } + } +}; + + +struct AWSSyncInstanceEnv { + AWSSyncConfig conf; + string id; + + explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {} + + void init(RGWDataSyncCtx *sc, uint64_t instance_id) { + char buf[32]; + snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id); + id = buf; + + conf.init_conns(sc, id); + } + + void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) { + conf.find_profile(bucket, ptarget); + ceph_assert(ptarget); + } +}; + +static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info) +{ + for (auto header : headers) { + const 
string& val = header.second; + if (header.first == "RGWX_OBJECT_SIZE") { + info->content_len = atoi(val.c_str()); + } else { + info->attrs[header.first] = val; + } + } + + info->acls.set_ctx(cct); + auto aiter = attrs.find(RGW_ATTR_ACL); + if (aiter != attrs.end()) { + bufferlist& bl = aiter->second; + auto bliter = bl.cbegin(); + try { + info->acls.decode(bliter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl; + return -EIO; + } + } else { + ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl; + } + + return 0; +} + +class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF +{ + RGWDataSyncCtx *sc; + RGWRESTConn *conn; + rgw::sal::RGWObject* src_obj; + RGWRESTConn::get_obj_params req_params; + + rgw_sync_aws_src_obj_properties src_properties; +public: + RGWRESTStreamGetCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWDataSyncCtx *_sc, + RGWRESTConn *_conn, + rgw::sal::RGWObject* _src_obj, + const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, + _sc->env->http_manager, _src_obj->get_key()), + sc(_sc), conn(_conn), src_obj(_src_obj), + src_properties(_src_properties) { + } + + int init(const DoutPrefixProvider *dpp) override { + /* init input connection */ + + + req_params.get_op = true; + req_params.prepend_metadata = true; + + req_params.unmod_ptr = &src_properties.mtime; + req_params.etag = src_properties.etag; + req_params.mod_zone_id = src_properties.zone_short_id; + req_params.mod_pg_ver = src_properties.pg_ver; + + if (range.is_set) { + req_params.range_is_set = true; + req_params.range_start = range.ofs; + req_params.range_end = range.ofs + range.size - 1; + } + + RGWRESTStreamRWRequest *in_req; + int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; + return ret; 
+ } + + set_req(in_req); + + return RGWStreamReadHTTPResourceCRF::init(dpp); + } + + int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override { + map<string, bufferlist> src_attrs; + + ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; + + if (extra_data.length() > 0) { + JSONParser jp; + if (!jp.parse(extra_data.c_str(), extra_data.length())) { + ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; + return -EIO; + } + + JSONDecoder::decode_json("attrs", src_attrs, &jp); + } + return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj); + } + + bool need_extra_data() override { + return true; + } +}; + +static std::set<string> keep_headers = { "CONTENT_TYPE", + "CONTENT_ENCODING", + "CONTENT_DISPOSITION", + "CONTENT_LANGUAGE" }; + +class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF +{ + RGWDataSyncCtx *sc; + rgw_sync_aws_src_obj_properties src_properties; + std::shared_ptr<AWSSyncConfig_Profile> target; + rgw::sal::RGWObject* dest_obj; + string etag; +public: + RGWAWSStreamPutCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWDataSyncCtx *_sc, + const rgw_sync_aws_src_obj_properties& _src_properties, + std::shared_ptr<AWSSyncConfig_Profile>& _target, + rgw::sal::RGWObject* _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager), + sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) { + } + + int init() override { + /* init output connection */ + RGWRESTStreamS3PutObj *out_req{nullptr}; + + if (multipart.is_multipart) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", multipart.part_num); + rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, + { "partNumber", buf }, + { nullptr, nullptr } }; + target->conn->put_obj_send_init(dest_obj, params, 
&out_req); + } else { + target->conn->put_obj_send_init(dest_obj, nullptr, &out_req); + } + + set_req(out_req); + + return RGWStreamWriteHTTPResourceCRF::init(); + } + + static bool keep_attr(const string& h) { + return (keep_headers.find(h) != keep_headers.end() || + boost::algorithm::starts_with(h, "X_AMZ_")); + } + + static void init_send_attrs(CephContext *cct, + const rgw_rest_obj& rest_obj, + const rgw_sync_aws_src_obj_properties& src_properties, + const AWSSyncConfig_Profile *target, + map<string, string> *attrs) { + auto& new_attrs = *attrs; + + new_attrs.clear(); + + for (auto& hi : rest_obj.attrs) { + if (keep_attr(hi.first)) { + new_attrs.insert(hi); + } + } + + auto acl = rest_obj.acls.get_acl(); + + map<int, vector<string> > access_map; + + if (target->acls) { + for (auto& grant : acl.get_grant_map()) { + auto& orig_grantee = grant.first; + auto& perm = grant.second; + + string grantee; + + const auto& am = target->acls->acl_mappings; + + auto iter = am.find(orig_grantee); + if (iter == am.end()) { + ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. 
ignoring" << dendl; + continue; + } + + grantee = iter->second.dest_id; + + string type; + + switch (iter->second.type) { + case ACL_TYPE_CANON_USER: + type = "id"; + break; + case ACL_TYPE_EMAIL_USER: + type = "emailAddress"; + break; + case ACL_TYPE_GROUP: + type = "uri"; + break; + default: + continue; + } + + string tv = type + "=" + grantee; + + int flags = perm.get_permission().get_permissions(); + if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { + access_map[flags].push_back(tv); + continue; + } + + for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { + if (flags & i) { + access_map[i].push_back(tv); + } + } + } + } + + for (auto aiter : access_map) { + int grant_type = aiter.first; + + string header_str("x-amz-grant-"); + + switch (grant_type) { + case RGW_PERM_READ: + header_str.append("read"); + break; + case RGW_PERM_WRITE: + header_str.append("write"); + break; + case RGW_PERM_READ_ACP: + header_str.append("read-acp"); + break; + case RGW_PERM_WRITE_ACP: + header_str.append("write-acp"); + break; + case RGW_PERM_FULL_CONTROL: + header_str.append("full-control"); + break; + } + + string s; + + for (auto viter : aiter.second) { + if (!s.empty()) { + s.append(", "); + } + s.append(viter); + } + + ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; + + new_attrs[header_str] = s; + } + + char buf[32]; + snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch); + new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf; + + utime_t ut(src_properties.mtime); + snprintf(buf, sizeof(buf), "%lld.%09lld", + (long long)ut.sec(), + (long long)ut.nsec()); + + new_attrs["x-amz-meta-rgwx-source-mtime"] = buf; + new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag; + new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name; + if (!rest_obj.key.instance.empty()) { + new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance; + } + } + + void send_ready(const DoutPrefixProvider *dpp, const 
rgw_rest_obj& rest_obj) override { + RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req); + + map<string, string> new_attrs; + if (!multipart.is_multipart) { + init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); + } + + r->set_send_length(rest_obj.content_len); + + RGWAccessControlPolicy policy; + + r->send_ready(dpp, target->conn->get_key(), new_attrs, policy, false); + } + + void handle_headers(const map<string, string>& headers) { + for (auto h : headers) { + if (h.first == "ETAG") { + etag = h.second; + } + } + } + + bool get_etag(string *petag) { + if (etag.empty()) { + return false; + } + *petag = etag; + return true; + } +}; + + +class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWRESTConn *source_conn; + std::shared_ptr<AWSSyncConfig_Profile> target; + rgw::sal::RGWObject* src_obj; + rgw::sal::RGWObject* dest_obj; + + rgw_sync_aws_src_obj_properties src_properties; + + std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; + std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; + +public: + RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc, + RGWRESTConn *_source_conn, + rgw::sal::RGWObject* _src_obj, + const rgw_sync_aws_src_obj_properties& _src_properties, + std::shared_ptr<AWSSyncConfig_Profile> _target, + rgw::sal::RGWObject* _dest_obj) : RGWCoroutine(_sc->cct), + sc(_sc), + source_conn(_source_conn), + target(_target), + src_obj(_src_obj), + dest_obj(_dest_obj), + src_properties(_src_properties) {} + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + /* init input */ + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, + source_conn, src_obj, + src_properties)); + + /* init output */ + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, + src_properties, target, dest_obj)); + + yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + return 
set_cr_done(); + } + + return 0; + } +}; + +class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWRESTConn *source_conn; + std::shared_ptr<AWSSyncConfig_Profile> target; + rgw::sal::RGWObject* src_obj; + rgw::sal::RGWObject* dest_obj; + + rgw_sync_aws_src_obj_properties src_properties; + + string upload_id; + + rgw_sync_aws_multipart_part_info part_info; + + std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; + std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; + + string *petag; + +public: + RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc, + RGWRESTConn *_source_conn, + rgw::sal::RGWObject* _src_obj, + std::shared_ptr<AWSSyncConfig_Profile>& _target, + rgw::sal::RGWObject* _dest_obj, + const rgw_sync_aws_src_obj_properties& _src_properties, + const string& _upload_id, + const rgw_sync_aws_multipart_part_info& _part_info, + string *_petag) : RGWCoroutine(_sc->cct), + sc(_sc), + source_conn(_source_conn), + target(_target), + src_obj(_src_obj), + dest_obj(_dest_obj), + src_properties(_src_properties), + upload_id(_upload_id), + part_info(_part_info), + petag(_petag) {} + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + /* init input */ + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc, + source_conn, src_obj, + src_properties)); + + in_crf->set_range(part_info.ofs, part_info.size); + + /* init output */ + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc, + src_properties, target, dest_obj)); + + out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); + + yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) { + ldout(sc->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl; + return set_cr_error(-EIO); + } + + return set_cr_done(); + } + + return 0; + } +}; + +class 
RGWAWSAbortMultipartCR : public RGWCoroutine {
  /* Coroutine: DELETE ?uploadId=... against the destination object to
   * abort an in-progress multipart upload on the cloud endpoint. */
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  rgw::sal::RGWObject* dest_obj;

  string upload_id;

public:
  RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
                         RGWRESTConn *_dest_conn,
                         rgw::sal::RGWObject* _dest_obj,
                         const string& _upload_id) : RGWCoroutine(_sc->cct),
                                                     sc(_sc),
                                                     dest_conn(_dest_conn),
                                                     dest_obj(_dest_obj),
                                                     upload_id(_upload_id) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield {
        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
        bufferlist bl;
        call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
                                         obj_to_aws_path(dest_obj), params));
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine: POST ?uploads (InitiateMultipartUpload) against the destination
 * object, parse the XML response, and return the new upload id via
 * *upload_id.
 */
class RGWAWSInitMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  rgw::sal::RGWObject* dest_obj;

  uint64_t obj_size;  /* NOTE(review): stored but not referenced in operate() below */
  map<string, string> attrs;  /* sent as headers/attrs on the init request */

  bufferlist out_bl;  /* raw XML response body */

  string *upload_id;  /* out: UploadId from the response */

  /* shape of the InitiateMultipartUploadResult XML response */
  struct InitMultipartResult {
    string bucket;
    string key;
    string upload_id;

    void decode_xml(XMLObj *obj) {
      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
      RGWXMLDecoder::decode_xml("Key", key, obj);
      RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
    }
  } result;

public:
  RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
                        RGWRESTConn *_dest_conn,
                        rgw::sal::RGWObject* _dest_obj,
                        uint64_t _obj_size,
                        const map<string, string>& _attrs,
                        string *_upload_id) : RGWCoroutine(_sc->cct),
                                              sc(_sc),
                                              dest_conn(_dest_conn),
                                              dest_obj(_dest_obj),
                                              obj_size(_obj_size),
                                              attrs(_attrs),
                                              upload_id(_upload_id) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield {
        rgw_http_param_pair params[] = { { "uploads", nullptr },
                                         {nullptr, nullptr} };
        bufferlist bl;
        call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
                                                        obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
        return set_cr_error(retcode);
      }
      {
        /*
         * If one of the following fails we cannot abort upload, as we cannot
         * extract the upload id. If one of these fail it's very likely that that's
         * the least of our problem.
         */
        RGWXMLDecoder::XMLParser parser;
        if (!parser.init()) {
          ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
          return set_cr_error(-EIO);
        }

        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
          string str(out_bl.c_str(), out_bl.length());
          ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
          return set_cr_error(-EIO);
        }

        try {
          RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
        } catch (RGWXMLDecoder::err& err) {
          string str(out_bl.c_str(), out_bl.length());
          ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
          return set_cr_error(-EIO);
        }
      }

      ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;

      *upload_id = result.upload_id;

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine: POST ?uploadId=... (CompleteMultipartUpload) with the XML body
 * listing every uploaded part number + etag, then parse the result.
 */
class RGWAWSCompleteMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  rgw::sal::RGWObject* dest_obj;

  bufferlist out_bl;  /* raw XML response body */

  string upload_id;

  /* request-body encoder for the <CompleteMultipartUpload> document */
  struct CompleteMultipartReq {
    map<int, rgw_sync_aws_multipart_part_info> parts;

    explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}

    void dump_xml(Formatter *f) const {
      for (auto p : parts) {
        f->open_object_section("Part");
encode_xml("PartNumber", p.first, f); + encode_xml("ETag", p.second.etag, f); + f->close_section(); + }; + } + } req_enc; + + struct CompleteMultipartResult { + string location; + string bucket; + string key; + string etag; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Location", bucket, obj); + RGWXMLDecoder::decode_xml("Bucket", bucket, obj); + RGWXMLDecoder::decode_xml("Key", key, obj); + RGWXMLDecoder::decode_xml("ETag", etag, obj); + } + } result; + +public: + RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc, + RGWRESTConn *_dest_conn, + rgw::sal::RGWObject* _dest_obj, + string _upload_id, + const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct), + sc(_sc), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + upload_id(_upload_id), + req_enc(_parts) {} + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + + yield { + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; + stringstream ss; + XMLFormatter formatter; + + encode_xml("CompleteMultipartUpload", req_enc, &formatter); + + formatter.flush(ss); + + bufferlist bl; + bl.append(ss.str()); + + call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager, + obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl)); + } + + if (retcode < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; + return set_cr_error(retcode); + } + { + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. 
         */
        RGWXMLDecoder::XMLParser parser;
        if (!parser.init()) {
          /* NOTE(review): log text says "multipart init response" but this
           * parses the complete-multipart response — stale copy-paste */
          ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
          return set_cr_error(-EIO);
        }

        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
          string str(out_bl.c_str(), out_bl.length());
          ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
          return set_cr_error(-EIO);
        }

        try {
          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
        } catch (RGWXMLDecoder::err& err) {
          string str(out_bl.c_str(), out_bl.length());
          ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
          return set_cr_error(-EIO);
        }
      }

      ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;

      return set_cr_done();
    }

    return 0;
  }
};


/*
 * Coroutine: best-effort cleanup of a failed/stale multipart sync — aborts
 * the remote multipart upload, then removes the local RADOS status object
 * that tracked it. Errors in either step are logged and ignored.
 */
class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  rgw::sal::RGWObject* dest_obj;
  const rgw_raw_obj status_obj;  /* local sync-status tracking object */

  string upload_id;

public:

  RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
                                     RGWRESTConn *_dest_conn,
                                     rgw::sal::RGWObject* _dest_obj,
                                     const rgw_raw_obj& _status_obj,
                                     const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc),
                                                                 dest_conn(_dest_conn),
                                                                 dest_obj(_dest_obj),
                                                                 status_obj(_status_obj),
                                                                 upload_id(_upload_id) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
        /* ignore error, best effort */
      }
      yield call(new RGWRadosRemoveCR(sc->env->store, status_obj));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj="
                          << status_obj << " retcode=" << retcode << dendl;
        /* ignore error, best effort */
      }
      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine driving a full multipart sync of one object to the cloud
 * endpoint. Progress is checkpointed in a RADOS status object so an
 * interrupted upload can resume at the next un-uploaded part; if the
 * source object changed (mtime/size/etag mismatch) the stale upload is
 * aborted and restarted from scratch.
 */
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  AWSSyncConfig& conf;
  RGWRESTConn *source_conn;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  rgw::sal::RGWObject* src_obj;
  rgw::sal::RGWObject* dest_obj;

  uint64_t obj_size;
  string src_etag;
  rgw_sync_aws_src_obj_properties src_properties;
  rgw_rest_obj rest_obj;

  rgw_sync_aws_multipart_upload_info status;  /* persisted resume state */

  map<string, string> new_attrs;

  rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};

  int ret_err{0};

  rgw_raw_obj status_obj;  /* where 'status' is checkpointed in the log pool */

public:
  RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
                                    rgw_bucket_sync_pipe& _sync_pipe,
                                    AWSSyncConfig& _conf,
                                    RGWRESTConn *_source_conn,
                                    rgw::sal::RGWObject* _src_obj,
                                    std::shared_ptr<AWSSyncConfig_Profile>& _target,
                                    rgw::sal::RGWObject* _dest_obj,
                                    uint64_t _obj_size,
                                    const rgw_sync_aws_src_obj_properties& _src_properties,
                                    const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
                                                                     sc(_sc),
                                                                     sync_env(_sc->env),
                                                                     conf(_conf),
                                                                     source_conn(_source_conn),
                                                                     target(_target),
                                                                     src_obj(_src_obj),
                                                                     dest_obj(_dest_obj),
                                                                     obj_size(_obj_size),
                                                                     src_properties(_src_properties),
                                                                     rest_obj(_rest_obj),
                                                                     status_obj(sync_env->svc->zone->get_zone_params().log_pool,
                                                                                RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
  }


  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* try to load a previous checkpoint; -ENOENT means a fresh upload */
      yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                                              status_obj, &status, false));

      if (retcode < 0 && retcode != -ENOENT) {
        ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
        return retcode;
      }

      if (retcode >= 0) {
        /* check here that mtime and size did not change */

        if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
            status.src_properties.etag != src_properties.etag) {
          /* source changed since the checkpoint: abort and restart */
          yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
          retcode = -ENOENT;
        }
      }

      if (retcode == -ENOENT) {
        RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);

        /* NOTE(review): status.obj_size is passed before being set below, and
         * std::move on new_attrs is a no-op into a const-ref parameter; the
         * size argument appears unused by RGWAWSInitMultipartCR::operate —
         * confirm before changing */
        yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }

        status.obj_size = obj_size;
        status.src_properties = src_properties;
#define MULTIPART_MAX_PARTS 10000
        /* keep part count under the S3 limit while honoring the configured
         * minimum part size */
        uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
        status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
        status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
        status.cur_part = 1;
      }

      for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
        yield {
          rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
          cur_part_info.part_num = status.cur_part;
          cur_part_info.ofs = status.cur_ofs;
          /* last part may be shorter than part_size */
          cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);

          pcur_part_info = &cur_part_info;

          status.cur_ofs += status.part_size;

          call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
                                                         source_conn, src_obj,
                                                         target,
                                                         dest_obj,
                                                         status.src_properties,
                                                         status.upload_id,
                                                         cur_part_info,
                                                         &cur_part_info.etag));
        }

        if (retcode < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
          ret_err = retcode;
          yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
          return set_cr_error(ret_err);
        }

        /* checkpoint after each part so a restart can resume here */
        yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
        if (retcode < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
          /* continue with upload anyway */
        }
        ldout(sc->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
      }

      yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
        ret_err = retcode;
        yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
        return set_cr_error(ret_err);
      }

      /* remove status obj */
      yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
      if (retcode < 0) {
        /* NOTE(review): message says "abort multipart upload" but this step
         * removes the status object after a successful complete */
        ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
        /* ignore error, best effort */
      }
      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Decode attribute attr_name from the attrs map into *result; if the
 * attribute is absent or empty, *result is set to def_val and 0 is
 * returned. Returns -EIO only on a decode failure.
 */
template <class T>
int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *result = def_val;
    return 0;
  }
  bufferlist& bl = iter->second;
  if (bl.length() == 0) {
    *result = def_val;
    return 0;
  }
  auto bliter = bl.cbegin();
  try {
    decode(*result, bliter);
  } catch (buffer::error& err) {
    return -EIO;
  }
  return 0;
}

// maybe use Fetch Remote Obj instead?
/*
 * Per-object sync callback: after the remote object has been stat'ed
 * (RGWStatRemoteObjCBCR supplies size/mtime/etag/attrs/headers), resolve
 * the target bucket/object names from the sync profile, create the target
 * bucket if needed (tolerating BucketAlreadyOwnedByYou), and stream the
 * object either as a single PUT or as a multipart upload depending on the
 * configured size threshold.
 */
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
  rgw_bucket_sync_pipe sync_pipe;
  AWSSyncInstanceEnv& instance;

  uint64_t versioned_epoch{0};

  RGWRESTConn *source_conn{nullptr};
  std::shared_ptr<AWSSyncConfig_Profile> target;
  bufferlist res;
  /* NOTE(review): this cache is a member of a per-object coroutine, so it
   * starts empty for every object — the bucket-create request is issued on
   * each sync; confirm whether it was meant to live somewhere longer-lived */
  unordered_map <string, bool> bucket_created;
  string target_bucket_name;
  string target_obj_name;
  rgw_rest_obj rest_obj;
  int ret{0};

  uint32_t src_zone_short_id{0};
  uint64_t src_pg_ver{0};

  bufferlist out_bl;

  /* shape of the S3 <Error> XML response on bucket-create failure */
  struct CreateBucketResult {
    string code;

    void decode_xml(XMLObj *obj) {
      RGWXMLDecoder::decode_xml("Code", code, obj);
    }
  } result;

public:
  RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
                            rgw_bucket_sync_pipe& _sync_pipe,
                            rgw_obj_key& _key,
                            AWSSyncInstanceEnv& _instance,
                            uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                         sync_pipe(_sync_pipe),
                                                         instance(_instance), versioned_epoch(_versioned_epoch)
  {}

  ~RGWAWSHandleRemoteObjCBCR(){
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* pg_ver/zone_short_id are optional source attrs; errors are logged
       * and the values fall back to 0 ("all or nothing") */
      ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
      if (ret < 0) {
        ldout(sc->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
      } else {
        ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
        if (ret < 0) {
          ldout(sc->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
          src_pg_ver = 0; /* all or nothing */
        }
      }
      ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone
                        << " b=" << src_bucket << " k=" << key << " size=" << size
                        << " mtime=" << mtime << " etag=" << etag
                        << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
                        << dendl;

      source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
      if (!source_conn) {
        ldout(sc->cct, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
        return set_cr_error(-EINVAL);
      }

      instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
      instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);

      if (bucket_created.find(target_bucket_name) == bucket_created.end()){
        yield {
          ldout(sc->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl;
          bufferlist bl;
          call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
                                                         sync_env->http_manager,
                                                         target_bucket_name, nullptr, bl, &out_bl));
        }
        if (retcode < 0 ) {
          /* inspect the error body: an already-owned bucket is not a failure */
          RGWXMLDecoder::XMLParser parser;
          if (!parser.init()) {
            ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
            return set_cr_error(retcode);
          }

          if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
            string str(out_bl.c_str(), out_bl.length());
            ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
            return set_cr_error(retcode);
          }

          try {
            RGWXMLDecoder::decode_xml("Error", result, &parser, true);
          } catch (RGWXMLDecoder::err& err) {
            string str(out_bl.c_str(), out_bl.length());
            ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
            return set_cr_error(retcode);
          }

          if (result.code != "BucketAlreadyOwnedByYou") {
            return set_cr_error(retcode);
          }
        }

        bucket_created[target_bucket_name] = true;
      }

      yield {
        rgw::sal::RGWRadosBucket bucket(sync_env->store, src_bucket);
        rgw::sal::RGWRadosObject src_obj(sync_env->store, key, &bucket);

        /* init output */
        rgw_bucket target_bucket;
        target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
                                                    uri resolution */
        rgw::sal::RGWRadosBucket dest_bucket(sync_env->store, target_bucket);
        rgw::sal::RGWRadosObject dest_obj(sync_env->store, rgw_obj_key(target_obj_name), &dest_bucket);


        rgw_sync_aws_src_obj_properties src_properties;
        src_properties.mtime = mtime;
        src_properties.etag = etag;
        src_properties.zone_short_id = src_zone_short_id;
        src_properties.pg_ver = src_pg_ver;
        src_properties.versioned_epoch = versioned_epoch;

        /* small objects go as a single PUT; big ones as multipart */
        if (size < instance.conf.s3.multipart_sync_threshold) {
          call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, &src_obj,
                                                 src_properties,
                                                 target,
                                                 &dest_obj));
        } else {
          rgw_rest_obj rest_obj;
          rest_obj.init(key);
          if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) {
            ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
            return set_cr_error(-EINVAL);
          }
          call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, &src_obj,
                                                     target, &dest_obj, size, src_properties, rest_obj));
        }
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Thin wrapper: stats the remote object and hands the result to
 * RGWAWSHandleRemoteObjCBCR above.
 */
class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
  rgw_bucket_sync_pipe sync_pipe;
  AWSSyncInstanceEnv& instance;
  uint64_t versioned_epoch;
public:
  RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                          AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                                                      sync_pipe(_sync_pipe),
                                                                                      instance(_instance), versioned_epoch(_versioned_epoch) {
  }

  ~RGWAWSHandleRemoteObjCR() {}

  RGWStatRemoteObjCBCR *allocate_callback() override {
    return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch);
  }
};

/*
 * Coroutine: delete the synced copy of an object from the cloud endpoint
 * (DELETE on the profile-resolved path).
 */
class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_obj_key key;
  ceph::real_time mtime;
  AWSSyncInstanceEnv& instance;
  int ret{0};
public:
  RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
                            rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                            AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
                                                             sync_pipe(_sync_pipe),
                                                             key(_key),
                                                             mtime(_mtime), instance(_instance) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone
                        << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
      yield {
        instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
        string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
        ldout(sc->cct, 0) << "AWS: removing aws object at" << path << dendl;

        call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
                                         sc->env->http_manager,
                                         path, nullptr /* params */));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};


/*
 * Data-sync module entry points for the AWS/cloud sync tier: dispatches
 * object sync and removal to the coroutines above. Delete markers are not
 * propagated (create_delete_marker returns NULL).
 */
class RGWAWSDataSyncModule: public RGWDataSyncModule {
  CephContext *cct;
  AWSSyncInstanceEnv instance;
public:
  RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
    cct(_cct),
    instance(_conf) {
  }

  void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
    instance.init(sc, instance_id);
  }

  ~RGWAWSDataSyncModule() {}

  /* full-object sync: stat the remote object and upload it to the cloud */
  RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                            std::optional<uint64_t> versioned_epoch,
                            rgw_zone_set *zones_trace) override {
    ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
    return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
  }
  /* object removal: delete the synced copy from the cloud endpoint */
  RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
                              rgw_zone_set *zones_trace) override {
    ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
    return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
  }
  /* intentionally unimplemented for this module */
  RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
                                     rgw_zone_set *zones_trace) override {
    ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
                      << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
    return NULL;
  }
};

/* Sync-module instance wrapper exposing the data handler above. */
class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
  RGWAWSDataSyncModule data_handler;
public:
  RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

/*
 * Factory: parse the JSON config into AWSSyncConfig and construct the
 * module instance. Returns the config parser's error code on failure.
 */
int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
  AWSSyncConfig conf;

  int r = conf.init(cct, config);
  if (r < 0) {
    return r;
  }

  instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
  return 0;
}